分布式多进程CPU无限加速Deep Q-Learning Network

意义:python语言被大家吐槽慢已经由来已久,python由于GIL(全局解释器锁,GIL)的存在,使得我们编写的python程序只能同时由一个CPU处理。而现在都2022年底了,大家的电脑随随便便哪一个不是8核CPU以上的电脑?所以,如果我们不启用多进程功能的话,真的是太浪费我们的生命了,试问,人生能有几个3秒?此外,如果仅使用python默认的单进程去训练智能体的话,如果环境复杂了,训练1000回合需要12小时,那么就很容易把我们急死(急死——等着等着,就慢慢的急死了)。但是,如果我们用了multiprocessing库,就可以让我们的8核电脑同时开启6个核,那么原本12小时的训练,现在就只需x小时(0<x<12)。

如何将多进程与DQN结合?

目前多进程与深度强化学习算法结合的思路大致有以下两种:

第一种,最容易想到的一种方案:

1.多个子进程训练网络,每个子进程独立与环境交互,采集数据,拥有单独的记忆库,并计算相应的网络权重参数
2.在主进程将子进程网络的权重取平均更新到net
3.再将net传入子进程,回到1

第二种,最主流的一种方案:

1.多个子进程不训练网络,只是拿到主进程的网络后去探索环境,并将所得数据通过pipe技术(pipe技术,一种进程间的通信技术)传回主进程
2.主进程将所有子进程交互得到的数据扔到记忆库中供网络训练
3.将更新后的net再传到子进程,回到1

现在,我们先开始实验第二种方案。为什么不从第一个方案开始讲解?因为第一种方案我并不是很感兴趣,所以就先把第一种方案放在以后吧,以后闲了再搞。

ok,首先,方案二的思路是——多个子进程与环境独立交互,毫无疑问,我们需要先初始化N个环境与N个子进程,为的就是同时运行这些环境。

for i in range(PROCESS_NUM):
    p = mp.Process(target=process_env, args=("MountainCar-v0", '进程{}'.format(i),))
    p.start()

其中的process_env是:

def process_env(env_name,name):
    print(f'子进程:{name}{os.getpid()})开始...')
    env=gym.make(env_name).unwrapped
    s=env.reset()
    a=Agent()

我们到底有没有成功开启多进程呢?测试结果如下:

在这里插入图片描述
其次,我们还需要定义一个Agent类,N个不同的环境应该有N个不同的Agent与其交互,即choose_action函数独属于每个agent,但记忆库与learn函数是所有agent共用的。

至此,代码如下,拿走不谢,复制即用,不行砍我!

# -*- coding: utf-8 -*-
#开发者:Bright Fang
#开发时间:2022/10/29 15:24
import torch
import torch.nn as nn
import torch.nn.functional as F
import multiprocessing as mp
from multiprocessing import Pipe
from copy import deepcopy
import numpy as np
import gym
from matplotlib import pyplot as plt
import os
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
Greedy=0.9
MemoryCapacity=2000
LearnSwitch=200
Batch=64
Gamma=0.9
LearningRate=0.01
RENDER=False
Switch=0
PROCESS_NUM=4
env = gym.make("CartPole-v1").unwrapped
'''CartPole的环境状态特征量为推车的位置x、速度x_dot、杆子的角度theta、角速度theta_dot,状态是这四个状态特征所组成的,情况将是无限个,是连续的(即无限个状态),动作是推车向左为0,向右为1,(离散的,有限个,2个)'''
state_number=env.observation_space.shape[0]
action_number=env.action_space.n

def process_env(env_name,pipe):
    env=gym.make(env_name).unwrapped
    s=env.reset()
    reward=0
    while True:
        net=pipe.recv()
        a=Agent(net.cpu())
        action=a.choose_action(s,Greedy)
        s_, r, done, info = env.step(action)
        # env.render()
        x, x_dot, theta, theta_dot = s_
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        r = 3 * r1 + r2
        # pos,vel=s_
        # if pos>=0.5:
        #     r=100
        reward=reward+r
        data=np.hstack((s,action,r,s_))
        pipe.send(data)
        s=s_
        if done:
            s = env.reset()
            print('r',reward)
            if reward>-150:
                save_data={'net':a.real_net.state_dict()}
                torch.save(save_data,"E:\process_model_mountaincar.pth")
            reward=0
        #现在子进程永不停止
        # if done:
        #     break

'''搭建神经网络'''
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.in_to_y1=nn.Linear(state_number,20)
        self.in_to_y1.weight.data.normal_(0,0.1)
        self.in_to_y2=nn.Linear(20,10)
        self.in_to_y2.weight.data.normal_(0,0.1)
        self.out=nn.Linear(10,action_number)
        self.out.weight.data.normal_(0,0.1)
    def forward(self,inputstate):
        inputstate=self.in_to_y1(inputstate)
        inputstate=F.relu(inputstate)
        inputstate=self.in_to_y2(inputstate)
        inputstate=torch.sigmoid(inputstate)
        action_Q=self.out(inputstate)
        return action_Q
'''第二步  定义选择动作函数,它接受1*2的状态,输出动作'''
class DQN():
    def __init__(self):
        self.real_net,self.target_net=Net().cuda(),Net().cuda()
        self.memory_counter=0
        self.mem=np.zeros((MemoryCapacity,state_number*2+2))
        self.learn_step=0
        self.random_step=0
        self.act_his=0
        self.lossfunc=nn.MSELoss()
        self.optimizer=torch.optim.Adam(self.real_net.parameters(),lr=LearningRate)

    '''第三步 定义记忆库,从记忆库里选取动作'''
    def store_transition(self,tran):
        # tran=np.hstack((s,a,r,s_))
        index=self.memory_counter%MemoryCapacity
        self.mem[index,:]=tran
        self.memory_counter+=1
    '''第四步 写Qlearning算法'''
    def learn(self):
        if self.learn_step%LearnSwitch==0:
            self.target_net.load_state_dict(self.real_net.state_dict())
        self.learn_step+=1
        sample_index=np.random.choice(MemoryCapacity,Batch)
        new_mem=self.mem[sample_index,:]
        b_s=torch.FloatTensor(new_mem[:,0:state_number]).cuda()
        b_a=torch.LongTensor(new_mem[:,state_number:state_number+1]).cuda()
        b_r=torch.FloatTensor(new_mem[:,state_number+1:state_number+2]).cuda()
        b_s_=torch.FloatTensor(new_mem[:,-state_number:]).cuda()
        real_Q=self.real_net(b_s).gather(1,b_a)
        next_Q=self.target_net(b_s_).detach()
        target_Q=b_r+Gamma*next_Q.max(1)[0].view(Batch,1)
        loss=self.lossfunc(real_Q,target_Q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

class Agent():
    def __init__(self,net):
        self.real_net=net
        self.optimizer = torch.optim.Adam(self.real_net.parameters(), lr=LearningRate)
    def choose_action(self,inputstate,G=Greedy):
        inputstate=torch.unsqueeze(torch.FloatTensor(inputstate), 0)
        if np.random.uniform()<G:
            action_Q=self.real_net.forward(inputstate)
            action=torch.max(action_Q,1)[1].item()
        else:
            action = np.random.randint(0, action_number)
        return action
'''训练'''
if __name__ == '__main__':
    if Switch==0:
        print("训练中...")
        net=Net()#在主进程里定义一个net,让所有的子进程的神经网络的权重初始值相同
        f = DQN()
        #让主进程里的real_net网络和子进程的real_net网络参数在初始时 相同
        f.real_net.load_state_dict(net.state_dict())
        pipe_dict = dict((i, (child_pipe, main_pipe)) for i in range(PROCESS_NUM) for child_pipe, main_pipe in (Pipe(),))
        [pipe_dict[j][1].send(net) for j in range(PROCESS_NUM)]
        for i in range(PROCESS_NUM):
            p=mp.Process(target=process_env,args=("CartPole-v1",pipe_dict[i][0],))
            p.start()
        while True:
            # if data[3]>50:
            #     print('**************************',data[3])
            for j in range(PROCESS_NUM):
                data=pipe_dict[j][1].recv()
                f.store_transition(data)
            if f.memory_counter>MemoryCapacity and f.memory_counter%5==0:
                f.learn()
                net.load_state_dict(f.real_net.state_dict())
            [pipe_dict[j][1].send(net) for j in range(PROCESS_NUM)]#主进程的网络发到子进程
    else:
        '''使用训练好的网络参数离线测试'''
        print("测试DQN中...")
        c=DQN()
        checkpoint = torch.load("E:\process_model_mountaincar.pth")
        c.real_net.load_state_dict(checkpoint['net'])
        for j in range(10):
            state = env.reset()
            total_rewards = 0
            while True:
                env.render()
                state = torch.unsqueeze(torch.FloatTensor(state), 0).cuda()
                action_Q = c.real_net.forward(state)
                action = torch.max(action_Q, 1)[1].item()
                new_state, reward, done, info = env.step(action)  # 执行动作
                total_rewards += reward
                if done:
                    print("Score", total_rewards)
                    break
                state = new_state
        env.close()

代码用法:
先把Switch标志为赋为0,先训练,训练个29秒就直接停止训练(不要等了),因为神经网络的参数已经被我们保存在E盘里了。然后,把Switch标志为赋为1,就可以看到训练的效果了。
remark:
1.神经网络的参数被保存在了电脑E盘里,别告诉我你的电脑没有E盘。没有自己改代码。
2.我感觉版本信息不重要,但还是给一下以供参考。我用的gym版本:0.20.0;我用的pytorch版本:1.10.0+cu113。

多进程CPU加速的效果测试:

1.首先让我们来测试代码的收敛性,收敛性对强化学习有多重要,懂得都懂~~所以,要是不收敛,赶紧让博主滚蛋,别搁这浪费大家的时间!!

我觉得让我体现它的收敛性,一切说辞都是无力,不如让你们自己看来的实在。

多进程的CartPole环境收敛性测试

2.上面的视频的确能说明一个问题:那就是————多进程的代码编写没有问题,各个进程间的数据通信与神经网络的传参方式一切正常。但是,它的加速效果呢?是加速了还是减速了?如果加速了,又加快了多少呢?毕竟,如果加速效果没有令我们眼前一亮的话,那还是算了吧。如果没有奇效,又有谁愿意顶着秃头的风险入坑多进程呢?

现在就让我们开始多进程算法的优越性测试:
2022年11月2日更新,测…ce…测8出来,无限期测试中…

对于代码作者在编写时是怎么想的:

1.子进程用CPU跑,并且只与环境做交互,不训练,不牵扯到learn函数,只是得到数据(s,a,r,s_),把数据放在记忆库的这个操作也不在子进程中执行。当时编写代码的时候的确面临着两个选择:一是在子进程中执行把数据塞入记忆库的操作,但是这就意味着各个子进程间的数据共享,这个子进程塞了数据,其他的子进程也必须更新记忆库,一个不留意很容易引发数据安全问题,考虑到我的智商为9,于是这种方案我果断放弃;二是子进程不把数据塞入记忆库,只是把数据传出到主进程中,而把数据塞入记忆库的这个操作,在主进程中执行,现在就是第二种。

2.采集数据(即choose_action)这个功能只有子进程才有,子进程也只有采集数据的功能。存放记忆库和learn的功能只有主进程里才有,并且主进程没有采集数据的能力。

3.未改多进程之前的代码是这样执行的:主进程先采集数据————主进程停止采集采集数据开始learn————主进程停止learn开始采集数据————…(无限套娃下去)

现在经过我们多进程改写之后的代码,它的加速原理是这样的:主进程永远都在learn(训练神经网络的权重),而主进程在learn的同时,有4个子进程在为它采集数据。learn与choose_action同时在进行,即我们现在可以一边learn一边采集数据,可以同时干两件事情。

给大家排一些雷:

用这个命令得到的并不是CPU的核数:

在这里插入图片描述
我的电脑是Dell 游匣15G(i5-112600H+RTX3050),上面的数字12就能说明我的电脑是12核电脑吗?不是!!!经过实测,别说同时跑12个核了,我就算是开启6个核,电脑都会跑崩溃,如下图所示:
在这里插入图片描述
开了6个进程,报错:页面文件太小,无法完成操作。

经过百度,想解决这个报错,可以这么设置电脑。但其实报错的原因并不出在这里,而是因为我的电脑是6核电脑。6核怎么知道的?打开任务管理器:
在这里插入图片描述
难怪我最多只能开5个子进程o( ̄▽ ̄)o ,这里为了电脑安全考虑,我就只开4个进程吧。

一个来自别人的小细节:

子进程探索环境时可以把模型全部放在CPU上跑,这样做可以防一手显卡内存溢出。主进程的模型是我们需要更新的,因此放在GPU上跑。

这样的话,CPU与GPU的利用率都可以处于90%以上了。如果我们只用CPU的话,那么CPU的利用率经常会达到99%,而GPU的利用率为1%;只用GPU训练模型的话,那么GPU疯狂运行,而CPU直接偷懒。唉,我为了一碗水端平,CPU、GPU一个也别想好儿,都给我狠狠地运转!!!同一时刻内榨干电脑的所有计算资源,我简直是个畜生啊o()o ,我知道我的电脑一定想谢谢我。

电脑说:我真的会谢!

我:不用谢!

参考文献:

B站视频:Python 并发编程实战,用多线程、多进程、多协程加速程序运行

CSDN:DPPO深度强化学习算法实现思路(分布式多进程加速)

Logo

鸿蒙生态一站式服务平台。

更多推荐