发布日期:2024-08-26 06:04 点击次数:180
空洞妹妹 自慰
本文先容了何如通过MetaTrader5来回平台使用PyTorch Lightning和PyTorch Forecasting框架来终了基于神经汇注的金融时代序列展望。
在本文中,咱们还将讲明遴荐这两个框架的原因以及咱们使用的数据神情。
启动化
领先,咱们需要导入所需的库。这些库包括MetaTrader5(用于与MT5终局交互)、PyTorch Lightning(用于查验模子)以过甚他一些用于数据惩处和可视化的库。
import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner
接下来,咱们需要启动化MetaTrader5。这是通过调用mt.initialize()函数来完成的。如果你不成简单地使用它来启动化它,你需要将MT5终局的旅途看成参数传递给这个函数(在示例中“D:\Project\mt\MT5\terminal64.exe”是我的个东说念主旅途位置,在实质诈欺中你需要将它建树到你我方的旅途位置)。如果启动化生效,函数将复返True,不然复返False。
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
print('initialize() failed!')
else:
print(mt.version())
mt.symbols_total()函数用于获取MT5终局中可用的可来回品种的总额。咱们不错用它来判断咱们是否草率正确地赢得数据。如果总额大于0,咱们不错使用mt.copy_rates_from_pos()函数来获取指定来回品种的历史数据。在本例中,咱们赢得了“GOLD_micro”品种的“mt_data_len”M15(15分钟)周期数据的最新长度。
sb=mt.symbols_total()
rts=None
if sb > 0:
rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len)
mt.shutdown()
临了,咱们使用mt.shutdown()函数关闭与MT5终局的伙同,并将赢得的数据退换为Pandas DataFrame神情。
mt.shutdown()
rts_fm=pd.DataFrame(rts)
现时,让咱们接头何如对从MT5终局赢得的数据进行预惩处。
咱们需要先将时代戳退换为日历:妹妹 自慰
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
在这里,咱们不再阵势何如秀丽数据。你不错在我之前的两篇著作中找到智商(本文简介中有著作贯穿)。为了简明地演示何如使用展望模子,咱们只需将每个“max_encoder_length+2max_prediction_length”数据片断永别为一组。每组有一个从0到“max_encoder_length+2max_prediction_length-1”的序列,并填充它们。通过这种方式,咱们将所需的标签添加到原始数据中。领先,咱们需要将原始时代索引(即DataFrame的索引)退换为时代索引。计较原始时代索引的尾数除以(max_encoder_length+2max_prediction_length),并将效力用作新的时代索引。这将时代索引映射到从0到“max_encoder_length+2*max_prediction_length-1”的界限内:
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
咱们还需要将原始时代索引退换为一个组。计较原始时代指数除以“max_encoder_length+2*max_prediction_length”,并将效力用作新组:
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
咱们将数据预惩处部分封装到一个函数中。咱们只需要将咱们需要赢得的数据长度传递给它,它就不错完成数据预惩处职责:
def get_data(mt_data_len:int):
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
print('initialize() failed!')
else:
print(mt.version())
sb=mt.symbols_total()
rts=None
if sb > 0:
rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len)
mt.shutdown()
# print(len(rts))
rts_fm=pd.DataFrame(rts)
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm
重写 pytorch_forecasting.TimeSeriesDataSet 类
重写pytorch_foredicting中的to_datalader()函数。这允许您截至是否对数据进行混洗以及是否丢弃临了一组批惩处(主淌若为了能干临了一组数据长度不及导致的不可展望空幻)。以下是您的操作智商:
class New_TmSrDt(TimeSeriesDataSet):
def to_dataloader(self, train: bool = True,
batch_size: int = 64,
batch_sampler: Sampler | str = None,
shuffle:bool=False,
drop_last:bool=False,
性爱经历**kwargs) -> DataLoader:
default_kwargs = dict(
shuffle=shuffle,
drop_last=drop_last, #modification
collate_fn=self._collate_fn,
batch_size=batch_size,
batch_sampler=batch_sampler,
)
default_kwargs.update(kwargs)
kwargs = default_kwargs
if kwargs["batch_sampler"] is not None:
sampler = kwargs["batch_sampler"]
if isinstance(sampler, str):
if sampler == "synchronized":
kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
SequentialSampler(self),
batch_size=kwargs["batch_size"],
shuffle=kwargs["shuffle"],
drop_last=kwargs["drop_last"],
)
else:
raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
del kwargs["batch_size"]
del kwargs["shuffle"]
del kwargs["drop_last"]
return DataLoader(self,**kwargs)
此代码创建从TimeSeriesDataSet汲取的新类new_TmSrDt。to_datalader()函数随后在这个新类中被重写,以包括shuffle和drop_last参数。这么妹妹 自慰,您就不错更好地截至数据加载经由。请记取在代码中使用New_TmSrDt替换TimeSeriesDataSet的实例。