import cv2
import multiprocessing
import multiprocessing.connection
import gym
import numpy as np
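# This is a wrapper for an OpenAI Gym game environment. We do a few things here:
#
# 1. Apply the same action for four consecutive frames.
# 2. Convert frames to grayscale and scale them to 84 x 84.
# 3. Take the pixel-wise maximum of the last two of those four frames.
# 4. Stack the frames produced for the last four actions.
# 5. Track per-episode rewards and the number of lives left.
#
# The observation is a tensor of size (84, 84, 4). It is four frames (images of
# the game screen) stacked on the last axis, i.e., each channel is a frame.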
#
#     Frames    00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15
#     Actions   a1 a1 a1 a1 a2 a2 a2 a2 a3 a3 a3 a3 a4 a4 a4 a4
#     Max       -- -- MM MM -- -- MM MM -- -- MM MM -- -- MM MM
#     Stacked   -- -- Stack -- -- Stack -- -- Stack -- -- Stack
#
class Game(gym.Wrapper):
    """Gym wrapper that repeats actions, max-pools frames and stacks them.

    Every call to `step` applies one action for up to four environment steps,
    converts the last two raw frames to 84 x 84 grayscale, takes their
    pixel-wise maximum and pushes the result onto a rolling stack of four
    frames stored on the last axis.

    An episode is treated as finished when the wrapped environment reports
    `done` or when the number of lives reported by the emulator decreases.
    The wrapper resets itself automatically in that case.
    """

    def __init__(self, env: gym.Env, is_stack: bool):
        """Wrap `env`.

        Args:
            env: The environment to wrap. It must expose
                `env.unwrapped.ale.lives()`.
            is_stack: If true, observations are the full (84, 84, 4) stack;
                otherwise only the most recent frame, shape (84, 84, 1).
        """
        gym.Wrapper.__init__(self, env)
        self.is_stack = is_stack

        # The two observation spaces differ only in the number of channels.
        channels = 4 if self.is_stack else 1
        self.observation_space = gym.spaces.Box(low=0, high=255,
                                                shape=(84, 84, channels),
                                                dtype=np.uint8)

        # buffer to take the maximum of last 2 frames for each action
        self.obs_2_max = np.zeros((2, 84, 84, 1), np.uint8)

        # tensor for a stack of 4 frames; uint8 so that the observations
        # actually match the dtype declared by `observation_space`
        self.obs_4 = np.zeros((84, 84, 4), np.uint8)

        # keep track of the episode rewards
        self.rewards = []
        # and number of lives left
        self.lives = 0

    def _observation(self):
        """Return the full frame stack, or only the newest frame if not stacking."""
        if self.is_stack:
            return self.obs_4
        return self.obs_4[..., 3:]

    def step(self, action):
        """Execute `action` for 4 time steps.

        Returns:
            A tuple `(observation, reward, done, episode_info)`:

            * observation: stacked 4 frames (this frame and the frames for the
              last 3 actions), or only the newest frame when `is_stack` is
              false. When the episode finished, this is the first observation
              after the automatic reset.
            * reward: total reward collected while the action was executed.
            * done: whether the episode finished (including a life lost).
            * episode_info: `{"reward": ..., "length": ...}` for the finished
              episode, or `None` if the episode is still running.
        """
        reward = 0.
        done = None

        # run for 4 steps
        for i in range(4):
            # execute the action in the OpenAI Gym environment
            obs, r, done, _ = self.env.step(action)

            # add last two frames to buffer
            if i >= 2:
                self.obs_2_max[i % 2] = self._process_obs(obs)

            reward += r

            # get number of lives left
            lives = self.env.unwrapped.ale.lives()
            # treat a lost life as the end of the episode
            if lives < self.lives:
                done = True
            self.lives = lives

            # stop if episode finished
            if done:
                break

        # maintain rewards for each step
        self.rewards.append(reward)

        if done:
            # episode is over: report its statistics, then reset
            episode_info = {"reward": sum(self.rewards),
                            "length": len(self.rewards)}
            # `reset` rebuilds the frame stack. Nothing from the finished
            # episode is pushed afterwards; the max buffer may not even have
            # been refreshed if the episode ended within the first two frames.
            self.reset()
        else:
            episode_info = None

            # get the max of last two frames
            obs = self.obs_2_max.max(axis=0)

            # push it to the stack of 4 frames; `np.roll` returns a new array,
            # so observations returned earlier are not modified
            self.obs_4 = np.roll(self.obs_4, shift=-1, axis=-1)
            self.obs_4[..., -1:] = obs

        return self._observation(), reward, done, episode_info

    def reset(self):
        """Reset the environment and return the initial observation.

        After resetting, action 0 is applied 30 times (presumably no-ops --
        confirm against the environment's action meanings), followed by
        action 1 and action 2 ("fire and make a move"). The last frame is
        processed and replicated into all four slots of the frame stack.
        """
        # reset OpenAI Gym environment
        self.env.reset()
        for _ in range(30):
            self.env.step(0)

        # Fire and make a move
        self.env.step(1)
        obs, _, _, _ = self.env.step(2)

        # reset caches: fill every channel with the first processed frame.
        # A fresh array is created so that an observation previously handed
        # to the caller is not overwritten.
        obs = self._process_obs(obs)
        self.obs_4 = np.repeat(obs, 4, axis=-1)
        self.rewards = []

        self.lives = self.env.unwrapped.ale.lives()

        return self._observation()

    @staticmethod
    def _process_obs(obs):
        """Convert an RGB frame to 84 x 84 grayscale with a trailing channel axis."""
        obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_AREA)
        return obs[:, :, None]  # Shape (84, 84, 1)

    @staticmethod
    def _process_obs_2(obs):
        """Alternative preprocessing: weighted grayscale, resize and crop.

        The input is reshaped to (210, 160, 3), converted to grayscale with
        the weights 0.299 / 0.587 / 0.114, resized to 84 wide by 110 high,
        and rows 18..101 are kept. The result has shape (84, 84, 1) and
        dtype uint8. Not called by the other methods of this class.
        """
        img = np.reshape(obs, [210, 160, 3]).astype(np.float32)
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        # cv2.resize takes the target size as (width, height)
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_LINEAR)
        x_t = resized_screen[18:102, :]
        x_t = np.reshape(x_t, [84, 84, 1])
        return x_t.astype(np.uint8)
def make_game(seed: int, is_stack: bool):
    """Build a seeded Breakout environment wrapped in `Game`.

    Args:
        seed: Seed passed to the environment's `seed` method.
        is_stack: Forwarded to `Game`; selects stacked or single-frame
            observations.

    Returns:
        The wrapped `Game` instance.
    """
    # create gym environment and seed it before wrapping
    base_env = gym.make('BreakoutNoFrameskip-v4')
    base_env.seed(seed)

    return Game(base_env, is_stack)
def worker_process(remote: multiprocessing.connection.Connection, seed: int):
    """Run a game in this process, driven by commands received over `remote`.

    Each message is a `(cmd, data)` tuple:

    * `"step"`: call `game.step(data)` and send the result back.
    * `"reset"`: call `game.reset()` and send the result back.
    * `"close"`: stop the loop.

    The game and the connection are closed when the loop ends, whether it
    ends through `"close"` or through an exception.

    Args:
        remote: This process's end of the pipe.
        seed: Seed used to create the game.

    Raises:
        NotImplementedError: If an unknown command is received.
    """
    # create game
    game = make_game(seed, is_stack=True)

    try:
        # wait for instructions from the connection and execute them
        while True:
            cmd, data = remote.recv()
            if cmd == "step":
                remote.send(game.step(data))
            elif cmd == "reset":
                remote.send(game.reset())
            elif cmd == "close":
                break
            else:
                raise NotImplementedError(f"Unknown command: {cmd!r}")
    finally:
        # Release the environment first; the connection is closed even if
        # closing the environment fails.
        try:
            game.close()
        finally:
            remote.close()
class Worker(object):
    """Starts `worker_process` in a separate process and keeps a pipe to it.

    Commands are sent to the process through `child`; see `worker_process`
    for the message format.
    """

    # End of the pipe kept by the creating process.
    child: multiprocessing.connection.Connection
    # Process running `worker_process`.
    process: multiprocessing.Process

    def __init__(self, seed):
        """Create the pipe and start the worker process with the given seed."""
        local_end, remote_end = multiprocessing.Pipe()
        self.child = local_end

        # The other end of the pipe is handed to the worker process.
        self.process = multiprocessing.Process(
            target=worker_process,
            args=(remote_end, seed),
        )
        self.process.start()