动手学强化学习reinforcement_learning-chapter-twenty-多智能体强化学习入门

本文主要是介绍动手学强化学习reinforcement_learning-chapter-twenty-多智能体强化学习入门,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

动手学强化学习reinforcement learning-chapter-twenty-多智能体强化学习入门

动手学强化学习​hrl.boyuai.com/

《动手学强化学习》(张伟楠 沈键 俞勇)【简介_书评_在线阅读】 - 当当图书 (dangdang.com)​product.dangdang.com/29391150.html

ZouJiu1/Hands-on-RL: https://hrl.boyuai.com/ (github.com)​github.com/ZouJiu1/Hands-on-RL/tree/main

简介

在这里插入图片描述

multi-agent reinforcement learning,MARL

单智能体强化学习假设动态环境是 稳态stationary 的,状态转移函数和奖励函数不变,多智能体更加复杂,不仅和环境交互,还直接或者间接的和其他智能体交互。难点:1、环境是non-stationary,2、多个智能体可能多目标,使自己利益最大化,3、评估更难的,需要分布式train

问题modeling

多智能体环境用数组表示: ( N , S , A , R , P ) (N,S,A,R,P) (N,S,A,R,P) ,分别是智能体个数N,所有智能体状态集合 S = ( s 1 , s 2 , . . . ) S=(s_1,s_2,...) S=(s1,s2,...) ,所有智能体的动作集合 A = ( a 1 , a 2 , . . . ) A=(a_1,a_2,...) A=(a1,a2,...) ,R则是所有智能体的奖励函数,P是状态转移概率函数

多智能体强化学习的基本求解范式

完全中心化(full centralized):将多个智能体决策当作一个超级智能体在决策,将状态拼在一起,动作拼在一起,用一个策略网络就可以。

完全去中心化(full decentralized):每个智能体单独train,互不干扰的,每个智能体一个策略网络的。

IPPO algorithm

independent ppo,每个智能体都使用单智能体PPO

IPPO program

首先git clone ma-gym库到本地的

git clone https://github.com/boyu-ai/ma-gym.git

运行的时候报错

Traceback (most recent call last):File "c:\Users\10696\Desktop\access\Hands-on-RL\chapter20.py", line 19, in <module>from ma_gym.envs.combat.combat import CombatFile "C:\Users\10696\Desktop\access\ma-gym\ma_gym\__init__.py", line 10, in <module>env_specs = [env_spec for env_spec in envs.registry.all() if 'gym.envs' in env_spec.entry_point]
AttributeError: 'dict' object has no attribute 'all'

修改的方式是

env_specs = [env_spec for env_spec in envs.registry.all() if 'gym.envs' in env_spec.entry_point]
修改到
env_specs = [env_spec for key, env_spec in envs.registry.items() if 'gym.envs' in env_spec.entry_point]

然后又报错的

Traceback (most recent call last):File "c:\Users\10696\Desktop\access\Hands-on-RL\chapter20.py", line 19, in <module>from ma_gym.envs.combat.combat import CombatFile "C:\Users\10696\Desktop\access\ma-gym\ma_gym\__init__.py", line 15, in <module>kwargs={'name': spec.id, **spec._kwargs}
AttributeError: 'EnvSpec' object has no attribute '_kwargs'. Did you mean: 'kwargs'?

修改的方式是

kwargs={'name': spec.id, **spec._kwargs}
修改到
kwargs={'name': spec.id, **spec.kwargs}

fix two errors by ZouJiu1 · Pull Request #1 · boyu-ai/ma-gym (github.com)

动作的个数是(上、下、左、右、保持不动的),已经存在了共 five 个动作,其他的动作则是 是否攻击敌对目标,program内配置了2个智能体,所以也存在2个敌人,所以共 five + 2 = seven 个动作的呢。

也就是(上、下、左、右、保持不动的、攻击1号敌人,攻击2号敌人)共7个动作,移动和攻击不能同时发生。

状态则不相同的,每个智能体的状态会被初始化到 _agent_i_obs = np.zeros((6, 5, 5)),也就是遍历每个智能体,具体见下面的注释

也就是record 5x5的格子区间内的状态,每个智能体所在的 5x5 格子区间内容。也就是保存了以智能体$中心的5x5格子内的其他智能体或者敌人的内容,包括ID、类型、坐标、cool、健康状况的等。

ma-gym\ma_gym\envs\combat\

combat.py

    def get_agent_obs(self):"""When input to a model, each agent is represented by a set of one-hot binary vectors {i, t, l, h, c}encoding its unique ID, team ID, location, health points and cooldown.A model controlling an agent also sees other agents in its visual range (5 × 5 surrounding area).:return:"""_obs = []for agent_i in range(self.n_agents):  ## 遍历每个智能体pos = self.agent_pos[agent_i]     ## 拿到该智能体的 (x, y) 坐标 # _agent_i_obs = self._one_hot_encoding(agent_i, self.n_agents)# _agent_i_obs += [pos[0] / self._grid_shape[0], pos[1] / (self._grid_shape[1] - 1)]  # coordinates# _agent_i_obs += [self.agent_health[agent_i]]# _agent_i_obs += [1 if self._agent_cool else 0]  # flag if agent is cooling down# team id, unique id, location, health, cooldown_agent_i_obs = np.zeros((6, 5, 5))  ## 初始化状态到全 0, 5x5的格子区间for row in range(0, 5):     ## 行5for col in range(0, 5):  ## 列5## 5x5的起始点是【(pos[0] - 2),(pos[1] - 2)】,满足 is_valid 且 这个格子包含智能体,不是空格子的if self.is_valid([row + (pos[0] - 2), col + (pos[1] - 2)]) and (PRE_IDS['empty'] not in self._full_obs[row + (pos[0] - 2)][col + (pos[1] - 2)]):x = self._full_obs[row + pos[0] - 2][col + pos[1] - 2] ## 智能体的名称 A2_type = 1 if PRE_IDS['agent'] in x else -1   ## 智能体或者敌人_id = int(x[1:]) - 1  # id         A2内的 2 - 1_agent_i_obs[0][row][col] = _type   ## 智能体或者敌人,标识符_agent_i_obs[1][row][col] = _id     ## id
#                         print('type', type, '_type', _type)  ## 智能体或者敌人,的健康状况 <= 3_agent_i_obs[2][row][col] = self.agent_health[_id] if _type == 1 else self.opp_health[_id]## 智能体或者敌人是否已经 game over_agent_i_obs[3][row][col] = self._agent_cool[_id] if _type == 1 else self._opp_cool[_id]## game over or not_agent_i_obs[3][row][col] = 1 if _agent_i_obs[3][row][col] else -1  # cool/uncool## 归一化以后的坐标_agent_i_obs[4][row][col] = pos[0] / self._grid_shape[0]  # x-coordinate_agent_i_obs[5][row][col] = pos[1] / self._grid_shape[1]  # y-coordinate_agent_i_obs = _agent_i_obs.flatten().tolist()_obs.append(_agent_i_obs)return _obs

Program

因 for _ in range(self.epochs): 内的epochs=1,所以重要性采样的数值一直都是1,不变

两个智能体同质所以共享了策略,若状态空间或动作空间不行,那么就不行的呢

## 构造智能体 agent 的大脑,也就是输入状态,返回该状态下,选择每个动作的概率
## 输入是状态的,也就是 (车子center-point的坐标,车子的速度,杆的竖直角度,杆的角速度)
## 返回值应该是2 dim
class PolicyNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim, action_dim):super(PolicyNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)self.fc3 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x))))return F.softmax(self.fc3(x), dim=1)  ## 返回该状态下,选择的动作的概率## 构造智能体 agent 的大脑,也就是输入状态,返回该状态下,每个动作的动作价值
## 输入是状态的,也就是 (车子center-point的坐标,车子的速度,杆的竖直角度,杆的角速度)
## 返回值应该是2 dim
class ValueNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim):super(ValueNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)self.fc3 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x))))return self.fc3(x)class PPO:''' PPO算法,采用截断方式 '''def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,lmbda, epochs, eps, gamma, device):self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)   ##  策略网络的self.critic = ValueNet(state_dim, hidden_dim).to(device)       ##  价值网络self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),lr=actor_lr)           ##  函数配置优化器self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),lr=critic_lr)         ##  价值函数配置优化器self.gamma = gamma          ## 衰减因子的呢self.lmbda = lmbdaself.eps = eps  # PPO中截断范围的参数self.device = deviceself.epochs = epochsdef take_action(self, state):state = torch.tensor([state], dtype=torch.float).to(self.device)probs = self.actor(state)       ## 拿到该状态下,每个动作的选择概率action_dist = torch.distributions.Categorical(probs)    ##   配置 好采样的概率action = action_dist.sample()        ## 对该状态下,所有的动作采样,采样的概率是probsreturn action.item()                 ## 返回依概率采样得到的动作def update(self, transition_dict):## 拿到这条序列内的 状态、动作和奖励,下一个状态、是否完成的states = torch.tensor(transition_dict['states'],dtype=torch.float).to(self.device)actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)rewards = torch.tensor(transition_dict['rewards'],dtype=torch.float).view(-1, 1).to(self.device)next_states = torch.tensor(transition_dict['next_states'],dtype=torch.float).to(self.device)dones = torch.tensor(transition_dict['dones'],dtype=torch.float).view(-1, 1).to(self.device)## 用下个状态求下一个状态的状态动作价值,然后间接求出当前状态的状态动作价值td_target = rewards + self.gamma * self.critic(next_states) * (1 -dones)## 间接求出的价值 - 直接求出的当前状态的状态动作价值,也就是 TD-error,或者是优势函数 Atd_delta = td_target - self.critic(states)##  算出优势函数,广义优势估计,也就是每一步优势的均值advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,td_delta.cpu()).to(self.device)## 选择的旧动作概率的log值,不反向传播求梯度,detachold_log_probs = torch.log(self.actor(states).gather(1,actions)).detach()for _ in range(self.epochs):  # 实现是包括了epoch数量的#ret = self.actor(states).gather(1, actions)log_probs = torch.log(ret)ratio = torch.exp(log_probs - old_log_probs)    ## 算重要性采样surr1 = ratio * advantage  ## 重要性采样和优势估计相乘的surr2 = torch.clamp(ratio, 1 - self.eps,1 + self.eps) * advantage  # 截断## 算出来的重要性采样,求出两者间的最小值,然后加负号,也就是最大化目标函数,不加负号的话是最小化目标函数# kk = torch.min(surr1, surr2)# kkk = -torch.sum(kk)actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO损失函数# kl_grad = torch.autograd.grad(actor_loss, surr1, create_graph=True, retain_graph=True)[0]# kl_grad2 = torch.autograd.grad(actor_loss, surr2, create_graph=True, retain_graph=True)[0]# kll = (kl_grad + kl_grad2 ) * advantage# kl_gra = torch.autograd.grad(actor_loss, ratio, create_graph=True, retain_graph=True)[0]# su = torch.sum(kl_gra!=kll)# kl_g = torch.autograd.grad(torch.sum(log_probs), ret, create_graph=True, retain_graph=True)[0]## 直接求出当前状态的状态动作价值,和 间接求出的价值,使用 MSE 来算损失函数的,td_target不反向传播求梯度,detachcritic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))self.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()      ## 价值网络的参数梯度置零的actor_loss.backward()critic_loss.backward()                 ## 价值网络的损失loss反向传播梯度self.actor_optimizer.step() self.critic_optimizer.step()           ## update 价值网络的参数team_size = 2
grid_size = (15, 15)
#创建Combat环境,格子世界的大小$15x15,己方智能体和敌方智能体数量都$2
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n
#两个智能体共享同一个策略
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)win_list = []
epoch = 10
allimage = []
for i in range(epoch):with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:for i_episode in range(int(num_episodes / 10)):s = env.reset()terminal = Falsewhile not terminal:if int(num_episodes / 10)-1 == i_episode and i == epoch - 1:img = env.render(mode = r'rgb_array')allimage.append(img)a_1 = agent.take_action(s[0])                      ## 智能体1采取了动作的a_2 = agent.take_action(s[1])                      ## 智能体2采取了动作的next_s, r, done, info = env.step([a_1, a_2])       ## 环境执行动作的transition_dict_1['states'].append(s[0])           ## 加入智能体1的当前状态transition_dict_1['actions'].append(a_1)           ## 加入智能体1采取的动作transition_dict_1['next_states'].append(next_s[0]) ## 加入智能体1的下一个状态transition_dict_1['rewards'].append(               ## 加入智能体1的奖励,胜+100,负-0.1,胜的奖励值多了1000倍r[0] + 100 if info['win'] else r[0] - 0.1)  transition_dict_1['dones'].append(False)           ## 没有完成的呢transition_dict_2['states'].append(s[1])           ## 加入智能体2的当前状态transition_dict_2['actions'].append(a_2)           ## 加入智能体2采取的动作transition_dict_2['next_states'].append(next_s[1]) ## 加入智能体2的下一个状态transition_dict_2['rewards'].append(               ## 加入智能体2的奖励,胜+100,负-0.1r[1] + 100 if info['win'] else r[1] - 0.1)transition_dict_2['dones'].append(False)           ## 没有完成的呢s = next_sterminal = all(done)                               ## 是否都已经完成的win_list.append(1 if info["win"] else 0)## 两个智能体默认 共享 同一个策略网络和价值网络agent.update(transition_dict_1)  ## 使用智能体1的数据来train策略网络和价值网络agent.update(transition_dict_2)  ## 使用智能体2的数据来train策略网络和价值网络if (i_episode + 1) % 100 == 0:pbar.set_postfix({'episode':'%d' % (num_episodes / 10 * i + i_episode + 1),'return':'%.3f' % np.mean(win_list[-100:])})pbar.update(1)

https://zhuanlan.zhihu.com/p/659219840

这篇关于动手学强化学习reinforcement_learning-chapter-twenty-多智能体强化学习入门的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/332686

相关文章

从入门到精通C++11 <chrono> 库特性

《从入门到精通C++11<chrono>库特性》chrono库是C++11中一个非常强大和实用的库,它为时间处理提供了丰富的功能和类型安全的接口,通过本文的介绍,我们了解了chrono库的基本概念... 目录一、引言1.1 为什么需要<chrono>库1.2<chrono>库的基本概念二、时间段(Durat

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

从入门到精通MySQL 数据库索引(实战案例)

《从入门到精通MySQL数据库索引(实战案例)》索引是数据库的目录,提升查询速度,主要类型包括BTree、Hash、全文、空间索引,需根据场景选择,建议用于高频查询、关联字段、排序等,避免重复率高或... 目录一、索引是什么?能干嘛?核心作用:二、索引的 4 种主要类型(附通俗例子)1. BTree 索引(

Redis 配置文件使用建议redis.conf 从入门到实战

《Redis配置文件使用建议redis.conf从入门到实战》Redis配置方式包括配置文件、命令行参数、运行时CONFIG命令,支持动态修改参数及持久化,常用项涉及端口、绑定、内存策略等,版本8... 目录一、Redis.conf 是什么?二、命令行方式传参(适用于测试)三、运行时动态修改配置(不重启服务

MySQL DQL从入门到精通

《MySQLDQL从入门到精通》通过DQL,我们可以从数据库中检索出所需的数据,进行各种复杂的数据分析和处理,本文将深入探讨MySQLDQL的各个方面,帮助你全面掌握这一重要技能,感兴趣的朋友跟随小... 目录一、DQL 基础:SELECT 语句入门二、数据过滤:WHERE 子句的使用三、结果排序:ORDE

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

Python中OpenCV与Matplotlib的图像操作入门指南

《Python中OpenCV与Matplotlib的图像操作入门指南》:本文主要介绍Python中OpenCV与Matplotlib的图像操作指南,本文通过实例代码给大家介绍的非常详细,对大家的学... 目录一、环境准备二、图像的基本操作1. 图像读取、显示与保存 使用OpenCV操作2. 像素级操作3.

基于Python实现智能天气提醒助手

《基于Python实现智能天气提醒助手》这篇文章主要来和大家分享一个实用的Python天气提醒助手开发方案,这个工具可以方便地集成到青龙面板或其他调度框架中使用,有需要的小伙伴可以参考一下... 目录项目概述核心功能技术实现1. 天气API集成2. AI建议生成3. 消息推送环境配置使用方法完整代码项目特点

JavaScript实战:智能密码生成器开发指南

本文通过JavaScript实战开发智能密码生成器,详解如何运用crypto.getRandomValues实现加密级随机密码生成,包含多字符组合、安全强度可视化、易混淆字符排除等企业级功能。学习密码强度检测算法与信息熵计算原理,获取可直接嵌入项目的完整代码,提升Web应用的安全开发能力 目录