在编写、使用策略时,经常会使用一些不常用的K线周期数据。然而交易所、数据源又没有提供这些周期的数据。只能通过使用已有周期的数据进行合成。合成算法已经有一个JavaScript版本了,其实移植一段JavaScript代码为Python版本很简单。接下来我们一起写一个Python版本的K线合成算法。
function GetNewCycleRecords (sourceRecords, targetCycle) { // K线合成函数 var ret = [] // 首先获取源K线数据的周期 if (!sourceRecords || sourceRecords.length < 2) { return null } var sourceLen = sourceRecords.length var sourceCycle = sourceRecords[sourceLen - 1].Time - sourceRecords[sourceLen - 2].Time if (targetCycle % sourceCycle != 0) { Log("targetCycle:", targetCycle) Log("sourceCycle:", sourceCycle) throw "targetCycle is not an integral multiple of sourceCycle." } if ((1000 * 60 * 60) % targetCycle != 0 && (1000 * 60 * 60 * 24) % targetCycle != 0) { Log("targetCycle:", targetCycle) Log("sourceCycle:", sourceCycle) Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle) throw "targetCycle cannot complete the cycle." } var multiple = targetCycle / sourceCycle var isBegin = false var count = 0 var high = 0 var low = 0 var open = 0 var close = 0 var time = 0 var vol = 0 for (var i = 0 ; i < sourceLen ; i++) { // 获取 时区偏移数值 var d = new Date() var n = d.getTimezoneOffset() if (((1000 * 60 * 60 * 24) - sourceRecords[i].Time % (1000 * 60 * 60 * 24) + (n * 1000 * 60)) % targetCycle == 0) { isBegin = true } if (isBegin) { if (count == 0) { high = sourceRecords[i].High low = sourceRecords[i].Low open = sourceRecords[i].Open close = sourceRecords[i].Close time = sourceRecords[i].Time vol = sourceRecords[i].Volume count++ } else if (count < multiple) { high = Math.max(high, sourceRecords[i].High) low = Math.min(low, sourceRecords[i].Low) close = sourceRecords[i].Close vol += sourceRecords[i].Volume count++ } if (count == multiple || i == sourceLen - 1) { ret.push({ High : high, Low : low, Open : open, Close : close, Time : time, Volume : vol, }) count = 0 } } } return ret }
有JavaScript算法,对于Python其实逐行翻译移植就可以了,遇到JavaScript的内置函数,或者固有方法,对应的去Python中查找对应的方法即可,所以移植还是比较容易的。
算法逻辑完全一模一样,只是JavaScript的函数调用var n = d.getTimezoneOffset()
,移植到Python时,使用Python的time库中的n = time.altzone
代替。其它差异仅仅是语言语法方面的了(例如for循环的使用,布尔值的差别,逻辑与、逻辑非、逻辑或的使用差别等..)。
import time def GetNewCycleRecords(sourceRecords, targetCycle): ret = [] # 首先获取源K线数据的周期 if not sourceRecords or len(sourceRecords) < 2 : return None sourceLen = len(sourceRecords) sourceCycle = sourceRecords[-1]["Time"] - sourceRecords[-2]["Time"] if targetCycle % sourceCycle != 0 : Log("targetCycle:", targetCycle) Log("sourceCycle:", sourceCycle) raise "targetCycle is not an integral multiple of sourceCycle." if (1000 * 60 * 60) % targetCycle != 0 and (1000 * 60 * 60 * 24) % targetCycle != 0 : Log("targetCycle:", targetCycle) Log("sourceCycle:", sourceCycle) Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle) raise "targetCycle cannot complete the cycle." multiple = targetCycle / sourceCycle isBegin = False count = 0 barHigh = 0 barLow = 0 barOpen = 0 barClose = 0 barTime = 0 barVol = 0 for i in range(sourceLen) : # 获取时区偏移数值 n = time.altzone if ((1000 * 60 * 60 * 24) - (sourceRecords[i]["Time"] * 1000) % (1000 * 60 * 60 * 24) + (n * 1000)) % targetCycle == 0 : isBegin = True if isBegin : if count == 0 : barHigh = sourceRecords[i]["High"] barLow = sourceRecords[i]["Low"] barOpen = sourceRecords[i]["Open"] barClose = sourceRecords[i]["Close"] barTime = sourceRecords[i]["Time"] barVol = sourceRecords[i]["Volume"] count += 1 elif count < multiple : barHigh = max(barHigh, sourceRecords[i]["High"]) barLow = min(barLow, sourceRecords[i]["Low"]) barClose = sourceRecords[i]["Close"] barVol += sourceRecords[i]["Volume"] count += 1 if count == multiple or i == sourceLen - 1 : ret.append({ "High" : barHigh, "Low" : barLow, "Open" : barOpen, "Close" : barClose, "Time" : barTime, "Volume" : barVol, }) count = 0 return ret