本实验使用环境为Anaconda3 Jupyter,调用Sklearn包,调用keras包,请提前准备好。
主要有keras包、numpy包、metrics包、pandas包等。
import csv import numpy as np import time from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score from sklearn.metrics import confusion_matrix from sklearn.metrics import classification_report from sklearn.metrics import explained_variance_score from sklearn import metrics from sklearn.svm import SVR import matplotlib.pyplot as plt from pandas import DataFrame from pandas import Series from pandas import concat from pandas import read_csv from pandas import datetime from sklearn.metrics import mean_squared_error from sklearn.preprocessing import MinMaxScaler from keras.models import Sequential from keras.layers import Dense from keras.layers import LSTM from math import sqrt from matplotlib import pyplot import numpy
其中data为全部数据、traffic_feature为特征集、traffic_target目标集
data=[] traffic_feature=[] traffic_target=[] csv_file = csv.reader(open('GoodData.csv')) for content in csv_file: content=list(map(float,content)) if len(content)!=0: data.append(content) traffic_feature.append(content[0:4]) traffic_target.append(content[-1]) traffic_feature=np.array(traffic_feature) traffic_target=np.array(traffic_target) data=np.array(data)
对数据进行分割,本文使用70%为分割点。
feature_train=traffic_feature[0:int(len(traffic_feature)*0.7)] feature_test=traffic_feature[int(len(traffic_feature)*0.7):int(len(traffic_feature))] target_train=traffic_target[0:int(len(traffic_target)*0.7)] target_test =traffic_target[int(len(traffic_target)*0.7):int(len(traffic_target))]
对后30%的目标值继续分割,分割点仍然为70%,预留。
target_test_last=target_test[int(len(target_test)*0.7):int(len(target_test))]
使用StandardScaler()方法将特征数据标准化归一化。
scaler = StandardScaler() # 标准化转换 scaler.fit(traffic_feature) # 训练标准化对象 traffic_feature= scaler.transform(traffic_feature) # 转换数据集
class HiddenLayer: def __init__(self, x, num): # x:输入矩阵 num:隐含层神经元个数 row = x.shape[0] columns = x.shape[1] rnd = np.random.RandomState(9999) self.w = rnd.uniform(-1, 1, (columns, num)) # self.b = np.zeros([row, num], dtype=float) # 随机设定隐含层神经元阈值,即bi的值 for i in range(num): rand_b = rnd.uniform(-0.4, 0.4) # 随机产生-0.4 到 0.4 之间的数 for j in range(row): self.b[j, i] = rand_b # 设定输入层与隐含层的连接权值 self.h = self.sigmoid(np.dot(x, self.w) + self.b) # 计算隐含层输出矩阵H self.H_ = np.linalg.pinv(self.h) # 获取H的逆矩阵 # print(self.H_.shape) # 定义激活函数g(x) ,需为无限可微函数 def sigmoid(self, x): print(x) return 1.0 / (1 + np.exp(-x)) ''' 若进行分析的训练数据为回归问题,则使用该方式 ,计算隐含层输出权值,即β ''' def regressor_train(self, T): C = 2 I = len(T) sub_former = np.dot(np.transpose(self.h), self.h) + I / C all_m = np.dot(np.linalg.pinv(sub_former), np.transpose(self.h)) T = T.reshape(-1, 1) self.beta = np.dot(all_m, T) return self.beta """ 计算隐含层输出权值,即β """ def classifisor_train(self, T): en_one = OneHotEncoder() # print(T) T = en_one.fit_transform(T.reshape(-1, 1)).toarray() # 独热编码之后一定要用toarray()转换成正常的数组 # print(T) C = 3 I = len(T) sub_former = np.dot(np.transpose(self.h), self.h) + I / C all_m = np.dot(np.linalg.pinv(sub_former), np.transpose(self.h)) self.beta = np.dot(all_m, T) return self.beta def regressor_test(self, test_x): b_row = test_x.shape[0] h = self.sigmoid(np.dot(test_x, self.w) + self.b[:b_row, :]) result = np.dot(h, self.beta) return result def classifisor_test(self, test_x): b_row = test_x.shape[0] h = self.sigmoid(np.dot(test_x, self.w) + self.b[:b_row, :]) result = np.dot(h, self.beta) result = [item.tolist().index(max(item.tolist())) for item in result] return result
跑起来~
设置神经元个数为8,可以自行调优。
import matplotlib.pyplot as plt from sklearn.metrics import explained_variance_score a = HiddenLayer(feature_train,8) a.regressor_train(target_train) result = a.regressor_test(feature_test) plt.plot(result)#测试数组 plt.plot(target_test)#测试数组 plt.legend(['ELM','TRUE']) fig = plt.gcf() fig.set_size_inches(18.5, 10.5) plt.title("ELM") # 标题 plt.show() print("EVS:",explained_variance_score(target_test,result))
结果如下:
EVS等于0.8705
a=[]#真实值 for i in target_test: a.append(i) b=[]#预测值 for i in result: b.append(i[0]) c=[]#残差值 num=[] for inx,i in enumerate(a): c.append(b[inx]-i) num.append(inx) plt.plot(c)#残差 fig = plt.gcf() fig.set_size_inches(18.5,5) plt.xlim(0,1560) plt.title("Residual Signal") # 标题 plt.show()
残差值变化如下:
将残差的后30%截取,预留
result_last=b[int(len(b)*0.7):int(len(b))]
使用残差值的前70%作为测试集,使用后30%作为验证集。
train, test =c[0:int(len(c)*0.7)], c[int(len(c)*0.7):int(len(c))] def timeseries_to_supervised(data, lag=1): df = DataFrame(data) columns = [df.shift(i) for i in range(1, lag+1)] columns.append(df) df = concat(columns, axis=1) df.fillna(0, inplace=True) return df def fit_lstm(train, batch_size, nb_epoch, neurons): X, y = train[:, 0:-1], train[:, -1] X = X.reshape(X.shape[0], 1, X.shape[1]) model = Sequential() model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]), stateful=True)) model.add(Dense(1)) model.compile(loss='mean_squared_error', optimizer='adam') for i in range(nb_epoch): model.fit(X, y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False) model.reset_states() return model # make a one-step forecast def forecast_lstm(model, batch_size, X): X = X.reshape(1, 1, len(X)) yhat = model.predict(X, batch_size=batch_size) return yhat[0,0] c2d=[] for i in c: c2d.append([i,i]) scaler = StandardScaler() # 标准化转换 scaler.fit(c2d) # 训练标准化对象 supervised= scaler.transform(c2d) # 转换数据集 c1d=[] for j in supervised: c1d.append(j[0]) supervised = timeseries_to_supervised(c1d, 1) train_scaled, test_scaled =supervised[0:int(len(supervised)*0.7)], supervised[int(len(supervised)*0.7):int(len(supervised))] train_scaled=np.array(train_scaled) test_scaled=np.array(test_scaled) print("开始") # fit the model lstm_model = fit_lstm(train_scaled, 1, 30, 4) # forecast the entire training dataset to build up state for forecasting train_reshaped = train_scaled[:, 0].reshape(len(train_scaled), 1, 1) lstm_model.predict(train_reshaped, batch_size=1) # walk-forward validation on the test data predictions = list() for i in range(len(test_scaled)): # make one-step forecast X, y = test_scaled[i, 0:-1], test_scaled[i, -1] yhat = forecast_lstm(lstm_model, 1, X) # store forecast predictions.append(yhat) print("结束")
经过数据格式改变后,残差预测效果如下。
predictions2d=[] for i in predictions: predictions2d.append([i,i]) predictions2d predictions2d=scaler.inverse_transform(predictions2d) predictions1d=[] for j in predictions2d: predictions1d.append(j[0])
# report performance rmse = sqrt(mean_squared_error(test, predictions1d)) print('RMSE: %.3f' % rmse) # line plot of observed vs predicted fig = pyplot.gcf() fig.set_size_inches(18.5, 10.5) pyplot.plot(test) pyplot.plot(predictions1d) pyplot.show()
ELM:
print("mse:",metrics.mean_squared_error(target_test_last,result_last)) print("R2:",metrics.r2_score(target_test_last,result_last))
ELM-LSTM:
test1=np.array(test) predictions1d1=np.array(predictions1d) result1=result_last-test1+predictions1d1 print("mse:",metrics.mean_squared_error(target_test_last,result1)) print("R2:",metrics.r2_score(target_test_last,result1))
COMPARE:
x=range(1089,1556,1) plt.plot(x,target_test_last,marker=',') plt.plot(x,result_last,marker='+') plt.plot(x,result1,marker='x') plt.legend(['True','ELM','ELM_LSTM']) fig = plt.gcf() fig.set_size_inches(18.5, 10.5) plt.title("COMPARE") # 标题 plt.show()
结果如下:
由上可知ELM-LSTM比ELM的R方要高、MSE要低。且下图显示ELM-LSTM比ELM更贴近真实值。
提供思路