分布式多进程加速DQN算法
分布式多进程CPU无限加速Deep Q-Learning Network
分布式多进程CPU无限加速Deep Q-Learning Network
意义:python语言被大家吐槽慢已经由来已久,python由于GIL(全局解释器锁,GIL)的存在,使得我们编写的python程序只能同时由一个CPU处理。而现在都2022年底了,大家的电脑随随便便哪一个不是8核CPU以上的电脑?所以,如果我们不启用多进程功能的话,真的是太浪费我们的生命了,试问,人生能有几个3秒?此外,如果仅使用python默认的单进程去训练智能体的话,如果环境复杂了,训练1000回合需要12小时,那么就很容易把我们急死(急死——等着等着,就慢慢的急死了)。但是,如果我们用了multiprocessing库,就可以让我们的8核电脑同时开启6个核,那么原本12小时的训练,现在就只需x小时(0<x<12)。
如何将多进程与DQN结合?
目前多进程与深度强化学习算法结合的思路大致有以下两种:
第一种,最容易想到的一种方案:
1.多个子进程训练网络,每个子进程独立与环境交互,采集数据,拥有单独的记忆库,并计算相应的网络权重参数
2.在主进程将子进程网络的权重取平均更新到net
3.再将net传入子进程,回到1
第二种,最主流的一种方案:
1.多个子进程不训练网络,只是拿到主进程的网络后去探索环境,并将所得数据通过pipe技术(pipe技术,一种进程间的通信技术)传回主进程
2.主进程将所有子进程交互得到的数据扔到记忆库中供网络训练
3.将更新后的net再传到子进程,回到1
现在,我们先开始实验第二种方案。为什么不从第一个方案开始讲解?因为第一种方案我并不是很感兴趣,所以就先把第一种方案放在以后吧,以后闲了再搞。
ok,首先,方案二的思路是——多个子进程与环境独立交互,毫无疑问,我们需要先初始化N个环境与N个子进程,为的就是同时运行这些环境。
for i in range(PROCESS_NUM):
p = mp.Process(target=process_env, args=("MountainCar-v0", '进程{}'.format(i),))
p.start()
其中的process_env是:
def process_env(env_name,name):
print(f'子进程:{name}({os.getpid()})开始...')
env=gym.make(env_name).unwrapped
s=env.reset()
a=Agent()
我们到底有没有成功开启多进程呢?测试结果如下:
其次,我们还需要定义一个Agent类,N个不同的环境应该有N个不同的Agent与其交互,即choose_action函数独属于每个agent,但记忆库与learn函数是所有agent共用的。
至此,代码如下,拿走不谢,复制即用,不行砍我!
# -*- coding: utf-8 -*-
#开发者:Bright Fang
#开发时间:2022/10/29 15:24
import torch
import torch.nn as nn
import torch.nn.functional as F
import multiprocessing as mp
from multiprocessing import Pipe
from copy import deepcopy
import numpy as np
import gym
from matplotlib import pyplot as plt
import os
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
Greedy=0.9
MemoryCapacity=2000
LearnSwitch=200
Batch=64
Gamma=0.9
LearningRate=0.01
RENDER=False
Switch=0
PROCESS_NUM=4
env = gym.make("CartPole-v1").unwrapped
'''CartPole的环境状态特征量为推车的位置x、速度x_dot、杆子的角度theta、角速度theta_dot,状态是这四个状态特征所组成的,情况将是无限个,是连续的(即无限个状态),动作是推车向左为0,向右为1,(离散的,有限个,2个)'''
state_number=env.observation_space.shape[0]
action_number=env.action_space.n
def process_env(env_name,pipe):
env=gym.make(env_name).unwrapped
s=env.reset()
reward=0
while True:
net=pipe.recv()
a=Agent(net.cpu())
action=a.choose_action(s,Greedy)
s_, r, done, info = env.step(action)
# env.render()
x, x_dot, theta, theta_dot = s_
r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
r = 3 * r1 + r2
# pos,vel=s_
# if pos>=0.5:
# r=100
reward=reward+r
data=np.hstack((s,action,r,s_))
pipe.send(data)
s=s_
if done:
s = env.reset()
print('r',reward)
if reward>-150:
save_data={'net':a.real_net.state_dict()}
torch.save(save_data,"E:\process_model_mountaincar.pth")
reward=0
#现在子进程永不停止
# if done:
# break
'''搭建神经网络'''
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.in_to_y1=nn.Linear(state_number,20)
self.in_to_y1.weight.data.normal_(0,0.1)
self.in_to_y2=nn.Linear(20,10)
self.in_to_y2.weight.data.normal_(0,0.1)
self.out=nn.Linear(10,action_number)
self.out.weight.data.normal_(0,0.1)
def forward(self,inputstate):
inputstate=self.in_to_y1(inputstate)
inputstate=F.relu(inputstate)
inputstate=self.in_to_y2(inputstate)
inputstate=torch.sigmoid(inputstate)
action_Q=self.out(inputstate)
return action_Q
'''第二步 定义选择动作函数,它接受1*2的状态,输出动作'''
class DQN():
def __init__(self):
self.real_net,self.target_net=Net().cuda(),Net().cuda()
self.memory_counter=0
self.mem=np.zeros((MemoryCapacity,state_number*2+2))
self.learn_step=0
self.random_step=0
self.act_his=0
self.lossfunc=nn.MSELoss()
self.optimizer=torch.optim.Adam(self.real_net.parameters(),lr=LearningRate)
'''第三步 定义记忆库,从记忆库里选取动作'''
def store_transition(self,tran):
# tran=np.hstack((s,a,r,s_))
index=self.memory_counter%MemoryCapacity
self.mem[index,:]=tran
self.memory_counter+=1
'''第四步 写Qlearning算法'''
def learn(self):
if self.learn_step%LearnSwitch==0:
self.target_net.load_state_dict(self.real_net.state_dict())
self.learn_step+=1
sample_index=np.random.choice(MemoryCapacity,Batch)
new_mem=self.mem[sample_index,:]
b_s=torch.FloatTensor(new_mem[:,0:state_number]).cuda()
b_a=torch.LongTensor(new_mem[:,state_number:state_number+1]).cuda()
b_r=torch.FloatTensor(new_mem[:,state_number+1:state_number+2]).cuda()
b_s_=torch.FloatTensor(new_mem[:,-state_number:]).cuda()
real_Q=self.real_net(b_s).gather(1,b_a)
next_Q=self.target_net(b_s_).detach()
target_Q=b_r+Gamma*next_Q.max(1)[0].view(Batch,1)
loss=self.lossfunc(real_Q,target_Q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
class Agent():
def __init__(self,net):
self.real_net=net
self.optimizer = torch.optim.Adam(self.real_net.parameters(), lr=LearningRate)
def choose_action(self,inputstate,G=Greedy):
inputstate=torch.unsqueeze(torch.FloatTensor(inputstate), 0)
if np.random.uniform()<G:
action_Q=self.real_net.forward(inputstate)
action=torch.max(action_Q,1)[1].item()
else:
action = np.random.randint(0, action_number)
return action
'''训练'''
if __name__ == '__main__':
if Switch==0:
print("训练中...")
net=Net()#在主进程里定义一个net,让所有的子进程的神经网络的权重初始值相同
f = DQN()
#让主进程里的real_net网络和子进程的real_net网络参数在初始时 相同
f.real_net.load_state_dict(net.state_dict())
pipe_dict = dict((i, (child_pipe, main_pipe)) for i in range(PROCESS_NUM) for child_pipe, main_pipe in (Pipe(),))
[pipe_dict[j][1].send(net) for j in range(PROCESS_NUM)]
for i in range(PROCESS_NUM):
p=mp.Process(target=process_env,args=("CartPole-v1",pipe_dict[i][0],))
p.start()
while True:
# if data[3]>50:
# print('**************************',data[3])
for j in range(PROCESS_NUM):
data=pipe_dict[j][1].recv()
f.store_transition(data)
if f.memory_counter>MemoryCapacity and f.memory_counter%5==0:
f.learn()
net.load_state_dict(f.real_net.state_dict())
[pipe_dict[j][1].send(net) for j in range(PROCESS_NUM)]#主进程的网络发到子进程
else:
'''使用训练好的网络参数离线测试'''
print("测试DQN中...")
c=DQN()
checkpoint = torch.load("E:\process_model_mountaincar.pth")
c.real_net.load_state_dict(checkpoint['net'])
for j in range(10):
state = env.reset()
total_rewards = 0
while True:
env.render()
state = torch.unsqueeze(torch.FloatTensor(state), 0).cuda()
action_Q = c.real_net.forward(state)
action = torch.max(action_Q, 1)[1].item()
new_state, reward, done, info = env.step(action) # 执行动作
total_rewards += reward
if done:
print("Score", total_rewards)
break
state = new_state
env.close()
代码用法:
先把Switch标志为赋为0,先训练,训练个29秒就直接停止训练(不要等了),因为神经网络的参数已经被我们保存在E盘里了。然后,把Switch标志为赋为1,就可以看到训练的效果了。
remark:
1.神经网络的参数被保存在了电脑E盘里,别告诉我你的电脑没有E盘。没有自己改代码。
2.我感觉版本信息不重要,但还是给一下以供参考。我用的gym版本:0.20.0;我用的pytorch版本:1.10.0+cu113。
多进程CPU加速的效果测试:
1.首先让我们来测试代码的收敛性,收敛性对强化学习有多重要,懂得都懂~~所以,要是不收敛,赶紧让博主滚蛋,别搁这浪费大家的时间!!
我觉得让我体现它的收敛性,一切说辞都是无力,不如让你们自己看来的实在。
多进程的CartPole环境收敛性测试
2.上面的视频的确能说明一个问题:那就是————多进程的代码编写没有问题,各个进程间的数据通信与神经网络的传参方式一切正常。但是,它的加速效果呢?是加速了还是减速了?如果加速了,又加快了多少呢?毕竟,如果加速效果没有令我们眼前一亮的话,那还是算了吧。如果没有奇效,又有谁愿意顶着秃头的风险入坑多进程呢?
现在就让我们开始多进程算法的优越性测试:
2022年11月2日更新,测…ce…测8出来,无限期测试中…
对于代码作者在编写时是怎么想的:
1.子进程用CPU跑,并且只与环境做交互,不训练,不牵扯到learn函数,只是得到数据(s,a,r,s_),把数据放在记忆库的这个操作也不在子进程中执行。当时编写代码的时候的确面临着两个选择:一是在子进程中执行把数据塞入记忆库的操作,但是这就意味着各个子进程间的数据共享,这个子进程塞了数据,其他的子进程也必须更新记忆库,一个不留意很容易引发数据安全问题,考虑到我的智商为9,于是这种方案我果断放弃;二是子进程不把数据塞入记忆库,只是把数据传出到主进程中,而把数据塞入记忆库的这个操作,在主进程中执行,现在就是第二种。
2.采集数据(即choose_action)这个功能只有子进程才有,子进程也只有采集数据的功能。存放记忆库和learn的功能只有主进程里才有,并且主进程没有采集数据的能力。
3.未改多进程之前的代码是这样执行的:主进程先采集数据————主进程停止采集采集数据开始learn————主进程停止learn开始采集数据————…(无限套娃下去)
现在经过我们多进程改写之后的代码,它的加速原理是这样的:主进程永远都在learn(训练神经网络的权重),而主进程在learn的同时,有4个子进程在为它采集数据。learn与choose_action同时在进行,即我们现在可以一边learn一边采集数据,可以同时干两件事情。
给大家排一些雷:
用这个命令得到的并不是CPU的核数:
我的电脑是Dell 游匣15G(i5-112600H+RTX3050),上面的数字12就能说明我的电脑是12核电脑吗?不是!!!经过实测,别说同时跑12个核了,我就算是开启6个核,电脑都会跑崩溃,如下图所示:
开了6个进程,报错:页面文件太小,无法完成操作。
经过百度,想解决这个报错,可以这么设置电脑。但其实报错的原因并不出在这里,而是因为我的电脑是6核电脑。6核怎么知道的?打开任务管理器:
难怪我最多只能开5个子进程o( ̄▽ ̄)o ,这里为了电脑安全考虑,我就只开4个进程吧。
一个来自别人的小细节:
子进程探索环境时可以把模型全部放在CPU上跑,这样做可以防一手显卡内存溢出。主进程的模型是我们需要更新的,因此放在GPU上跑。
这样的话,CPU与GPU的利用率都可以处于90%以上了。如果我们只用CPU的话,那么CPU的利用率经常会达到99%,而GPU的利用率为1%;只用GPU训练模型的话,那么GPU疯狂运行,而CPU直接偷懒。唉,我为了一碗水端平,CPU、GPU一个也别想好儿,都给我狠狠地运转!!!同一时刻内榨干电脑的所有计算资源,我简直是个畜生啊o(▽)o ,我知道我的电脑一定想谢谢我。
电脑说:我真的会谢!
我:不用谢!
参考文献:
更多推荐
所有评论(0)