worker.py

#
import cv2
import multiprocessing
import multiprocessing.connection
import gym
import numpy as np
#

Game environment

This is a wrapper for the OpenAI Gym game environment. We do a few things here:

  1. Repeat the same action for four consecutive frames (frame skipping)
  2. Convert observation frames to grayscale and scale them to (84, 84)
  3. Take the maximum of the last two of those four frames
  4. Stack four such frames: the current one and those from the last three actions
  5. Add episode information (total reward for the entire episode) for monitoring
  6. Restrict an episode to a single life (game has 5 lives, we reset after every single life)

Observation format

Observation is a tensor of shape (84, 84, 4): four frames (images of the game screen) stacked along the last axis, i.e., each channel is a frame.

Frames    00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15
Actions   a1 a1 a1 a1 a2 a2 a2 a2 a3 a3 a3 a3 a4 a4 a4 a4
Max       -- -- MM MM -- -- MM MM -- -- MM MM -- -- MM MM
Stacked   -- -- Stack -- -- Stack -- -- Stack -- -- Stack
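
A minimal numpy sketch of this bookkeeping, with random frames standing in for processed observations (the variable names here are illustrative, not part of the wrapper):

    import numpy as np

    obs_2_max = np.zeros((2, 84, 84, 1), np.uint8)
    obs_4 = np.zeros((84, 84, 4))

    for i in range(4):
        # stand-in for a processed (84, 84, 1) game frame
        frame = np.random.randint(0, 256, (84, 84, 1), dtype=np.uint8)
        # keep only the last two of the four frames
        if i >= 2:
            obs_2_max[i % 2] = frame

    # the pixel-wise maximum over the last two frames removes sprite flicker
    obs = obs_2_max.max(axis=0)

    # shift the stack left and append the new frame as the last channel
    obs_4 = np.roll(obs_4, shift=-1, axis=-1)
    obs_4[..., -1:] = obs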
class Game(gym.Wrapper):
#
#

Initialize

    def __init__(self, env: gym.Env, is_stack: bool):
#
        gym.Wrapper.__init__(self, env)
        self.is_stack = is_stack

        if self.is_stack:
            self.observation_space = gym.spaces.Box(low=0, high=255,
                                                    shape=(84, 84, 4),
                                                    dtype=np.uint8)
        else:
            self.observation_space = gym.spaces.Box(low=0, high=255,
                                                    shape=(84, 84, 1),
                                                    dtype=np.uint8)
#

buffer to take the maximum of the last 2 frames for each action

        self.obs_2_max = np.zeros((2, 84, 84, 1), np.uint8)
#

tensor for a stack of 4 frames

        self.obs_4 = np.zeros((84, 84, 4))
#

keep track of the episode rewards

        self.rewards = []
#

and number of lives left

        self.lives = 0
#

Step

Executes action for 4 time steps and returns a tuple of (observation, reward, done, episode_info).

  • observation: the stack of 4 frames (this frame and the frames from the last 3 actions) as described above
  • reward: total reward while the action was executed
  • done: whether the episode finished (a life lost)
  • episode_info: episode information if completed
    def step(self, action):
#
        reward = 0.
        done = None
#

run for 4 steps

        for i in range(4):
#

execute the action in the OpenAI Gym environment

            obs, r, done, info = self.env.step(action)
#

add last two frames to buffer

            if i >= 2:
                self.obs_2_max[i % 2] = self._process_obs(obs)

            reward += r
#

get number of lives left

            lives = self.env.unwrapped.ale.lives()
#

reset if a life is lost

            if lives < self.lives:
                done = True
            self.lives = lives
#

stop if episode finished

            if done:
                break
#

maintain rewards for each step

        self.rewards.append(reward)

        if done:
#

if finished, set episode information and reset

            episode_info = {"reward": sum(self.rewards),
                            "length": len(self.rewards)}
            self.reset()
        else:
            episode_info = None
#

get the max of last two frames

            obs = self.obs_2_max.max(axis=0)
#

push it to the stack of 4 frames

            self.obs_4 = np.roll(self.obs_4, shift=-1, axis=-1)
            self.obs_4[..., -1:] = obs

        if self.is_stack:
            return self.obs_4, reward, done, episode_info
        else:
            return self.obs_4[..., 3:], reward, done, episode_info
#
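
As a usage note, episode_info is None while a life is in progress and a dict once it ends. A sketch of consuming it (make_game is defined below; seed 47 is arbitrary):

    game = make_game(seed=47, is_stack=True)
    game.reset()
    done = False
    while not done:
        obs, reward, done, episode_info = game.step(game.action_space.sample())
    # once done, episode_info holds the totals, e.g. {"reward": 3.0, "length": 52}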

Reset environment

Clean up episode info and the 4-frame stack

    def reset(self):
#
#

reset the OpenAI Gym environment and skip the first 30 frames with no-op actions

        obs = self.env.reset()
        for _ in range(30):
            obs, _, _, _ = self.env.step(0)
#

Fire (action 1) to launch the ball, then make a move (action 2)

        obs, _, _, _ = self.env.step(1)
        obs, _, _, _ = self.env.step(2)
#

reset caches

        obs = self._process_obs(obs)
        self.obs_4[..., :] = obs  # broadcast the first frame into all four channels
        self.rewards = []

        self.lives = self.env.unwrapped.ale.lives()

        if self.is_stack:
            return self.obs_4
        else:
            return self.obs_4[..., 3:]
#

Process game frames

Convert game frames to gray and rescale to 84x84

    @staticmethod
    def _process_obs(obs):
#
        obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_AREA)
        return obs[:, :, None]  # Shape (84, 84, 1)
#

An alternative frame processor that crops out the scoreboard (not used by the wrapper above)

    @staticmethod
    def _process_obs_2(obs):
#
        img = np.reshape(obs, [210, 160, 3]).astype(np.float32)
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_LINEAR)
        x_t = resized_screen[18:102, :]
        x_t = np.reshape(x_t, [84, 84, 1])
        return x_t.astype(np.uint8)
#
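
A quick sanity check of the two processors on a dummy frame (a sketch; (210, 160, 3) is the raw Breakout screen shape):

    dummy = np.random.randint(0, 256, (210, 160, 3), dtype=np.uint8)
    assert Game._process_obs(dummy).shape == (84, 84, 1)
    assert Game._process_obs_2(dummy).shape == (84, 84, 1)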

Create Game

def make_game(seed: int, is_stack: bool):
#

create gym environment

    env = gym.make('BreakoutNoFrameskip-v4')
    env.seed(seed)

    game = Game(env, is_stack)

    return game
#
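
A short usage sketch (assuming the Atari ROMs are installed; seed 47 is arbitrary):

    game = make_game(seed=47, is_stack=True)
    assert game.reset().shape == (84, 84, 4)

    # with is_stack=False only the latest frame is returned
    game = make_game(seed=47, is_stack=False)
    assert game.reset().shape == (84, 84, 1)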

Worker Process

Each worker process runs this function

def worker_process(remote: multiprocessing.connection.Connection, seed: int):
#
#

create game

    game = make_game(seed, is_stack=True)
#

wait for instructions from the connection and execute them

    while True:
        cmd, data = remote.recv()
        if cmd == "step":
            remote.send(game.step(data))
        elif cmd == "reset":
            remote.send(game.reset())
        elif cmd == "close":
            remote.close()
            break
        else:
            raise NotImplementedError
#

Worker

Creates a new worker and runs it in a separate process.

class Worker(object):
#
    child: multiprocessing.connection.Connection
    process: multiprocessing.Process
#
    def __init__(self, seed):
        self.child, parent = multiprocessing.Pipe()
        self.process = multiprocessing.Process(target=worker_process, args=(parent, seed))
        self.process.start()
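
The class exposes no stepping helpers; a trainer drives workers directly over the pipe. A sketch of that exchange (hypothetical driver code, not part of this file):

    if __name__ == "__main__":
        workers = [Worker(seed=47 + i) for i in range(4)]

        # reset every worker and gather the initial observations
        for worker in workers:
            worker.child.send(("reset", None))
        observations = [worker.child.recv() for worker in workers]

        # step every worker; action 1 is FIRE in Breakout
        for worker in workers:
            worker.child.send(("step", 1))
        results = [worker.child.recv() for worker in workers]  # each is (obs, reward, done, episode_info)

        # shut the workers down
        for worker in workers:
            worker.child.send(("close", None))
            worker.process.join()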