我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用gym.utils()。
def __init__(self, **kwargs): utils.EzPickle.__init__(self) self.curr_seed = 0 self.screen = np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8) self.closed = False self.can_send_command = True self.command_cond = Condition() self.viewer = None self.reward = 0 episode_time_length_secs = 7 frame_skip = 5 fps = 60 self.episode_length = episode_time_length_secs * fps / frame_skip self.actions = [ 'U', 'D', 'L', 'R', 'UR', 'DR', 'URA', 'DRB', 'A', 'B', 'RB', 'RA'] self.action_space = spaces.Discrete(len(self.actions)) self.frame = 0 # for communication with emulator self.pipe_in = None self.pipe_out = None self.thread_incoming = None self.rom_file_path = None self.lua_interface_path = None self.emulator_started = False ## ---------- gym.Env methods -------------
def _seed(self, seed=None): self.np_random, seed = gym.utils.seeding.np_random(seed) return [seed]
def __init__(self, game='pong', obs_type='ram', frameskip=(2, 5), repeat_action_probability=0.): """Frameskip should be either a tuple (indicating a random range to choose from, with the top value exclude), or an int.""" utils.EzPickle.__init__(self, game, obs_type) assert obs_type in ('ram', 'image') self.game_path = atari_py.get_game_path(game) if not os.path.exists(self.game_path): raise IOError('You asked for game %s but path %s does not exist'%(game, self.game_path)) self._obs_type = obs_type self.frameskip = frameskip self.ale = atari_py.ALEInterface() self.viewer = None # Tune (or disable) ALE's action repeat: # https://github.com/openai/gym/issues/349 assert isinstance(repeat_action_probability, (float, int)), "Invalid repeat_action_probability: {!r}".format(repeat_action_probability) self.ale.setFloat('repeat_action_probability'.encode('utf-8'), repeat_action_probability) self._seed() (screen_width, screen_height) = self.ale.getScreenDims() self._action_set = self.ale.getMinimalActionSet() self.action_space = spaces.Discrete(len(self._action_set)) (screen_width,screen_height) = self.ale.getScreenDims() if self._obs_type == 'ram': self.observation_space = spaces.Box(low=np.zeros(128), high=np.zeros(128)+255) elif self._obs_type == 'image': self.observation_space = spaces.Box(low=0, high=255, shape=(screen_height, screen_width, 3)) else: raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
def _seed(self, seed=None): from gym.utils import seeding self.np_random, seed = seeding.np_random(seed) return [seed]
def _render(self, mode='human', close=False): if mode == 'rgb_array': return self.get_bitmap() # utils functions
def __init__(self, game='classic_kong', obs_type='ram', frameskip=(2, 5), repeat_action_probability=0.): """Frameskip should be either a tuple (indicating a random range to choose from, with the top value exclude), or an int.""" utils.EzPickle.__init__(self, game, obs_type) assert obs_type in ('ram', 'image') self.game_path = self.get_rom_path(game) self._obs_type = obs_type self.frameskip = frameskip self.rle = rle_python_interface.RLEInterface() self.viewer = None # Tune (or disable) RLE's action repeat: # https://github.com/openai/gym/issues/349 assert isinstance(repeat_action_probability, (float, int)), "Invalid repeat_action_probability: {!r}".format(repeat_action_probability) self.rle.setFloat('repeat_action_probability'.encode('utf-8'), repeat_action_probability) self._seed() (screen_width, screen_height) = self.rle.getScreenDims() self._buffer = np.empty((screen_height, screen_width, 4), dtype=np.uint8) self._action_set = self.rle.getMinimalActionSet() self.action_space = spaces.Discrete(len(self._action_set)) (screen_width,screen_height) = self.rle.getScreenDims() ram_size = self.rle.getRAMSize() if self._obs_type == 'ram': self.observation_space = spaces.Box(low=np.zeros(ram_size), high=np.zeros(ram_size)+255) elif self._obs_type == 'image': self.observation_space = spaces.Box(low=0, high=255, shape=(screen_height, screen_width, )) else: raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
def __init__(self, game='pong', obs_type='ram', frameskip=(2, 5), repeat_action_probability=0.): """Frameskip should be either a tuple (indicating a random range to choose from, with the top value exclude), or an int.""" utils.EzPickle.__init__(self, game, obs_type) assert obs_type in ('ram', 'image') self.game_path = atari_py.get_game_path(game) if not os.path.exists(self.game_path): raise IOError('You asked for game %s but path %s does not exist'%(game, self.game_path)) self._obs_type = obs_type self.frameskip = frameskip self.ale = atari_py.ALEInterface() self.viewer = None # Tune (or disable) ALE's action repeat: # https://github.com/openai/gym/issues/349 assert isinstance(repeat_action_probability, (float, int)), "Invalid repeat_action_probability: {!r}".format(repeat_action_probability) self.ale.setFloat('repeat_action_probability'.encode('utf-8'), repeat_action_probability) self._seed() (screen_width, screen_height) = self.ale.getScreenDims() self._buffer = np.empty((screen_height, screen_width, 4), dtype=np.uint8) self._action_set = self.ale.getMinimalActionSet() self.action_space = spaces.Discrete(len(self._action_set)) (screen_width,screen_height) = self.ale.getScreenDims() if self._obs_type == 'ram': self.observation_space = spaces.Box(low=np.zeros(128), high=np.zeros(128)+255) elif self._obs_type == 'image': self.observation_space = spaces.Box(low=0, high=255, shape=(screen_height, screen_width, 3)) else: raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
def __init__(self, client_id, base_url=allocator_base, address_type=None, start_timeout=None, api_key=None, runtime_id=None, params=None, placement=None, use_recorder_ports=False, ): super(AllocatorManager, self).__init__() self.label = 'AllocatorManager' self.supports_reconnect = True self.connect_vnc = True self.connect_rewarder = True if address_type is None: address_type = 'public' if address_type not in ['public', 'pod', 'private']: raise error.Error('Bad address type specified: {}. Must be public, pod, or private.'.format(address_type)) self.client_id = client_id self.address_type = address_type if start_timeout is None: start_timeout = 20 * 60 self.start_timeout = start_timeout self.params = params self.placement = placement self.use_recorder_ports = use_recorder_ports # if base_url is None: # base_url = scoreboard.api_base # if base_url is None: # base_url = gym_base_url # if api_key is None: # api_key = scoreboard.api_key # if api_key is None: # raise gym.error.AuthenticationError("""You must provide an OpenAI Gym API key. # (HINT: Set your API key using "gym.scoreboard.api_key = .." or "export OPENAI_GYM_API_KEY=..."). You can find your API key in the OpenAI Gym web interface: https://gym.openai.com/settings/profile.""") if api_key is None: api_key = _api_key self._requestor = AllocatorClient(self.label, api_key, base_url=base_url) self.base_url = base_url # These could be overridden on a per-allocation basis, if you # want heterogeoneous envs. We don't support those currently # in the higher layers, but this layer could support it # easily. self.runtime_id = runtime_id self.pending = {} self.error_buffer = utils.ErrorBuffer() self.requests = queue.Queue() self.ready = queue.Queue() self._reconnect_history = {} self._sleep = 1
def __init__(self): utils.EzPickle.__init__(self) self.rom_path = '' self.screen_height = 224 self.screen_width = 256 self.action_space = spaces.MultiDiscrete([[0, 1]] * NUM_ACTIONS) self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3)) self.launch_vars = {} self.cmd_args = ['--xscale 2', '--yscale 2', '-f 0'] self.lua_path = [] self.subprocess = None self.no_render = True self.viewer = None # Pipes self.pipe_name = '' self.path_pipe_prefix = os.path.join(tempfile.gettempdir(), 'smb-fifo') self.path_pipe_in = '' # Input pipe (maps to fceux out-pipe and to 'in' file) self.path_pipe_out = '' # Output pipe (maps to fceux in-pipe and to 'out' file) self.pipe_out = None self.lock_out = Lock() self.disable_in_pipe = False self.disable_out_pipe = False self.launch_vars['pipe_name'] = '' self.launch_vars['pipe_prefix'] = self.path_pipe_prefix # Other vars self.is_initialized = 0 # Used to indicate fceux has been launched and is running self.is_exiting = 0 # Used to stop the listening thread self.last_frame = 0 # Last processed frame self.reward = 0 # Reward for last action self.episode_reward = 0 # Total rewards for episode self.is_finished = False self.screen = np.zeros(shape=(self.screen_height, self.screen_width, 3), dtype=np.uint8) self.info = {} self.level = 0 self._reset_info_vars() self.first_step = False self.lock = (NesLock()).get_lock() # Seeding self.curr_seed = 0 self._seed()