我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.string_types()。
def bool_from_string(value): """Interpret string value as boolean. Returns True if value translates to True otherwise False. """ if isinstance(value, six.string_types): value = six.text_type(value) else: msg = "Unable to interpret non-string value '%s' as boolean" % (value) raise ValueError(msg) value = value.strip().lower() if value in ['y', 'yes', 'true', 't', 'on']: return True elif value in ['n', 'no', 'false', 'f', 'off']: return False msg = "Unable to interpret string value '%s' as boolean" % (value) raise ValueError(msg)
def log(message, level=None): """Write a message to the juju log""" command = ['juju-log'] if level: command += ['-l', level] if not isinstance(message, six.string_types): message = repr(message) command += [message] # Missing juju-log should not cause failures in unit tests # Send log output to stderr try: subprocess.call(command) except OSError as e: if e.errno == errno.ENOENT: if level: message = "{}: {}".format(level, message) message = "juju-log: {}".format(message) print(message, file=sys.stderr) else: raise
def remove_cache_tier(self, cache_pool): """ Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete. :param cache_pool: six.string_types. The cache tier pool name to remove. :return: None """ # read-only is easy, writeback is much harder mode = get_cache_mode(self.service, cache_pool) version = ceph_version() if mode == 'readonly': check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none']) check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) elif mode == 'writeback': pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'] if version >= '10.1': # Jewel added a mandatory flag pool_forward_cmd.append('--yes-i-really-mean-it') check_call(pool_forward_cmd) # Flush the cache and wait for it to return check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all']) check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name]) check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
def get_mon_map(service): """ Returns the current monitor map. :param service: six.string_types. The Ceph user name to run the command under :return: json string. :raise: ValueError if the monmap fails to parse. Also raises CalledProcessError if our ceph command fails """ try: mon_status = check_output( ['ceph', '--id', service, 'mon_status', '--format=json']) try: return json.loads(mon_status) except ValueError as v: log("Unable to parse mon_status json: {}. Error: {}".format( mon_status, v.message)) raise except CalledProcessError as e: log("mon_status command failed with message: {}".format( e.message)) raise
def hash_monitor_names(service): """ Uses the get_mon_map() function to get information about the monitor cluster. Hash the name of each monitor. Return a sorted list of monitor hashes in an ascending order. :param service: six.string_types. The Ceph user name to run the command under :rtype : dict. json dict of monitor name, ip address and rank example: { 'name': 'ip-172-31-13-165', 'rank': 0, 'addr': '172.31.13.165:6789/0'} """ try: hash_list = [] monitor_list = get_mon_map(service=service) if monitor_list['monmap']['mons']: for mon in monitor_list['monmap']['mons']: hash_list.append( hashlib.sha224(mon['name'].encode('utf-8')).hexdigest()) return sorted(hash_list) else: return None except (ValueError, CalledProcessError): raise
def monitor_key_set(service, key, value): """ Sets a key value pair on the monitor cluster. :param service: six.string_types. The Ceph user name to run the command under :param key: six.string_types. The key to set. :param value: The value to set. This will be converted to a string before setting """ try: check_output( ['ceph', '--id', service, 'config-key', 'put', str(key), str(value)]) except CalledProcessError as e: log("Monitor config-key put failed with message: {}".format( e.output)) raise
def monitor_key_get(service, key): """ Gets the value of an existing key in the monitor cluster. :param service: six.string_types. The Ceph user name to run the command under :param key: six.string_types. The key to search for. :return: Returns the value of that key or None if not found. """ try: output = check_output( ['ceph', '--id', service, 'config-key', 'get', str(key)]) return output except CalledProcessError as e: log("Monitor config-key get failed with message: {}".format( e.output)) return None
def monitor_key_exists(service, key): """ Searches for the existence of a key in the monitor cluster. :param service: six.string_types. The Ceph user name to run the command under :param key: six.string_types. The key to search for :return: Returns True if the key exists, False if not and raises an exception if an unknown error occurs. :raise: CalledProcessError if an unknown error occurs """ try: check_call( ['ceph', '--id', service, 'config-key', 'exists', str(key)]) # I can return true here regardless because Ceph returns # ENOENT if the key wasn't found return True except CalledProcessError as e: if e.returncode == errno.ENOENT: return False else: log("Unknown error from ceph config-get exists: {} {}".format( e.returncode, e.output)) raise
def remove_pool_snapshot(service, pool_name, snapshot_name): """ Remove a snapshot from a RADOS pool in ceph. :param service: six.string_types. The Ceph user name to run the command under :param pool_name: six.string_types :param snapshot_name: six.string_types :return: None. Can raise CalledProcessError """ cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] try: check_call(cmd) except CalledProcessError: raise # max_bytes should be an int or long
def get_cache_mode(service, pool_name): """ Find the current caching mode of the pool_name given. :param service: six.string_types. The Ceph user name to run the command under :param pool_name: six.string_types :return: int or None """ validator(value=service, valid_type=six.string_types) validator(value=pool_name, valid_type=six.string_types) out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json']) try: osd_json = json.loads(out) for pool in osd_json['pools']: if pool['pool_name'] == pool_name: return pool['cache_mode'] return None except ValueError: raise
def _validate_dict_data(self, expected, actual): """Validate dictionary data. Compare expected dictionary data vs actual dictionary data. The values in the 'expected' dictionary can be strings, bools, ints, longs, or can be a function that evaluates a variable and returns a bool. """ self.log.debug('actual: {}'.format(repr(actual))) self.log.debug('expected: {}'.format(repr(expected))) for k, v in six.iteritems(expected): if k in actual: if (isinstance(v, six.string_types) or isinstance(v, bool) or isinstance(v, six.integer_types)): # handle explicit values if v != actual[k]: return "{}:{}".format(k, actual[k]) # handle function pointers, such as not_null or valid_ip elif not v(actual[k]): return "{}:{}".format(k, actual[k]) else: return "key '{}' does not exist".format(k) return None
def __getitem__(self, index): """ Indexes an expiration state on this collection. Args: index (str): Name of the physical channel of which the expiration state to retrieve. Returns: nidaqmx.system._watchdog_modules.expiration_state.ExpirationState: The object representing the indexed expiration state. """ if isinstance(index, six.string_types): return ExpirationState(self._handle, index) else: raise DaqError( 'Invalid index type "{0}" used to access expiration states.' .format(type(index)), -1)
def _ConvertValueMessage(self, value, message): """Convert a JSON representation into Value message.""" if isinstance(value, dict): self._ConvertStructMessage(value, message.struct_value) elif isinstance(value, list): self. _ConvertListValueMessage(value, message.list_value) elif value is None: message.null_value = 0 elif isinstance(value, bool): message.bool_value = value elif isinstance(value, six.string_types): message.string_value = value elif isinstance(value, _INT_OR_FLOAT): message.number_value = value else: raise ParseError('Unexpected type for Value message.')
def generic_type_name(v): """Return a descriptive type name that isn't Python specific. For example, an int value will return 'integer' rather than 'int'.""" if isinstance(v, numbers.Integral): # Must come before real numbers check since integrals are reals too return 'integer' elif isinstance(v, numbers.Real): return 'float' elif isinstance(v, (tuple, list)): return 'list' elif isinstance(v, six.string_types): return 'string' elif v is None: return 'null' else: return type(v).__name__
def _get_url(self): """ Returns: string_type or parse.ParseResult or parse SplitResult We have a vague return type for _get_url method because we could not generalise regex support for all backends. So we are directly passing result to backends. That way users can use regex that their backend provides. """ if self.url is NotImplemented: raise NotImplementedError('To use placebo, you need to either ' 'provide url attribute or ' 'overwrite get_url method in subclass.') else: url = invoke_or_get(self.url) # if url is a string convert it to ParsedUrl # if isinstance(url, six.string_types): # url = parse.urlparse(url) # TODO: check return type return url
def get_mon_map(service): """ Returns the current monitor map. :param service: six.string_types. The Ceph user name to run the command under :return: json string. :raise: ValueError if the monmap fails to parse. Also raises CalledProcessError if our ceph command fails """ try: mon_status = check_output(['ceph', '--id', service, 'mon_status', '--format=json']) if six.PY3: mon_status = mon_status.decode('UTF-8') try: return json.loads(mon_status) except ValueError as v: log("Unable to parse mon_status json: {}. Error: {}" .format(mon_status, str(v))) raise except CalledProcessError as e: log("mon_status command failed with message: {}" .format(str(e))) raise
def monitor_key_get(service, key): """ Gets the value of an existing key in the monitor cluster. :param service: six.string_types. The Ceph user name to run the command under :param key: six.string_types. The key to search for. :return: Returns the value of that key or None if not found. """ try: output = check_output( ['ceph', '--id', service, 'config-key', 'get', str(key)]).decode('UTF-8') return output except CalledProcessError as e: log("Monitor config-key get failed with message: {}".format( e.output)) return None
def get_cache_mode(service, pool_name): """ Find the current caching mode of the pool_name given. :param service: six.string_types. The Ceph user name to run the command under :param pool_name: six.string_types :return: int or None """ validator(value=service, valid_type=six.string_types) validator(value=pool_name, valid_type=six.string_types) out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json']) if six.PY3: out = out.decode('UTF-8') try: osd_json = json.loads(out) for pool in osd_json['pools']: if pool['pool_name'] == pool_name: return pool['cache_mode'] return None except ValueError: raise
def ns_query(address): try: import dns.resolver except ImportError: if six.PY2: apt_install('python-dnspython', fatal=True) else: apt_install('python3-dnspython', fatal=True) import dns.resolver if isinstance(address, dns.name.Name): rtype = 'PTR' elif isinstance(address, six.string_types): rtype = 'A' else: return None try: answers = dns.resolver.query(address, rtype) except dns.resolver.NXDOMAIN: return None if answers: return str(answers[0]) return None
def add_roles(self, user, roles): """ Adds one or more roles to a user. Adding the same role more than once has no effect. user -- name of the user roles -- one or more roles to add """ if user not in self.users: raise UserNotDefined(user) roles = (roles,) if isinstance(roles, six.string_types) else roles for role in roles: if ',' in role: raise BadRoleError('\',\' not allowed in role name (%s) ' 'for user %s' % (role, user)) self.roles[user].add(role)
def ng_call_scope_function(self, element, func, params='', return_out=False): """ :Description: Will execute scope function with provided parameters. :Warning: This will only work for angular.js 1.x. :Warning: Requires angular debugging to be enabled. :param element: Element for browser instance to target. :param func: Function to execute from angular element scope. :type func: string :param params: String (naked) args, or list of parameters to pass to target function. :type params: string, tuple, list :param return_out: Return output of function call otherwise None :type return_out: bool """ if isinstance(params, string_types): param_str = params elif isinstance(params, (tuple, list)): param_str = self.__serialize_params(params) else: raise ValueError('Invalid type specified for function parameters') exec_str = 'angular.element(arguments[0]).scope().%s(%s);' % (func, param_str) if return_out: return self.__type2python( self.browser.execute_script('return {}'.format(exec_str), element)) else: self.browser.execute_script(exec_str, element)
def ng_call_ctrl_function(self, element, func, params='', return_out=False): """ :Description: Will execute controller function with provided parameters. :Warning: This will only work for angular.js 1.x. :Warning: Requires angular debugging to be enabled. :param element: Element for browser instance to target. :param func: Function to execute from angular element controller. :type func: string :param params: String (naked) args, or list of parameters to pass to target function. :type params: string, tuple, list :param return_out: Return output of function call otherwise None :type return_out: bool """ if isinstance(params, string_types): param_str = params elif isinstance(params, (tuple, list)): param_str = self.__serialize_params(params) else: raise ValueError('Invalid type specified for function parameters') exec_str = 'angular.element(arguments[0]).controller().%s(%s);' % (func, param_str) if return_out: return self.__type2python( self.browser.execute_script('return {}'.format(exec_str), element)) else: self.browser.execute_script(exec_str, element)
def ng2_call_component_function(self, element, func, params='', return_out=False): """ :Description: Will execute the component instance function with provided parameters. :Warning: This will only work for Angular components. :param element: Element for browser instance to target. :param func: Function to execute from component instance. :type func: string :param params: String (naked) args, or list of parameters to pass to target function. :type params: string, tuple, list :param return_out: Return output of function call otherwise None :type return_out: bool """ if isinstance(params, string_types): param_str = params elif isinstance(params, (tuple, list)): param_str = self.__serialize_params(params) else: raise ValueError('Invalid type specified for function parameters') exec_str = 'ng.probe(arguments[0]).componentInstance.%s(%s);' % (func, param_str) if return_out: return self.__type2python( self.browser.execute_script('return {}'.format(exec_str), element)) else: self.browser.execute_script(exec_str, element)
def ensure_timezone(func, argname, arg): """Argument preprocessor that converts the input into a tzinfo object. Usage ----- >>> from zipline.utils.preprocess import preprocess >>> @preprocess(tz=ensure_timezone) ... def foo(tz): ... return tz >>> foo('utc') <UTC> """ if isinstance(arg, tzinfo): return arg if isinstance(arg, string_types): return timezone(arg) raise TypeError( "{func}() couldn't convert argument " "{argname}={arg!r} to a timezone.".format( func=_qualified_name(func), argname=argname, arg=arg, ), )
def user_login(self): self.log.info('Google User Login for: {}'.format(self.username)) if not isinstance(self.username, six.string_types) or not isinstance(self.password, six.string_types): raise AuthException("Username/password not correctly specified") user_login = perform_master_login(self.username, self.password, self.GOOGLE_LOGIN_ANDROID_ID) try: refresh_token = user_login.get('Token', None) except ConnectionError as e: raise AuthException("Caught ConnectionError: %s", e) if refresh_token is not None: self._refresh_token = refresh_token self.log.info('Google User Login successful.') else: self._refresh_token = None raise AuthException("Invalid Google Username/password") self.get_access_token() return self._login
def scopes_to_string(scopes): """Converts scope value to a string. If scopes is a string then it is simply passed through. If scopes is an iterable then a string is returned that is all the individual scopes concatenated with spaces. Args: scopes: string or iterable of strings, the scopes. Returns: The scopes formatted as a single string. """ if isinstance(scopes, six.string_types): return scopes else: return ' '.join(scopes)
def string_to_scopes(scopes): """Converts stringifed scope value to a list. If scopes is a list then it is simply passed through. If scopes is an string then a list of each individual scope is returned. Args: scopes: a string or iterable of strings, the scopes. Returns: The scopes in a list. """ if not scopes: return [] if isinstance(scopes, six.string_types): return scopes.split(' ') else: return scopes
def __init__(self, value): """ Initializer value can be: - integer_type: absolute days from epoch (1970, 1, 1). Can be negative. - datetime.date: built-in date - string_type: a string time of the form "yyyy-mm-dd" """ if isinstance(value, six.integer_types): self.days_from_epoch = value elif isinstance(value, (datetime.date, datetime.datetime)): self._from_timetuple(value.timetuple()) elif isinstance(value, six.string_types): self._from_datestring(value) else: raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def process_response(self, response): import six body = response['body']['string'] # backward compatibility. under 2.7 response body is unicode, under 3.5 response body is # bytes. when set the value back, the same type must be used. body_is_string = isinstance(body, six.string_types) content_in_string = (response['body']['string'] or b'').decode('utf-8') index = content_in_string.find(LargeResponseBodyProcessor.control_flag) if index > -1: length = int(content_in_string[index + len(LargeResponseBodyProcessor.control_flag):]) if body_is_string: response['body']['string'] = '0' * length else: response['body']['string'] = bytes([0] * length) return response
def _process_response_recording(self, response): if self.in_recording: # make header name lower case and filter unwanted headers headers = {} for key in response['headers']: if key.lower() not in self.FILTER_HEADERS: headers[key.lower()] = response['headers'][key] response['headers'] = headers body = response['body']['string'] if body and not isinstance(body, six.string_types): response['body']['string'] = body.decode('utf-8') for processor in self.recording_processors: response = processor.process_response(response) if not response: break else: for processor in self.replay_processors: response = processor.process_response(response) if not response: break return response
def __init__(self, fileobj): global rrule if not rrule: from dateutil import rrule if isinstance(fileobj, string_types): self._s = fileobj # ical should be encoded in UTF-8 with CRLF fileobj = open(fileobj, 'r') elif hasattr(fileobj, "name"): self._s = fileobj.name else: self._s = repr(fileobj) self._vtz = {} self._parse_rfc(fileobj.read())
def extract_options(inc, global_options=None): global_options = global_options or [] if global_options and isinstance(global_options, six.string_types): global_options = [global_options] if '|' not in inc: return (inc, global_options) inc, opts = inc.split('|') return (inc, parse_sync_options(opts) + global_options)