Many beginners run into difficulties when getting started with reinforcement learning, so here I will briefly explain how to get started and then walk through a hands-on example using open-source code.
First of all, I recommend Mofan Python (莫凡Python); if you are an AI beginner and have never heard of Mofan, you are truly missing out.
His official website is here: https://mofanpy.com/
Mofan Python's content is known for its simplicity: each video is only a few minutes long, with the longer ones around 20 minutes, so it can get you into reinforcement learning quickly. But watching these alone is not enough; you also need a solid theoretical foundation, and for that we turn to Bilibili.
Bilibili hosts the Reinforcement Learning course taught by Professor David Silver of UCL.
Link: https://www.bilibili.com/video/BV1kb411i7KG?from=search&seid=15348457829154001765&spm_id_from=333.337.0.0
Professor David Silver derived the deterministic policy gradient (DPG) formula and is a leading figure in the field, so his course is a must.
The course focuses on derivations, giving beginners a grounding in the fundamental equations of reinforcement learning, such as the Bellman equation.
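For reference (this is the standard textbook form, not a quote from the course), the Bellman expectation equation for the state-value function of a policy π reads:

$$V^{\pi}(s)=\sum_{a}\pi(a\mid s)\sum_{s'}P(s'\mid s,a)\big[r(s,a,s')+\gamma V^{\pi}(s')\big]$$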
Hung-yi Lee's (李宏毅) reinforcement learning course on Bilibili is also well worth watching, but I personally find it covers similar ground to Mofan Python's material; it offers no code-level instruction and leans more toward theoretical derivation and popular-science introduction.
Finally, I want to recommend a book that is also well suited to beginners. Sutton's book is very thick, and in my opinion it is not the best starting point.
《强化学习》 (Reinforcement Learning) by Zou Wei (邹伟), published by Tsinghua University Press, incorporates a lot of Mofan Python's code, combines theory with practice well, and annotates most of the code, so it gets beginners up to speed quickly.
When it comes to reinforcement learning, the two things you absolutely must know are Python and Gym.
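As a minimal sketch of the classic Gym interaction loop (the old API where `step()` returns a 4-tuple), which is what the DDPG code below assumes — the environment name and step limit here are just placeholders:

```python
import gym

# Classic Gym API (pre-0.26): step() returns (obs, reward, done, info)
env = gym.make('Pendulum-v0')
state = env.reset()
for t in range(200):
    action = env.action_space.sample()               # random action as a stand-in for the policy
    next_state, reward, done, info = env.step(action)
    state = next_state
    if done:
        break
env.close()
```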
The code can be found by searching for DDPG on GitHub; here I use floodsung's project (written for Python 2 and TensorFlow 1.x).
The key files:
- actor_network — the actor (policy) network
- critic_network — the critic (Q) network
- ddpg — ties the individual modules together
- gym_ddpg — the main file; runs the DDPG agent against the environment
- ou_noise — Ornstein-Uhlenbeck (OU) exploration noise
- replay_buffer — experience storage
The code is as follows (example):
```python
# ou_noise.py
import numpy as np
import numpy.random as nr

class OUNoise:
    """docstring for OUNoise"""
    def __init__(self, action_dimension, mu=0, theta=0.15, sigma=0.2):
        self.action_dimension = action_dimension
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.ones(self.action_dimension) * self.mu
        self.reset()

    def reset(self):
        self.state = np.ones(self.action_dimension) * self.mu

    def noise(self):
        # Build OU noise according to the formula; it is temporally correlated,
        # which suits sequential samples better than independent Gaussian noise
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * nr.randn(len(x))
        self.state = x + dx
        return self.state

# The code below is not needed for training; it just visualizes what OU noise looks like
if __name__ == '__main__':
    ou = OUNoise(3)
    states = []
    for i in range(1000):
        states.append(ou.noise())
    import matplotlib.pyplot as plt
    plt.plot(states)
    plt.show()
```
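The `noise()` method above is the (unit time-step) Euler discretization of the Ornstein-Uhlenbeck process; in the notation of the code:

$$x_{t+1} = x_t + \theta(\mu - x_t) + \sigma\,\epsilon_t,\qquad \epsilon_t \sim \mathcal{N}(0, I)$$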
```python
# replay_buffer.py
from collections import deque
import random

class ReplayBuffer(object):
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.num_experiences = 0
        self.buffer = deque()

    def get_batch(self, batch_size):
        # Randomly sample batch_size examples for training
        return random.sample(self.buffer, batch_size)

    def size(self):
        return self.buffer_size

    # Store the agent-environment transition in the buffer;
    # once the maximum capacity is reached, drop the oldest experience first
    def add(self, state, action, reward, new_state, done):
        experience = (state, action, reward, new_state, done)
        if self.num_experiences < self.buffer_size:
            self.buffer.append(experience)
            self.num_experiences += 1
        else:
            self.buffer.popleft()
            self.buffer.append(experience)

    # Counter; rarely needed
    def count(self):
        # if buffer is full, return buffer size
        # otherwise, return experience counter
        return self.num_experiences

    # Clear the buffer; rarely needed
    def erase(self):
        self.buffer = deque()
        self.num_experiences = 0
```
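A quick, hypothetical usage example (the transition values are dummies, just to show the call pattern):

```python
import numpy as np
from replay_buffer import ReplayBuffer

buffer = ReplayBuffer(buffer_size=1000)
for _ in range(100):
    # dummy 3-dimensional state and 1-dimensional action, only to illustrate add()
    s, a, r, s2, done = np.zeros(3), np.zeros(1), 0.0, np.zeros(3), False
    buffer.add(s, a, r, s2, done)

minibatch = buffer.get_batch(64)   # list of 64 (state, action, reward, next_state, done) tuples
```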
```python
# actor_network.py
import tensorflow as tf
import numpy as np
import math

# Hyper Parameters
LAYER1_SIZE = 400
LAYER2_SIZE = 300
LEARNING_RATE = 1e-4
TAU = 0.001
BATCH_SIZE = 64

class ActorNetwork:
    """docstring for ActorNetwork"""
    def __init__(self, sess, state_dim, action_dim):
        self.sess = sess
        self.state_dim = state_dim
        self.action_dim = action_dim
        # create actor network
        self.state_input, self.action_output, self.net = self.create_network(state_dim, action_dim)
        # create target actor network
        self.target_state_input, self.target_action_output, self.target_update, self.target_net = self.create_target_network(state_dim, action_dim, self.net)
        # define training rules
        self.create_training_method()
        self.sess.run(tf.initialize_all_variables())
        self.update_target()
        #self.load_network()

    # Compute the gradients; tf.gradients only works when the tensors are actually
    # connected in the graph, it cannot be applied to two unrelated values
    def create_training_method(self):
        self.q_gradient_input = tf.placeholder("float", [None, self.action_dim])
        self.parameters_gradients = tf.gradients(self.action_output, self.net, -self.q_gradient_input)
        self.optimizer = tf.train.AdamOptimizer(LEARNING_RATE).apply_gradients(zip(self.parameters_gradients, self.net))

    # Build the actor network: fully connected layers of shapes
    # (state_dim, 400), (400, 300), (300, action_dim)
    def create_network(self, state_dim, action_dim):
        layer1_size = LAYER1_SIZE
        layer2_size = LAYER2_SIZE

        state_input = tf.placeholder("float", [None, state_dim])

        W1 = self.variable([state_dim, layer1_size], state_dim)
        b1 = self.variable([layer1_size], state_dim)
        W2 = self.variable([layer1_size, layer2_size], layer1_size)
        b2 = self.variable([layer2_size], layer1_size)
        W3 = tf.Variable(tf.random_uniform([layer2_size, action_dim], -3e-3, 3e-3))
        b3 = tf.Variable(tf.random_uniform([action_dim], -3e-3, 3e-3))

        layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
        layer2 = tf.nn.relu(tf.matmul(layer1, W2) + b2)
        action_output = tf.tanh(tf.matmul(layer2, W3) + b3)

        return state_input, action_output, [W1, b1, W2, b2, W3, b3]

    # Build the target network (same structure), tracked by an exponential moving average
    def create_target_network(self, state_dim, action_dim, net):
        state_input = tf.placeholder("float", [None, state_dim])
        ema = tf.train.ExponentialMovingAverage(decay=1 - TAU)
        target_update = ema.apply(net)
        target_net = [ema.average(x) for x in net]

        layer1 = tf.nn.relu(tf.matmul(state_input, target_net[0]) + target_net[1])
        layer2 = tf.nn.relu(tf.matmul(layer1, target_net[2]) + target_net[3])
        action_output = tf.tanh(tf.matmul(layer2, target_net[4]) + target_net[5])

        return state_input, action_output, target_update, target_net

    # Soft-update the target network
    def update_target(self):
        self.sess.run(self.target_update)

    # Train the actor; this function is the important one
    def train(self, q_gradient_batch, state_batch):
        self.sess.run(self.optimizer, feed_dict={
            self.q_gradient_input: q_gradient_batch,
            self.state_input: state_batch
        })

    # Produce actions for a batch of states
    def actions(self, state_batch):
        return self.sess.run(self.action_output, feed_dict={
            self.state_input: state_batch
        })

    # Produce an action for a single state, e.g. when interacting with the environment
    def action(self, state):
        return self.sess.run(self.action_output, feed_dict={
            self.state_input: [state]
        })[0]

    # Target-network actions, used for the TD target
    def target_actions(self, state_batch):
        return self.sess.run(self.target_action_output, feed_dict={
            self.target_state_input: state_batch
        })

    # f: fan-in size
    def variable(self, shape, f):
        return tf.Variable(tf.random_uniform(shape, -1 / math.sqrt(f), 1 / math.sqrt(f)))

    '''
    # Save the network once the agent is reasonably trained, and load it next time
    def load_network(self):
        self.saver = tf.train.Saver()
        checkpoint = tf.train.get_checkpoint_state("saved_actor_networks")
        if checkpoint and checkpoint.model_checkpoint_path:
            self.saver.restore(self.sess, checkpoint.model_checkpoint_path)
            print "Successfully loaded:", checkpoint.model_checkpoint_path
        else:
            print "Could not find old network weights"

    def save_network(self, time_step):
        print 'save actor-network...', time_step
        self.saver.save(self.sess, 'saved_actor_networks/' + 'actor-network', global_step=time_step)
    '''
```
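Incidentally, the `ExponentialMovingAverage` with `decay = 1 - TAU` implements exactly the soft target update from the DDPG paper: for every parameter θ and its target copy θ′,

$$\theta' \leftarrow (1-\tau)\,\theta' + \tau\,\theta,\qquad \tau = 0.001$$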
Since the critic network is built in the same style as the actor network (two hidden layers plus a target network maintained by an exponential moving average), I will not repeat it here; the main difference is that the critic takes both the state and the action as input and outputs a single Q value.
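For reference, here is a minimal sketch of what such a critic looks like in the same TF1 style. This is my own condensed illustration, not a verbatim copy of the project's critic_network.py (the real file also contains the target network, training method, regularization, and so on, mirroring the actor class above):

```python
# Hypothetical condensed critic: Q(s, a) with the action fed into the second layer,
# using the same layer sizes (400, 300) as the actor above.
import tensorflow as tf
import math

def create_critic_network(state_dim, action_dim, layer1_size=400, layer2_size=300):
    state_input = tf.placeholder("float", [None, state_dim])
    action_input = tf.placeholder("float", [None, action_dim])

    # fan-in initialization, same scheme as the actor's variable() helper
    f1 = 1 / math.sqrt(state_dim)
    f2 = 1 / math.sqrt(layer1_size + action_dim)
    W1 = tf.Variable(tf.random_uniform([state_dim, layer1_size], -f1, f1))
    b1 = tf.Variable(tf.random_uniform([layer1_size], -f1, f1))
    W2 = tf.Variable(tf.random_uniform([layer1_size, layer2_size], -f2, f2))
    W2_action = tf.Variable(tf.random_uniform([action_dim, layer2_size], -f2, f2))
    b2 = tf.Variable(tf.random_uniform([layer2_size], -f2, f2))
    W3 = tf.Variable(tf.random_uniform([layer2_size, 1], -3e-3, 3e-3))
    b3 = tf.Variable(tf.random_uniform([1], -3e-3, 3e-3))

    layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
    layer2 = tf.nn.relu(tf.matmul(layer1, W2) + tf.matmul(action_input, W2_action) + b2)
    q_value_output = tf.matmul(layer2, W3) + b3   # one Q(s, a) value per sample

    return state_input, action_input, q_value_output
```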
```python
# ddpg.py
import gym
import tensorflow as tf
import numpy as np
from ou_noise import OUNoise
from critic_network import CriticNetwork
from actor_network_bn import ActorNetwork   # note which actor module is imported here
from replay_buffer import ReplayBuffer

# Hyper Parameters:
REPLAY_BUFFER_SIZE = 1000000
REPLAY_START_SIZE = 10000
BATCH_SIZE = 64
GAMMA = 0.99

class DDPG:
    """docstring for DDPG"""
    def __init__(self, env):
        self.name = 'DDPG'  # name for uploading results
        self.environment = env
        # Randomly initialize actor network and critic network
        # with both their target networks
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.shape[0]

        self.sess = tf.InteractiveSession()

        self.actor_network = ActorNetwork(self.sess, self.state_dim, self.action_dim)
        self.critic_network = CriticNetwork(self.sess, self.state_dim, self.action_dim)

        # initialize replay buffer
        self.replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

        # Initialize a random process (Ornstein-Uhlenbeck) for action exploration
        self.exploration_noise = OUNoise(self.action_dim)

    def train(self):
        #print "train step",self.time_step
        # Sample a random minibatch of N transitions from the replay buffer for training
        minibatch = self.replay_buffer.get_batch(BATCH_SIZE)
        state_batch = np.asarray([data[0] for data in minibatch])
        action_batch = np.asarray([data[1] for data in minibatch])
        reward_batch = np.asarray([data[2] for data in minibatch])
        next_state_batch = np.asarray([data[3] for data in minibatch])
        done_batch = np.asarray([data[4] for data in minibatch])

        # for action_dim = 1
        action_batch = np.resize(action_batch, [BATCH_SIZE, self.action_dim])

        # Calculate y_batch; this is just applying the TD-target formula
        next_action_batch = self.actor_network.target_actions(next_state_batch)
        q_value_batch = self.critic_network.target_q(next_state_batch, next_action_batch)
        y_batch = []
        for i in range(len(minibatch)):
            if done_batch[i]:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + GAMMA * q_value_batch[i])
        y_batch = np.resize(y_batch, [BATCH_SIZE, 1])

        # Update critic by minimizing the loss L (train the critic network)
        self.critic_network.train(y_batch, state_batch, action_batch)

        # Update the actor policy using the sampled gradient (train the actor network)
        action_batch_for_gradients = self.actor_network.actions(state_batch)
        q_gradient_batch = self.critic_network.gradients(state_batch, action_batch_for_gradients)
        self.actor_network.train(q_gradient_batch, state_batch)

        # Update the target networks via the moving average (soft update)
        self.actor_network.update_target()
        self.critic_network.update_target()

    # Action with exploration noise added
    def noise_action(self, state):
        # Select action a_t according to the current policy and exploration noise
        action = self.actor_network.action(state)
        return action + self.exploration_noise.noise()

    def action(self, state):
        action = self.actor_network.action(state)
        return action

    # Store the experience, and decide from the buffer size whether to start training
    def perceive(self, state, action, reward, next_state, done):
        # Store transition (s_t, a_t, r_t, s_{t+1}) in the replay buffer
        self.replay_buffer.add(state, action, reward, next_state, done)

        # Store transitions up to the replay start size, then start training
        if self.replay_buffer.count() > REPLAY_START_SIZE:
            self.train()

        #if self.time_step % 10000 == 0:
            #self.actor_network.save_network(self.time_step)
            #self.critic_network.save_network(self.time_step)

        # Re-initialize the random process when an episode ends:
        # the exploration noise is reset every episode
        if done:
            self.exploration_noise.reset()
```
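In formulas, the `y_batch` computed above is the standard DDPG TD target, and the actor update applies the sampled deterministic policy gradient:

$$y_i = r_i + \gamma\, Q'\big(s_{i+1}, \mu'(s_{i+1})\big),\qquad
\nabla_{\theta^{\mu}} J \approx \frac{1}{N}\sum_{i}\nabla_{a} Q(s_i,a)\big|_{a=\mu(s_i)}\,\nabla_{\theta^{\mu}}\mu(s_i)$$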
```python
# gym_ddpg.py
import filter_env   # this import can simply be removed
from ddpg import *
import gc
gc.enable()

ENV_NAME = 'InvertedPendulum-v1'   # you can swap in another environment, e.g. Pendulum-v0
EPISODES = 100000
TEST = 10

def main():
    #env = filter_env.makeFilteredEnv(gym.make(ENV_NAME))
    env = gym.make(ENV_NAME)
    env = env.unwrapped
    env.seed(1)
    agent = DDPG(env)
    #agent = DDPG(env)
    #env.monitor.start('experiments/' + ENV_NAME,force=True)
    # The three commented-out lines above are from the original project; I replaced them with my own setup.

    # Episode loop
    for episode in xrange(EPISODES):
        state = env.reset()
        #print "episode:",episode
        # Train
        for step in xrange(env.spec.timestep_limit):
            action = agent.noise_action(state)               # action with exploration noise
            next_state, reward, done, _ = env.step(action)   # agent interacts with the environment
            agent.perceive(state, action, reward, next_state, done)  # store the experience and train
            state = next_state                               # update the state
            if done:
                break
        # Testing: same structure as the training loop, but without exploration noise
        if episode % 100 == 0 and episode > 100:
            total_reward = 0
            for i in xrange(TEST):
                state = env.reset()
                for j in xrange(env.spec.timestep_limit):
                    #env.render()
                    action = agent.action(state)  # direct action for test
                    state, reward, done, _ = env.step(action)
                    total_reward += reward
                    if done:
                        break
            ave_reward = total_reward / TEST
            print 'episode: ', episode, 'Evaluation Average Reward:', ave_reward
    env.close()

if __name__ == '__main__':
    main()
```
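Note that this file is written for Python 2 (`xrange`, print statements). If you want to run it under Python 3, a minimal sketch of the same training loop looks like the following — assuming the DDPG class above and an old-style Gym environment, with `MAX_STEPS` standing in for `env.spec.timestep_limit`:

```python
# Python 3 adaptation of the gym_ddpg training loop (hypothetical, same logic as above)
import gym
from ddpg import DDPG

ENV_NAME = 'Pendulum-v0'   # any environment with a continuous action space
EPISODES = 100000
MAX_STEPS = 200            # stand-in for env.spec.timestep_limit

def main():
    env = gym.make(ENV_NAME).unwrapped
    env.seed(1)
    agent = DDPG(env)
    for episode in range(EPISODES):
        state = env.reset()
        for step in range(MAX_STEPS):
            action = agent.noise_action(state)                # noisy action for exploration
            next_state, reward, done, _ = env.step(action)    # interact with the environment
            agent.perceive(state, action, reward, next_state, done)  # store experience and train
            state = next_state
            if done:
                break
    env.close()

if __name__ == '__main__':
    main()
```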
That is all for this post; if you found it useful, please give it a like!
For a complete beginner with no background, I think about a month is a reasonable amount of time to learn the basics of reinforcement learning. If you work on autonomous driving, I strongly recommend the Carla simulator. Personally, I find the hardest part of reinforcement learning to be designing the reward function; there is even an entire family of methods devoted to this problem, inverse reinforcement learning, which is a deep subject in its own right.