
Exploring multi-agent reinforcement learning (MARL)

This article provides a practical introduction to multi-agent reinforcement learning (MARL), covering its theoretical foundations, key algorithms, and frameworks. It then walks through a custom-coded multi-agent Pong environment with self-play DQN agents to illustrate the opportunities and challenges of training AI agents that interact, compete, or cooperate within shared environments.
Multi-agent reinforcement learning (MARL) is a rapidly evolving branch of artificial intelligence that explores how multiple autonomous agents learn, adapt, and interact within shared environments. Unlike traditional single-agent reinforcement learning—where one agent learns in isolation—MARL considers scenarios where several agents simultaneously make decisions, often influencing each other’s experiences and outcomes.
This collective learning framework is becoming increasingly vital in real-world applications. In robotics, for example, MARL enables teams of robots to collaborate on complex tasks like warehouse logistics or disaster response. Similarly, advances in large language models (LLMs) have opened up possibilities where multiple language agents can work together or hold sophisticated dialogues to solve problems that exceed the capability of any single AI. By leveraging MARL, these systems can develop coordinated strategies, adapt in dynamic settings, and achieve goals that require cooperation, competition, or negotiation.
With its growing impact, understanding MARL is key for building intelligent, adaptive systems that operate effectively in multi-agent scenarios—whether in physical, virtual, or hybrid environments.

[Embedded W&B video panel: run pong_mp4_video_log]

What is multi-agent reinforcement learning?

Reinforcement learning (RL) is a branch of machine learning where an agent learns to make a sequence of decisions by interacting with an environment. The agent observes the state of the environment, takes actions, and receives feedback in the form of rewards or penalties. Over time, by trial and error, the agent aims to learn a policy—a strategy for choosing actions—that maximizes its total cumulative reward.
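Formally, the agent's goal can be written as finding a policy π that maximizes the expected discounted return (this is the standard single-agent objective, stated in general terms rather than for any particular environment):

J(\pi) = \mathbb{E}_{\pi}\left[\sum_{t=0}^{\infty} \gamma^{t} r_{t}\right], \qquad 0 \le \gamma < 1

where \gamma is the discount factor that trades off immediate rewards against future ones.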
Multi-agent reinforcement learning extends these single-agent principles to scenarios involving multiple interacting agents. Where a single agent learns a policy (a mapping from states or observations to actions) that maximizes its own cumulative reward through trial and error, MARL requires agents to cooperate, compete, and coordinate within a shared environment, which makes it essential for complex, interactive decision-making tasks.
MARL introduces a new layer of complexity by incorporating multiple agents that simultaneously interact with both the environment and each other. Each agent perceives the environment, takes actions that affect both their own state and that of others, and receives feedback based on the collective outcome. This setup gives rise to key concepts such as:
  • Cooperation: Agents may need to work together to achieve a shared goal, as in robotic teams collaborating to accomplish a task that would be impossible for a single robot.
  • Competition: In some environments, agents have opposing objectives, as seen in adversarial games where the success of one agent comes at the expense of another.
  • Coordination: Even in situations without strict cooperation or competition, agents must often synchronize their actions to avoid conflicts or inefficiencies, such as ensuring multiple autonomous vehicles can navigate an intersection safely.
The core entities in MARL—the agents, the environment, and the reinforcement learning framework—combine to create a dynamic system where learning must account not only for environmental feedback, but also for the diverse strategies and adaptations of other agents. This dynamic interplay makes MARL a powerful tool for developing intelligent systems capable of operating in complex, interactive domains.
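A common way to formalize this setting (standard notation, not specific to any example in this article) is as a Markov game, also called a stochastic game:

\mathcal{G} = \langle \mathcal{N}, \mathcal{S}, \{\mathcal{A}_i\}_{i \in \mathcal{N}}, P, \{R_i\}_{i \in \mathcal{N}}, \gamma \rangle

Here \mathcal{N} is the set of agents, \mathcal{S} the state space, \mathcal{A}_i the action space of agent i, P(s' \mid s, a_1, \dots, a_N) the transition function over joint actions, and R_i the reward function of agent i. Each agent seeks a policy \pi_i that maximizes its own expected return, but that return now depends on the joint behavior of all agents, which is what distinguishes MARL from the single-agent case.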

MARL, game theory, and optimization: Overlapping foundations

While multi-agent reinforcement learning is centered on agents learning through interaction, its theoretical roots are intertwined with both game theory and optimization theory.
Game theory offers a framework for modeling and analyzing the strategic interplay between agents, helping to understand what happens when decision-makers have competing, cooperating, or mixed objectives. Alongside this, optimization theory plays a crucial role in MARL, guiding agents in systematically improving their learning processes, policies, and coordination mechanisms.
Optimization theory addresses questions like: How can agents most efficiently improve their performance? What algorithms allow an agent (or a team of agents) to approach the best possible outcome, given the dynamic and interactive nature of the environment? In MARL, this often leads to leveraging powerful tools such as gradient descent, regret minimization, and distributed optimization techniques. For example, policy gradient methods—rooted directly in optimization—allow agents to iteratively update their decision-making strategies to maximize expected rewards, even as other agents are simultaneously adapting. When multiple agents need to work together, distributed optimization algorithms enable them to coordinate actions and share information to find solutions that benefit the whole system.
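As a concrete example of this optimization machinery, the basic policy gradient update that underlies methods like PPO adjusts a policy's parameters \theta in the direction that increases expected return (standard REINFORCE-style form):

\nabla_{\theta} J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_{\theta} \log \pi_{\theta}(a \mid s)\, \hat{A}(s, a)\right]

where \hat{A}(s, a) estimates how much better action a is than average in state s. In a multi-agent system this expectation is taken while the other agents, and therefore the effective environment, keep changing, which is precisely what makes MARL optimization harder than its single-agent counterpart.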
The rich overlap between these fields is apparent in a range of MARL scenarios. Zero-sum games, such as self-play in chess or StarCraft II, emphasize competition and adversarial learning—classic topics for both game and optimization theory. Cooperative tasks, like teams of robots jointly lifting an object or coordinating area coverage, draw on optimization methods for collective planning and resource allocation. In more nuanced mixed-motive settings—such as chatbots negotiating trade-offs or autonomous vehicles managing intersections—agents must simultaneously optimize their own outcomes while anticipating, and adapting to, the strategies of other agents.
By integrating the perspectives of both game theory and optimization, MARL provides a foundation for designing multi-agent systems that are robust, adaptive, and capable of efficiently solving complex problems in environments where strategic interaction and coordination are essential.

Frameworks and algorithms in MARL

Building effective multi-agent reinforcement learning (MARL) systems requires both robust software frameworks and sophisticated learning algorithms. The frameworks—such as PettingZoo, RLlib, and PyMARL—provide standardized environments and tools that make it easier for researchers to implement, compare, and scale up MARL experiments. These ecosystems support a variety of multi-agent scenarios, from cooperative teams to competitive adversaries and everything in between.
At the algorithmic core of MARL, different approaches are designed to address the spectrum of agent interactions: cooperation, competition, and coordination.
One fundamental class of approaches is independent learning, where each agent treats the other agents as part of the environment and learns purely from its own observations and rewards. An example of this is IPPO (Independent Proximal Policy Optimization), an adaptation of the popular single-agent PPO algorithm for the multi-agent context. In IPPO, every agent optimizes its policy independently, without sharing gradients or internal states. This simplicity makes it flexible enough for both cooperative and competitive environments, but it can struggle in tasks where agents need to closely coordinate, as each agent is “blind” to the learning process of others.
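To make the idea concrete, here is a minimal, self-contained toy example (my own illustration, not taken from any MARL library): two independent tabular Q-learners playing a repeated 2x2 coordination game. Each agent updates only its own Q-values from its own reward, which is the same "treat everyone else as part of the environment" recipe that IPPO applies with PPO and neural networks instead of tables.

import numpy as np

# Payoff for a simple coordination game: the agents are rewarded only
# when they pick the same action.
payoff = np.array([[1.0, 0.0],
                   [0.0, 1.0]])

rng = np.random.default_rng(0)
q_tables = [np.zeros(2), np.zeros(2)]  # one Q-table per agent (stateless game)
alpha, eps = 0.1, 0.1                  # learning rate and exploration rate

for t in range(5000):
    # Epsilon-greedy action selection, performed independently by each agent
    actions = [int(rng.integers(2)) if rng.random() < eps else int(np.argmax(q))
               for q in q_tables]
    reward = payoff[actions[0], actions[1]]  # both agents share this reward here
    for q, a in zip(q_tables, actions):
        # Each agent updates only its own table from its own experience;
        # the other agent is just "part of the environment" from its perspective.
        q[a] += alpha * (reward - q[a])

print("Agent 0 Q-values:", q_tables[0])
print("Agent 1 Q-values:", q_tables[1])

Run long enough, both agents typically lock onto the same action, but nothing in the update rule guarantees coordination; that fragility is exactly the weakness of independent learning described above.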
For tasks that demand teamwork, cooperative algorithms come into play. Here, agents work together towards a shared objective, and success depends on how well they coordinate. QMIX is a prominent cooperative algorithm, particularly effective in environments where a team must act in concert, like groups of agents in StarCraft II.
QMIX employs a mixing network during training that combines the individual agents' value functions, conditioned on the global state, so that each agent's decisions contribute meaningfully to the team's overall reward. When deployed, each agent still acts independently, but its behavior is shaped by the joint learning experienced during training. This approach directly tackles the credit assignment problem: it helps the system attribute parts of the team's success or failure to individual agents.
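Below is a simplified sketch of that mixing step (a from-scratch illustration of the published QMIX idea, not code from PyMARL; the class name MonotonicMixer, the layer sizes, and the embed_dim default are choices made here). Per-agent Q-values are combined into a team value Q_tot by a small network whose weights are generated from the global state and forced to be non-negative, so increasing any individual agent's Q-value can never decrease Q_tot.

import torch
import torch.nn as nn

class MonotonicMixer(nn.Module):
    """QMIX-style mixer: combines per-agent Q-values into a team Q_tot.
    Hypernetworks map the global state to mixing weights, and abs() keeps
    those weights non-negative so that dQ_tot/dQ_i >= 0 (monotonicity)."""
    def __init__(self, n_agents, state_dim, embed_dim=32):
        super().__init__()
        self.n_agents, self.embed_dim = n_agents, embed_dim
        self.hyper_w1 = nn.Linear(state_dim, n_agents * embed_dim)
        self.hyper_b1 = nn.Linear(state_dim, embed_dim)
        self.hyper_w2 = nn.Linear(state_dim, embed_dim)
        self.hyper_b2 = nn.Sequential(nn.Linear(state_dim, embed_dim), nn.ReLU(),
                                      nn.Linear(embed_dim, 1))

    def forward(self, agent_qs, state):
        # agent_qs: (batch, n_agents), state: (batch, state_dim)
        bs = agent_qs.size(0)
        w1 = torch.abs(self.hyper_w1(state)).view(bs, self.n_agents, self.embed_dim)
        b1 = self.hyper_b1(state).view(bs, 1, self.embed_dim)
        hidden = torch.relu(torch.bmm(agent_qs.unsqueeze(1), w1) + b1)  # (bs, 1, embed)
        w2 = torch.abs(self.hyper_w2(state)).view(bs, self.embed_dim, 1)
        b2 = self.hyper_b2(state).view(bs, 1, 1)
        return (torch.bmm(hidden, w2) + b2).view(bs, 1)                 # Q_tot: (bs, 1)

# Example: 2 agents, a 12-dimensional global state, batch of 8 transitions
mixer = MonotonicMixer(n_agents=2, state_dim=12)
q_tot = mixer(torch.rand(8, 2), torch.rand(8, 12))
print(q_tot.shape)  # torch.Size([8, 1])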
Straddling the line between full independence and pure cooperation, algorithms like MAPPO (Multi-Agent Proximal Policy Optimization) use a concept called centralized training with decentralized execution. During training, agents have access to broader information—sometimes including the observations and actions of teammates or opponents—enabling more robust and stable learning. However, during actual execution, each agent must act solely on its own local viewpoint. MAPPO extends PPO by including a “centralized critic” that leverages this shared information during learning, but policies themselves remain decentralized for practical deployment. This setup allows agents to learn sophisticated strategies that incorporate both competitive and cooperative elements, making MAPPO effective in a range of MARL tasks.
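A minimal sketch of that split is shown below, assuming two agents whose observations are simply concatenated for the critic (the module names DecentralizedActor and CentralizedCritic and the layer sizes are illustrative choices, not the reference MAPPO implementation).

import torch
import torch.nn as nn

class DecentralizedActor(nn.Module):
    """Policy network: at execution time it sees only its own local observation."""
    def __init__(self, obs_dim, n_actions):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(obs_dim, 64), nn.ReLU(),
                                 nn.Linear(64, n_actions))
    def forward(self, local_obs):
        return self.net(local_obs)  # action logits for one agent

class CentralizedCritic(nn.Module):
    """Value network used only during training: it sees the joint observation."""
    def __init__(self, joint_obs_dim):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(joint_obs_dim, 128), nn.ReLU(),
                                 nn.Linear(128, 1))
    def forward(self, joint_obs):
        return self.net(joint_obs)  # shared value estimate

# Two agents with 6-dimensional observations each
actor_a, actor_b = DecentralizedActor(6, 3), DecentralizedActor(6, 3)
critic = CentralizedCritic(12)
obs_a, obs_b = torch.rand(1, 6), torch.rand(1, 6)
logits_a = actor_a(obs_a)                           # decentralized execution
value = critic(torch.cat([obs_a, obs_b], dim=-1))   # centralized training signal
print(logits_a.shape, value.shape)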
Altogether, advances in tools and algorithms like IPPO, QMIX, and MAPPO have significantly expanded what is possible in multi-agent RL. By addressing the nuances of cooperation, competition, and coordination in different ways, these methods have laid the foundation for intelligent systems that can operate effectively in interacting, dynamic groups.

Frameworks for MARL

Developing and evaluating MARL algorithms would be extremely challenging without standardized frameworks that simplify environment creation, agent orchestration, simulation, and benchmarking. Over the past few years, several dedicated MARL libraries and platforms have become essential tools for researchers and practitioners.
PettingZoo is a widely adopted Python library that provides a broad suite of multi-agent environments, ranging from simple classic games to more intricate scenarios involving hundreds of agents. PettingZoo standardizes the environment interface, making it easy to test various MARL algorithms across both cooperative and competitive tasks without needing to rewrite code for each new scenario.
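To give a flavor of that standardized interface, here is the basic agent-iteration loop PettingZoo documents for its turn-based (AEC) environments, shown with random actions on the bundled rock-paper-scissors game (environment names and version suffixes such as rps_v2 may differ across PettingZoo releases):

from pettingzoo.classic import rps_v2

env = rps_v2.env()
env.reset(seed=42)

# PettingZoo's AEC API: iterate over agents, read the last observation/reward,
# and submit an action for whichever agent is currently acting (None once done).
for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    if termination or truncation:
        action = None
    else:
        action = env.action_space(agent).sample()  # plug a trained policy in here
    env.step(action)

env.close()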
RLlib, part of the Ray distributed computing ecosystem, supports scalable reinforcement learning—including MARL—with powerful parallelization features. RLlib includes off-the-shelf implementations of many single- and multi-agent algorithms and is well-suited for industrial-scale experiments or applications that require efficient use of compute resources.
PyMARL specializes in cooperative MARL, offering well-established benchmarks such as the StarCraft Multi-Agent Challenge (SMAC). PyMARL has become a reference for developing and evaluating new cooperative learning algorithms, providing standardized settings and baselines for reproducibility.
All told, these frameworks have lowered the barrier to entry for MARL research, making the process of deploying and evaluating multi-agent systems far more accessible and reliable. They complement state-of-the-art algorithms by providing the practical scaffolding needed to test theoretical ideas in real, often complex, scenarios.

Asymmetric multi-agent reinforcement learning

In multi-agent reinforcement learning, the symmetry or asymmetry of a setup refers to whether agents have equivalent knowledge, observation spaces, objectives, or learning policies. In symmetric MARL, every agent operates under the same conditions: they possess the same types of observations, follow the same set of actions, and typically share objectives or reward functions. Agents in symmetric settings are essentially interchangeable, and any policy learned by one agent can be directly transferred to the others.
Asymmetric MARL, by contrast, describes scenarios where agents differ in at least one major aspect—such as access to information, observation granularity, available actions, or even their individual goals. In such settings, one agent might have a global view of the environment while another sees only local information; some might be tasked with entirely different roles or objectives within the team. Asymmetric setups are common in realistic applications: for example, in distributed sensor networks, hierarchical team tasks, or environments like MATE (the Multi-Agent Tracking Environment), where agents naturally operate with different perspectives and responsibilities.
The primary difference, then, is that while symmetric approaches assume homogeneity among agents, asymmetric approaches acknowledge and leverage agent heterogeneity. This allows for richer cooperation, specialization, and more faithful modeling of complex real-world systems.
A particularly impactful method within asymmetric MARL is asymmetric self-play. Here, agents improve by repeatedly encountering versions of themselves (or other agents) that have different informational vantage points or capabilities. This dynamic, adversarial, or cooperative learning pushes agents to become more robust and adaptive, preparing them for diverse strategies and unforeseen scenarios.
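One simple way to organize this kind of training is to keep a pool of frozen snapshots of the learner and sample opponents from it. The skeleton below is purely illustrative: the Policy class and its train_against placeholder stand in for a real network and a real training loop.

import copy
import random

class Policy:
    """Stand-in for a trainable policy; in practice this would be a neural network."""
    def __init__(self):
        self.version = 0
    def train_against(self, opponent):
        # Placeholder for running episodes and gradient updates against `opponent`
        self.version += 1

learner = Policy()
opponent_pool = [copy.deepcopy(learner)]       # start from a frozen copy of itself

for iteration in range(20):
    opponent = random.choice(opponent_pool)    # past snapshots provide varied opponents
    learner.train_against(opponent)
    if (iteration + 1) % 5 == 0:               # periodically freeze the current learner
        opponent_pool.append(copy.deepcopy(learner))

print(f"learner version: {learner.version}, snapshots in pool: {len(opponent_pool)}")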
By explicitly addressing agent differences, asymmetric MARL provides greater flexibility for real-world deployment—making it possible to design and train agents for environments where identical roles and knowledge cannot be assumed. This significantly broadens the applicability and effectiveness of MARL, especially in domains that require agent specialization, partial observability, or variable expertise.

Tutorial: Multi-agent Pong with self-play

For this tutorial, I created a custom multi-agent Pong environment instead of using a standard Gym environment. The main reason was flexibility: by designing my own environment, I can fully control the game rules, reward structure, agent actions, and state observability, adapting everything to the needs of multi-agent reinforcement learning research and experimentation.
One key design choice was to offer both a lightweight, low-dimensional observation space (based on coordinates and velocities) and the option for a full pixel-based state. The lightweight representation dramatically speeds up learning and experimentation, making it easier to train agents quickly and efficiently.
At the same time, being able to render and record the game in high-resolution pixel space lets me visually inspect agent behavior, replay and debug episodes, and generate insightful visualizations. This hybrid approach gives me the best of both worlds: fast, efficient RL training and rich, actionable insights through replay and visualization.
By having full control over state, rewards, and randomness, I can ensure that experiments are reproducible and tunable. Recording and replay features are built in, making it easy to analyze training episodes or diagnose mistakes. Overall, building my own environment lets me quickly adapt to new research ideas and experiment flexibly, unconstrained by external libraries.
import gym
from gym import spaces
import numpy as np
import pygame
from typing import Any, Dict, Optional, List, Tuple, Union
import random
import math

class PongEnv(gym.Env):
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 60} # <-- class var, not in __init__

def __init__(
self,
render_mode: Optional[str] = None,
frames_to_return: int = 4,
state_mode: str = "coordinate_state",
record_replay: bool = False
):
self.WIDTH, self.HEIGHT = 800, 400
self.PADDLE_WIDTH, self.PADDLE_HEIGHT = 10, 70
self.BALL_RADIUS = 10
self.PADDLE_SPEED = 8
self.CPU_SPEED = 8
self.BALL_X_SPEED = 10
self.BALL_Y_SPEED = 10

self.WHITE = (255, 255, 255)
self.BLACK = (0, 0, 0)
self.render_mode = render_mode
self.frames_to_return = frames_to_return
self.state_mode = state_mode
self.last_serve_angle = None


# 0: stay, 1: up, 2: down, 3: cpu (will be mapped to -100 internally for compatibility)
self.action_space = spaces.MultiDiscrete([3, 4])

if self.state_mode == "coordinate_state":
self.observation_space = spaces.Box(
low=-1.0, high=1.0,
shape=(self.frames_to_return, 6),
dtype=np.float32
)
else:
self.observation_space = spaces.Box(
low=0, high=255,
shape=(self.HEIGHT, self.WIDTH, 3),
dtype=np.uint8
)

self.window = None
self.clock = None
self.frame_history = []

# Recording
self.record_replay = record_replay
self.episode_buffer: List[Dict[str, Any]] = []
self.recorded_episodes: List[List[Dict[str, Any]]] = []

def _get_obs(self):
if self.state_mode == "coordinate_state":
ball_x_norm = (self.ball_x / (self.WIDTH / 2)) - 1.0
ball_y_norm = (self.ball_y / (self.HEIGHT / 2)) - 1.0
player_y_norm = (self.player_y / (self.HEIGHT - self.PADDLE_HEIGHT)) * 2 - 1.0
cpu_y_norm = (self.cpu_y / (self.HEIGHT - self.PADDLE_HEIGHT)) * 2 - 1.0
ball_x_vel_norm = self.ball_x_vel / self.BALL_X_SPEED
ball_y_vel_norm = self.ball_y_vel / self.BALL_Y_SPEED
current_state = np.array([
ball_x_norm, ball_y_norm,
player_y_norm, cpu_y_norm,
ball_x_vel_norm, ball_y_vel_norm
], dtype=np.float32)
self.frame_history.append(current_state)
if len(self.frame_history) > self.frames_to_return:
self.frame_history.pop(0)
while len(self.frame_history) < self.frames_to_return:
self.frame_history.append(current_state)
return np.array(self.frame_history)
else:
if self.window is not None:
return np.transpose(
np.array(pygame.surfarray.pixels3d(self.window)),
axes=(1, 0, 2)
)
else:
return np.zeros((self.HEIGHT, self.WIDTH, 3), dtype=np.uint8)

def _get_info(self):
return {
"player_score": self.player_score,
"cpu_score": self.cpu_score,
"ball_position": (self.ball_x, self.ball_y),
"player_position": (20, self.player_y),
"cpu_position": (self.WIDTH - 20 - self.PADDLE_WIDTH, self.cpu_y)
}

def get_env_state(self):
return {
"ball_x": self.ball_x,
"ball_y": self.ball_y,
"ball_x_vel": self.ball_x_vel,
"ball_y_vel": self.ball_y_vel,
"player_y": self.player_y,
"cpu_y": self.cpu_y,
"player_score": self.player_score,
"cpu_score": self.cpu_score,
"frame_history": [np.copy(f) for f in self.frame_history],
}

def set_env_state(self, state: Dict[str, Any]):
self.ball_x = state["ball_x"]
self.ball_y = state["ball_y"]
self.ball_x_vel = state["ball_x_vel"]
self.ball_y_vel = state["ball_y_vel"]
self.player_y = state["player_y"]
self.cpu_y = state["cpu_y"]
self.player_score = state["player_score"]
self.cpu_score = state["cpu_score"]
self.frame_history = [np.copy(f) for f in state.get("frame_history", [])]


def _custom_serve(self, serve_to=None):
"""
Serve ball toward a random spot on CPU or player paddle.
Ball always moves EXACTLY at BALL_X_SPEED, no bounces off goal.
"""
# Center of court is (self.WIDTH//2, self.HEIGHT//2)
if serve_to == "player":
target_x = 20 + self.PADDLE_WIDTH // 2
target_y = random.uniform(self.player_y, self.player_y + self.PADDLE_HEIGHT)
elif serve_to == "cpu":
target_x = self.WIDTH - 20 - self.PADDLE_WIDTH // 2
target_y = random.uniform(self.cpu_y, self.cpu_y + self.PADDLE_HEIGHT)
else:
# Pick randomly left or right paddle
if random.random() < 0.5:
target_x = 20 + self.PADDLE_WIDTH // 2
target_y = random.uniform(self.player_y, self.player_y + self.PADDLE_HEIGHT)
else:
target_x = self.WIDTH - 20 - self.PADDLE_WIDTH // 2
target_y = random.uniform(self.cpu_y, self.cpu_y + self.PADDLE_HEIGHT)

dx = target_x - self.WIDTH // 2
dy = target_y - self.HEIGHT // 2
norm = (dx ** 2 + dy ** 2) ** 0.5
speed = self.BALL_X_SPEED

self.ball_x_vel = speed * dx / norm
self.ball_y_vel = speed * dy / norm

self.last_serve_angle = math.atan2(dy, dx)

def reset(self, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None, serve_to: Optional[str] = None):
super().reset(seed=seed)
# Randomized paddle positions:
self.player_y = random.randint(0, self.HEIGHT - self.PADDLE_HEIGHT)
self.cpu_y = random.randint(0, self.HEIGHT - self.PADDLE_HEIGHT)
self.ball_x = self.WIDTH // 2
self.ball_y = self.HEIGHT // 2
self.player_score = 0
self.cpu_score = 0
self.frame_history = []
self.player_touched = False
self.cpu_touched = False
self._custom_serve(serve_to=serve_to)
obs = self._get_obs()
info = self._get_info()
if self.record_replay:
self.episode_buffer = []
self.episode_buffer.append({
"env_state": self.get_env_state(),
"action": None,
"reward": [0.0, 0.0],
"terminated": False,
"info": info,
})
return obs, info

def _reset_ball(self, serve_to=None):
self.ball_x = self.WIDTH // 2
self.ball_y = self.HEIGHT // 2
# Randomize paddles every serve!
self.player_y = random.randint(0, self.HEIGHT - self.PADDLE_HEIGHT)
self.cpu_y = random.randint(0, self.HEIGHT - self.PADDLE_HEIGHT)
self._custom_serve(serve_to=serve_to)

def step(self, action):
# Parse actions
if isinstance(action, (list, tuple, np.ndarray)):
player_action, cpu_action = action
else:
player_action, cpu_action = action, -100
if cpu_action == 3:
cpu_action = -100

# Move player paddle
if player_action == 1 and self.player_y - self.PADDLE_SPEED >= 0:
self.player_y -= self.PADDLE_SPEED
elif player_action == 2 and self.player_y + self.PADDLE_SPEED + self.PADDLE_HEIGHT <= self.HEIGHT:
self.player_y += self.PADDLE_SPEED

# Move CPU paddle
if cpu_action == -100:
# Simple AI follows the ball
if self.cpu_y + self.PADDLE_HEIGHT // 2 < self.ball_y and self.cpu_y + self.PADDLE_HEIGHT <= self.HEIGHT:
self.cpu_y += self.CPU_SPEED
elif self.cpu_y + self.PADDLE_HEIGHT // 2 > self.ball_y and self.cpu_y >= 0:
self.cpu_y -= self.CPU_SPEED
else:
if cpu_action == 1 and self.cpu_y - self.PADDLE_SPEED >= 0:
self.cpu_y -= self.PADDLE_SPEED
elif cpu_action == 2 and self.cpu_y + self.PADDLE_SPEED + self.PADDLE_HEIGHT <= self.HEIGHT:
self.cpu_y += self.PADDLE_SPEED

# Move the ball
self.ball_x += self.ball_x_vel
self.ball_y += self.ball_y_vel

# Ball bounces off top/bottom
if self.ball_y - self.BALL_RADIUS <= 0 or self.ball_y + self.BALL_RADIUS >= self.HEIGHT:
self.ball_y_vel *= -1

reward1, reward2 = 0.0, 0.0
terminated = False

# Ball bounces off player paddle
hit_player = False
if (
20 < self.ball_x - self.BALL_RADIUS < 20 + self.PADDLE_WIDTH and
self.player_y < self.ball_y < self.player_y + self.PADDLE_HEIGHT
):
self.ball_x_vel *= -1
self.player_touched = True
hit_player = True

# Ball bounces off cpu paddle
hit_cpu = False
if (
self.WIDTH - 20 - self.PADDLE_WIDTH < self.ball_x + self.BALL_RADIUS < self.WIDTH - 20 and
self.cpu_y < self.ball_y < self.cpu_y + self.PADDLE_HEIGHT
):
self.ball_x_vel *= -1
self.cpu_touched = True
hit_cpu = True

# Add rewards for hitting paddle!
# You can set self.r_for_hit elsewhere (e.g. in __init__ or dynamically)
if hit_player:
reward1 += getattr(self, 'r_for_hit', 0.0)
if hit_cpu:
reward2 += getattr(self, 'r_for_hit', 0.0) # Comment this line if only want to reward player

# Only scoring events give reward IF THE OPPOSING PADDLE WAS TOUCHED THIS RALLY
if self.ball_x < 0:
# CPU "scores"
if self.cpu_touched:
self.cpu_score += 1
reward1 = -1 # Player penalized
reward2 = 1 # CPU rewarded
# else: both rewards zero
self._reset_ball()
# Reset touch flags for next rally
self.player_touched = False
self.cpu_touched = False
if self.cpu_score >= 21:
terminated = True
elif self.ball_x > self.WIDTH:
# Player "scores"
if self.player_touched:
self.player_score += 1
reward1 = 1 # Player rewarded
reward2 = -1 # CPU penalized
# else: both zero
self._reset_ball()
# Reset touch flags for next rally
self.player_touched = False
self.cpu_touched = False
if self.player_score >= 21:
terminated = True

# Prepare outputs
obs = self._get_obs()
info = self._get_info()

# Optionally record for replay
if self.record_replay:
self.episode_buffer.append({
"env_state": self.get_env_state(),
"action": (player_action, cpu_action),
"reward": [reward1, reward2],
"terminated": terminated,
"info": info,
})
if terminated:
self.recorded_episodes.append(self.episode_buffer)

if self.render_mode == "human":
self._render_frame()

return obs, [reward1, reward2], terminated, False, info






def _randomize_ball_velocity(self, last_angle=None, angle_min_deg=30, angle_max_deg=50):
"""
Set self.ball_x_vel and self.ball_y_vel to a random, less extreme angle, with consistent speed.
Optionally, avoid being too close to `last_angle`.
"""
for attempt in range(50):
angle = random.uniform(0, 2 * math.pi)
deg = math.degrees(angle) % 180
# Allow only less extreme angles
if angle_min_deg < deg < angle_max_deg or (180 - angle_max_deg) < deg < (180 - angle_min_deg):
if last_angle is not None and abs(angle - last_angle) < 0.05:
continue # too close to previous angle, try again
speed = self.BALL_X_SPEED # <-- consistent speed!
self.ball_x_vel = speed * math.cos(angle)
self.ball_y_vel = speed * math.sin(angle)
self.last_serve_angle = angle
return
# Fallback, use 45°
speed = self.BALL_X_SPEED
angle = math.pi / 4
self.ball_x_vel = speed * math.cos(angle)
self.ball_y_vel = speed * math.sin(angle)
self.last_serve_angle = angle



def render(self):
if self.render_mode == "rgb_array":
return self._render_frame()


def close(self):
if self.window is not None:
pygame.display.quit()
pygame.quit()
self.window = None
self.clock = None


def _render_frame(self, return_rgb=False):
# Always ensure window exists
if self.window is None:
if self.render_mode == "human":
pygame.init()
pygame.display.init()
self.window = pygame.display.set_mode((self.WIDTH, self.HEIGHT))
pygame.display.set_caption('Pong Gym Environment')
else:
# Offscreen surface for rgb_array mode
pygame.init()
self.window = pygame.Surface((self.WIDTH, self.HEIGHT))

if self.clock is None and self.render_mode == "human":
self.clock = pygame.time.Clock()
self.window.fill(self.BLACK)
pygame.draw.rect(self.window, self.WHITE, (20, self.player_y, self.PADDLE_WIDTH, self.PADDLE_HEIGHT))
pygame.draw.rect(self.window, self.WHITE, (self.WIDTH - 20 - self.PADDLE_WIDTH, self.cpu_y, self.PADDLE_WIDTH, self.PADDLE_HEIGHT))
pygame.draw.circle(self.window, self.WHITE, (int(self.ball_x), int(self.ball_y)), self.BALL_RADIUS)
font = pygame.font.SysFont('Arial', 40)
text = font.render(f"{self.player_score} {self.cpu_score}", True, self.WHITE)
self.window.blit(text, (self.WIDTH//2 - text.get_width()//2, 20))
if self.render_mode == "human":
pygame.event.pump()
pygame.display.update()
# Use bulletproof fps getter
render_fps = getattr(type(self), "metadata", {}).get("render_fps", 60)
if self.clock is not None:
self.clock.tick(render_fps)
# ALWAYS return RGB if requested!
if return_rgb or self.render_mode == "rgb_array":
# Can always grab the pixels. Works even for "human" window.
arr = pygame.surfarray.pixels3d(self.window)
return np.transpose(np.array(arr), (1, 0, 2)).copy()


def save_episode(
self,
idx=0,
path="pong_ep.mp4",
fps=60,
resize_to=None
):
"""
Save a recorded episode to mp4 by replaying it and rendering frames as RGB.
Args:
idx (int): episode index in self.recorded_episodes
path (str): output mp4 filename
fps (int): frames per second for video
resize_to (tuple): optional (width, height) to resize frames
"""
import imageio.v2 as imageio
from PIL import Image

if not self.recorded_episodes:
print("No episodes recorded for saving.")
return
if not (0 <= idx < len(self.recorded_episodes)):
print(f"Index {idx} out of range for recorded episodes.")
return

prev_render_mode = self.render_mode
self.render_mode = "human" # Forcing human, but our _render_frame can always return RGB now

episode = self.recorded_episodes[idx]
self.reset()
# Step through episode and save frames
frames = []
for i, state in enumerate(episode):
self.set_env_state(state["env_state"])
frame = self._render_frame(return_rgb=True)
if frame is None:
frame = np.zeros((self.HEIGHT, self.WIDTH, 3), dtype=np.uint8)
if resize_to is not None:
frame = np.array(Image.fromarray(frame).resize(resize_to, Image.BICUBIC))
frames.append(frame)
self.render_mode = prev_render_mode
if frames:
imageio.mimsave(path, frames, fps=fps)
print(f"Episode video saved to {path}")
else:
print("No frames were rendered, mp4 not saved.")

if __name__ == '__main__':
import time
env = PongEnv(state_mode="coordinate_state", frames_to_return=4, render_mode=None, record_replay=True)
obs, info = env.reset()

print("Playing and recording an episode...")
done = False
while not done:
player_action = env.action_space.sample()[0]
cpu_action = -100 # AI
obs, reward, terminated, truncated, info = env.step([player_action, cpu_action])
print(obs, flush=True)
done = terminated or truncated

print(f"Recorded {len(env.recorded_episodes)} episode(s). Now replaying the first...")
time.sleep(2)
env.replay_episode(0, render_mode='human', delay=0.016)
print("Done!")
This code defines a fully custom Pong environment for reinforcement learning built upon OpenAI Gym:
  • It simulates a digital Pong game with physics-based paddle and ball movement, scoring, and collision detection.
  • The left paddle is controlled by player actions (stay, up, down), while the right paddle can be scripted or AI-controlled.
  • The environment supports two observation modes: either compact, normalized vectors representing positions and velocities (including frame stacking for temporal awareness), or raw pixel images of the game for visual deep learning approaches.
  • The action space allows both paddles to be commanded independently, enabling single- or multi-agent experiments.
  • The environment includes options for randomizing paddle positions and ball serve angles after each point.
  • Scoring and episode termination occur when a player reaches 21 points.
  • Additional advanced features help RL research, such as full episode replay recording (for video or analysis), customizable reward shaping, and exact state serialization for reproducibility.
  • Rendering is handled via Pygame, supporting both human-interactive and programmatic image outputs.

The learning algorithm

For training, we use Deep Q-Networks (DQN) in a self-play multi-agent setting:
  • Independent learners: Each agent (the two paddles in Pong) has its own DQN policy. Each agent interacts with the environment, stores transitions in its own replay buffer, and learns independently.
  • Replay buffer for each agent: By storing past experiences and sampling from them, the agents mitigate instability and correlation in consecutive samples, a hallmark of modern RL methods.
  • Target networks: To stabilize training, each agent’s DQN is periodically synced to a target network, which supplies the bootstrapped update targets written out just after this list. This helps prevent oscillations and divergence in Q-learning.
  • Epsilon-greedy exploration: Agents balance exploration (random action selection) and exploitation (using learned policy); epsilon decays over time to shift from exploration to exploitation.
  • Self-play: By letting both sides be learned agents (rather than hand-coded), the environment produces a continuously adapting challenge. This is crucial for developing competition-ready, robust strategies.
  • Preprocessing & reward engineering: Observations are flattened and normalized; rewards can be tuned for custom behaviors (positive for scoring, neutral or negative for failures, small positive rewards for hitting the ball etc.).
  • Logging and checkpointing: Training progress is logged W&B and model weights are periodically saved for analysis and resumption.
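For reference, the update each agent performs is the standard DQN regression toward a bootstrapped target computed with its target network:

y = r + \gamma\,(1 - d)\,\max_{a'} Q_{\text{target}}(s', a'), \qquad \mathcal{L}(\theta) = \left(Q_{\theta}(s, a) - y\right)^2

where d indicates episode termination. This is exactly what the train_step function in the script below implements with a mean-squared-error loss.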
Why DQN?
DQN is well-supported, sample-efficient for discrete actions, and allows leveraging various neural architectures (MLPs, CNNs). In our multi-agent Pong, DQN allows both players to learn concurrently, adapting to each other’s strategies. Here's the training script:

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
from scoreOnlyPongEnv import PongEnv
import os

import wandb

# ---- DQN and ReplayBuffer ----
class DQN(nn.Module):
def __init__(self, state_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, 512), nn.ReLU(),
nn.Linear(512, 512), nn.ReLU(),
nn.Linear(512, 256), nn.ReLU(),
nn.Linear(256, 128), nn.ReLU(),
nn.Linear(128, 64), nn.ReLU(),
nn.Linear(64, action_dim)
)
def forward(self, x):
return self.net(x)

class ReplayBuffer:
def __init__(self, capacity):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
return (
np.stack(states),
np.array(actions),
np.array(rewards, dtype=np.float32),
np.stack(next_states),
np.array(dones, dtype=np.uint8)
)
def __len__(self):
return len(self.buffer)

def get_epsilon(steps_done, EPS_START=1.0, EPS_MIN=0.05, EPS_DECAY=0.995):
return max(EPS_MIN, EPS_START * (EPS_DECAY ** steps_done))

def select_action(state, policy_net, epsilon, n_actions, device):
if random.random() < epsilon:
return random.randint(0, n_actions-1)
else:
state = torch.FloatTensor(state).unsqueeze(0).to(device)
with torch.no_grad():
q_values = policy_net(state)
return int(q_values.argmax().item())

def preprocess_obs(obs):
return obs.flatten()

def train_step(policy_net, target_net, buffer, optimizer, device, gamma=0.99, batch_size=64):
if len(buffer) < batch_size:
return None
states, actions, rewards, next_states, dones = buffer.sample(batch_size)
states = torch.FloatTensor(states).to(device)
actions = torch.LongTensor(actions).to(device).unsqueeze(1)
rewards = torch.FloatTensor(rewards).to(device).unsqueeze(1)
next_states = torch.FloatTensor(next_states).to(device)
dones = torch.FloatTensor(dones).to(device).unsqueeze(1)

q_values = policy_net(states).gather(1, actions)
with torch.no_grad():
next_q_values = target_net(next_states).max(1, keepdim=True)[0]
expected_q = rewards + (1 - dones) * gamma * next_q_values

loss = nn.functional.mse_loss(q_values, expected_q)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return loss.item()

# ---- Main Self-Play Training ----
def main():
# HYPERPARAMS ...
GAMMA = 0.99
BATCH_SIZE = 64
MEMORY_SIZE = 100_000
LR = 1e-3
EPS_START = 1.0
EPS_MIN = 0.25
EPS_DECAY = 0.995
TARGET_UPDATE = 250
TRAIN_START = 2000
NUM_EPISODES = 250_000
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2. Initialize wandb
wandb.init(project="pong-dqn-multi-agent", name="self-play", config={
"batch_size": BATCH_SIZE,
"memory_size": MEMORY_SIZE,
"lr": LR,
"eps_start": EPS_START,
"eps_min": EPS_MIN,
"eps_decay": EPS_DECAY,
"target_update": TARGET_UPDATE,
"train_start": TRAIN_START
})

env = PongEnv(state_mode="coordinate_state", frames_to_return=8, render_mode=None)
obs, _ = env.reset()
state_dim = np.prod(obs.shape)
n_actions = 3

policy_net1 = DQN(state_dim, n_actions).to(DEVICE)
policy_net2 = DQN(state_dim, n_actions).to(DEVICE)

if os.path.exists("pong_dqn.pt"):
print("Found pong_dqn.pt; uncomment the lines below to warm-start both agents from it.")
# policy_net1.load_state_dict(torch.load("pong_dqn.pt", map_location=DEVICE))
# policy_net2.load_state_dict(torch.load("pong_dqn.pt", map_location=DEVICE))
else:
print("No pretrained pong_dqn.pt found. Training from scratch.")

target_net1 = DQN(state_dim, n_actions).to(DEVICE)
target_net1.load_state_dict(policy_net1.state_dict())
target_net1.eval()
optimizer1 = optim.Adam(policy_net1.parameters(), lr=LR)
replay_buffer1 = ReplayBuffer(MEMORY_SIZE)
steps_done1 = 0
all_rewards1 = []

target_net2 = DQN(state_dim, n_actions).to(DEVICE)
target_net2.load_state_dict(policy_net2.state_dict())
target_net2.eval()
optimizer2 = optim.Adam(policy_net2.parameters(), lr=LR)
replay_buffer2 = ReplayBuffer(MEMORY_SIZE)
steps_done2 = 0
all_rewards2 = []

best_avg1 = -1000
best_avg2 = -1000

for episode in range(NUM_EPISODES):
obs, _ = env.reset()
state = preprocess_obs(obs)
total_reward1 = 0
total_reward2 = 0
done = False

while not done:
epsilon1 = get_epsilon(steps_done1, EPS_START, EPS_MIN, EPS_DECAY)
epsilon2 = get_epsilon(steps_done2, EPS_START, EPS_MIN, EPS_DECAY)

action1 = select_action(state, policy_net1, epsilon1, n_actions, DEVICE)
action2 = select_action(state, policy_net2, epsilon2, n_actions, DEVICE)

obs_next, rewards, terminated, truncated, _ = env.step([action1, action2])
r1, r2 = rewards
state_next = preprocess_obs(obs_next)
done_flag = terminated or truncated

replay_buffer1.push(state, action1, r1, state_next, done_flag)
replay_buffer2.push(state, action2, r2, state_next, done_flag)

total_reward1 += r1
total_reward2 += r2

state = state_next

steps_done1 += 1
steps_done2 += 1

if steps_done1 > TRAIN_START:
train_step(policy_net1, target_net1, replay_buffer1, optimizer1, DEVICE, GAMMA, BATCH_SIZE)
if steps_done2 > TRAIN_START:
train_step(policy_net2, target_net2, replay_buffer2, optimizer2, DEVICE, GAMMA, BATCH_SIZE)

if steps_done1 % TARGET_UPDATE == 0:
target_net1.load_state_dict(policy_net1.state_dict())
if steps_done2 % TARGET_UPDATE == 0:
target_net2.load_state_dict(policy_net2.state_dict())
if done_flag:
done = True

all_rewards1.append(total_reward1)
all_rewards2.append(total_reward2)
avg1 = np.mean(all_rewards1[-20:])
avg2 = np.mean(all_rewards2[-20:])

print(f"Ep {episode} R1:{total_reward1: 6.2f} Avg20-1:{avg1: 6.2f} Eps1:{epsilon1: .3f} |"
f" R2:{total_reward2: 6.2f} Avg20-2:{avg2: 6.2f} Eps2:{epsilon2: .3f}", flush=True)

# 3. Log episode metrics to wandb
wandb.log({
"reward1": total_reward1,
"reward2": total_reward2,
"avg20_reward1": avg1,
"avg20_reward2": avg2,
"epsilon1": epsilon1,
"epsilon2": epsilon2,
"episode": episode
})

if episode % 10 == 0:
torch.save(policy_net1.state_dict(), f"pong_dqn_left_ep{episode}.pt")
torch.save(policy_net2.state_dict(), f"pong_dqn_right_ep{episode}.pt")

env.close()
wandb.finish() # Finalize W&B run

if __name__ == '__main__':
main()

Here are the training logs for my run:

[Embedded W&B panel: training metrics from run self-play]

The first thing you’ll notice is that the rewards do not climb steadily the way they typically do in single-agent setups. In self-play, both agents are constantly adapting to each other, so every time one agent starts to improve, it immediately becomes a harder opponent for the other. This leads to fluctuating or plateaued reward curves, where progress looks more like a tug-of-war than a steady climb. As a result, even strong performance may show up as flat or oscillating average rewards, which is typical for competitive multi-agent reinforcement learning.
It was a bit risky implementing this specific example, as I wasn’t able to find much existing work applying this sort of environment to a self-play scenario. Fortunately, I was able to train a few agents to play Pong pretty well! This took several iterations of the reward function, as well as working through some strange behavior my agents developed after roughly the 100-episode mark. The training script above saves model checkpoints every 10 episodes, and these checkpoints made it easy to analyze how agent strategies evolved over time.
I also implemented a script for visualizing the performance of my agents, which made it much easier to debug problems or odd patterns. For example, one issue I spotted was that after about 100 episodes, one or both agents would sometimes simply freeze and not move at all. I'm still not entirely sure what caused this, but this kind of behavior generally happens for a few reasons: sometimes agents stumble into a local optimum where staying still is inadvertently rewarded, or at least not punished, due to the reward structure. Another possible cause is that exploration drops too quickly (as epsilon decays), so the agents stop trying new actions and just repeat a stuck pattern. Instabilities in DQN training, such as overfitting or bad sampling, can also lead to this kind of sudden, trivial behavior.
Spotting and understanding these failures was possible thanks to having full episode replays. This allowed me to iterate on the environment and the learning setup to gradually get more playable, reliable agents.
Here’s the script to visualize the agents:
import torch
import numpy as np
from scoreOnlyPongEnv import PongEnv
import torch.nn as nn
import os
import wandb

class DQN(nn.Module):
def __init__(self, state_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, 512), nn.ReLU(),
nn.Linear(512, 512), nn.ReLU(),
nn.Linear(512, 256), nn.ReLU(),
nn.Linear(256, 128), nn.ReLU(),
nn.Linear(128, 64), nn.ReLU(),
nn.Linear(64, action_dim)
)
def forward(self, x):
return self.net(x)

def preprocess_obs(obs):
return obs.flatten()

def main():
EVAL_MODE = "selfplay" # "selfplay": both paddles use trained policies; anything else: the right paddle falls back to the scripted CPU
LEFT_PATH = "a_best_models/pong_dqn_left_ep90.pt"
RIGHT_PATH = "a_best_models/pong_dqn_right_ep90.pt"
NUM_EPISODES = 3
MAX_RALLIES = 20

wandb.init(
project="pong-dqn-multi-agent",
name="pong_mp4_video_log",
group="pong_video_run"
)

env = PongEnv(state_mode="coordinate_state", frames_to_return=8, render_mode=None, record_replay=True)
obs, _ = env.reset()
state_dim = np.prod(obs.shape)
n_actions = 3

policy_left = DQN(state_dim, n_actions)
policy_left.load_state_dict(torch.load(LEFT_PATH, map_location="cpu"))
policy_left.eval()
if EVAL_MODE == "selfplay":
policy_right = DQN(state_dim, n_actions)
policy_right.load_state_dict(torch.load(RIGHT_PATH, map_location="cpu"))
policy_right.eval()
else:
policy_right = None

mp4_paths = []

for ep in range(NUM_EPISODES):
obs, info = env.reset()
state = preprocess_obs(obs)
done = False
total_reward_left = 0.0
total_reward_right = 0.0
rallies = 0
prev_ball_x_vel = env.ball_x_vel

while not done:
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
action_left = int(policy_left(state_tensor).argmax(1).item())
if EVAL_MODE == "selfplay":
with torch.no_grad():
action_right = int(policy_right(state_tensor).argmax(1).item())
else:
action_right = -100

obs_, rewards, terminated, truncated, info = env.step([action_left, action_right])
if np.sign(env.ball_x_vel) != np.sign(prev_ball_x_vel):
rallies += 1
prev_ball_x_vel = env.ball_x_vel

state = preprocess_obs(obs_)
total_reward_left += rewards[0]
total_reward_right += rewards[1]
done = terminated or truncated
if rallies >= MAX_RALLIES:
done = True

# --- PATCH: Make sure buffer is flushed if early stop ---
if env.record_replay and env.episode_buffer and (env.episode_buffer not in env.recorded_episodes):
env.recorded_episodes.append(env.episode_buffer)

mp4_path = os.path.join(wandb.run.dir, f"pong_ep{ep:02d}.mp4")
env.save_episode(idx=ep, path=mp4_path, fps=60, resize_to=(800, 400))
mp4_paths.append(mp4_path)
print(f"Saved: {mp4_path}")

# LOG THEM TO WANDB AS mp4
artifact_dict = {}
for ep, mp4_path in enumerate(mp4_paths):
if os.path.exists(mp4_path):
tag = f"pong_ep{ep:02d}_video"
artifact_dict[tag] = wandb.Video(mp4_path, caption=tag, fps=60, format="mp4")
print(f"WandB Video logged: {mp4_path}")

# Log all at once
wandb.log(artifact_dict)
wandb.finish()
print("Done! mp4s saved and logged to wandb.")

if __name__ == "__main__":
main()



[Embedded W&B video panel: run pong_mp4_video_log]

And here's an example of the agent playing against the scripted CPU opponent, which simply aligns itself with the ball:

[Embedded W&B video panel: run pong_mp4_video_log]


Conclusion

Multi-agent reinforcement learning (MARL) extends traditional RL to environments where multiple agents interact, collaborate, or compete—reflecting the complexity of real-world systems. By combining foundational ideas from game theory and optimization, leveraging modern frameworks, and addressing challenges through practical algorithm design, MARL enables the creation of adaptive and intelligent multi-agent systems.
As demonstrated by the custom Pong example, MARL brings both opportunities and unique difficulties, especially in training stability and strategy evolution. With continued advances in algorithms, tools, and environments, MARL stands to play a central role in the development of versatile AI agents capable of effective teamwork, competition, and negotiation in diverse scenarios.






