The following 50 code examples, extracted from open-source Python projects, illustrate how to use past.builtins.basestring().
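Before the project samples, here is a minimal sketch of the usual pattern (the helper normalize_ids is illustrative and not taken from any of the projects below): importing basestring from past.builtins lets one isinstance() check accept the built-in basestring (str and unicode) on Python 2 and, via the backported type, text strings on Python 3.

    from past.builtins import basestring  # built-in basestring on Py2; backported superclass on Py3

    def normalize_ids(value):
        # Hypothetical helper: accept a single identifier or an iterable of identifiers
        # and always return a list.
        if isinstance(value, basestring):
            return [value]
        return list(value)

    print(normalize_ids("abc"))        # ['abc']
    print(normalize_ids(["a", "b"]))   # ['a', 'b']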
def generate(self, data_keys, dag_output_path=None):
    if isinstance(data_keys, basestring):
        data_keys = (data_keys,)
    involved_dag, generation_order = self.build_involved_dag(data_keys)
    if dag_output_path is not None:
        draw_dag(involved_dag, dag_output_path)

    # generate data
    for node in generation_order:
        node_attr = involved_dag.node[node]
        if node_attr['skipped']:
            continue
        self._generate_one(
            involved_dag, node, node_attr['func_name'],
            node_attr['handler'], node_attr['handler_kwargs'],
            node_attr['__re_args__'], node_attr['mode'])

    return involved_dag
def get_data_keys_from_structure(structure):
    data_keys = []

    def _get_data_keys_from_structure(structure):
        if isinstance(structure, basestring):
            data_keys.append(structure)
        elif isinstance(structure, list):
            data_keys.extend(structure)
        elif isinstance(structure, dict):
            for _, val in six.viewitems(structure):
                _get_data_keys_from_structure(val)
        else:
            raise TypeError("The bundle structure only support "
                            "dict, list and str.")

    _get_data_keys_from_structure(structure)

    return data_keys
def sequential(x, net, defaults={}, name='', reuse=None, var={}, layers={}):
    layers = dict(list(layers.items()) + list(predefined_layers.items()))
    y = x
    logging.info('Building Sequential Network : %s', name)

    with tf.variable_scope(name, reuse=reuse):
        for i in range(len(net)):
            ltype = net[i][0]
            lcfg = net[i][1] if len(net[i]) == 2 else {}
            lname = lcfg.get('name', ltype + str(i))
            ldefs = defaults.get(ltype, {})
            lcfg = dict(list(ldefs.items()) + list(lcfg.items()))
            for k, v in list(lcfg.items()):
                if isinstance(v, basestring) and v[0] == '$':
                    # print var, v
                    lcfg[k] = var[v[1:]]
            y = layers[ltype](y, lname, **lcfg)
            logging.info('\t %s \t %s', lname, y.get_shape().as_list())
    return y
def extract_database_schema(pymongo_database, collection_names=None):
    """ Extract the database schema, for every collection in collection_names

    :param pymongo_database: pymongo.database.Database
    :param collection_names: str, list of str, default None
    :return database_schema: dict
    """
    if isinstance(collection_names, basestring):
        collection_names = [collection_names]

    database_collections = pymongo_database.collection_names(include_system_collections=False)
    if collection_names is None:
        collection_names = database_collections
    else:
        collection_names = [col for col in collection_names if col in database_collections]

    database_schema = dict()
    for collection in collection_names:
        logger.info('...collection %s', collection)
        pymongo_collection = pymongo_database[collection]
        database_schema[collection] = extract_collection_schema(pymongo_collection)

    return database_schema
def data(self, value):
    if isinstance(value, bool):
        self._data_type = bool
        self._pb.bool_data = value
    elif isinstance(value, int):
        self._data_type = int
        self._pb.int64_data = value
    elif isinstance(value, float):
        self._data_type = float
        self._pb.float64_data = value
    elif isinstance(value, basestring):
        self._data_type = str
        self._pb.string_data = value
    elif isinstance(value, bytes):
        self._data_type = bytes
        self._pb.bytes_data = value
    else:
        raise TypeError("Unsupported data type '{}'. (Supported: "
                        "int, long, float, str and bool)".format(value))
def _check_key(key):
    errors = []
    if isinstance(key, tuple):
        for i in key:
            if not isinstance(i, basestring):
                errors.append("Expected: string, Received: {}:{}"
                              .format(type(i), i))
    elif isinstance(key, basestring):
        return (key,), key
    elif key is None:
        return "", ""
    else:
        raise TypeError("Expected: tuple of strings, Received: {}"
                        .format(type(key)))

    if len(errors) > 0:
        raise TypeError(errors)

    return key, ".".join(key)
def submit(self, job):
    # TODO: inherit docstring
    self._check_job(job)

    # Wraps the main command to copy inputs into working dir and copy outputs out
    cmdstring = ['export CCC_WORKDIR=`pwd`']
    if job.inputs:
        cmdstring = ['cp -rf /inputs/* .'] + cmdstring
    if isinstance(job.command, basestring):
        cmdstring.append(job.command)
    else:
        cmdstring.append(' '.join(job.command))
    cmdstring.append('cd $CCC_WORKDIR && cp -r * /outputs 2>/dev/null')

    returnval = self.proxy.submitjob(image=job.image,
                                     command=['sh', '-c', ' && '.join(cmdstring)],
                                     inputs=job.inputs,
                                     cpus=job.numcpus,  # how is this the "minimum"?
                                     maxDuration=1000 * job.runtime,
                                     workingDir='/workingdir')

    job.jobid = returnval['jobId']
    job._result_json = None
def __init__(
        self,
        partition_names,
        metastore_conn_id='metastore_default',
        poke_interval=60 * 3,
        *args,
        **kwargs):
    super(NamedHivePartitionSensor, self).__init__(
        poke_interval=poke_interval, *args, **kwargs)

    if isinstance(partition_names, basestring):
        raise TypeError('partition_names must be an array of strings')

    self.metastore_conn_id = metastore_conn_id
    self.partition_names = partition_names
    self.next_poke_idx = 0
def configure(self):
    # process the parsed view variables
    if 'path' not in self.vars:
        raise MissingParameterError("Missing variable \"path\" in view: %s" % self.name)

    self.path = self.vars['path']

    # if it's a complex view
    if isinstance(self.path, dict):
        self.configure_complex_view(self.path)
    elif isinstance(self.path, basestring):
        self.configure_simple_view(self.path)
    else:
        raise ValueError("Unrecognised structure for \"path\" configuration in view: %s" % self.name)

    # if we don't have a template yet
    if self.template is None:
        # try to load it
        if 'template' not in self.vars:
            raise MissingParameterError("Missing variable \"template\" in view: %s" % self.name)
        self.template = self.template_engine.load_template(self.vars['template'])

    self.configure_context()
def _loggers_from_logcfg(logcfg, logopt):
    def to_stdout(loggers, opt):
        def one(loggers, one):
            if isinstance(one, basestring):
                return one, next(row for row in loggers if one in row[0])[1]
            else:
                return one
        return [one(loggers, x) for x in opt]

    def to_file(loggers):
        return [(name, row[2]) for row in loggers for name in row[0]]

    return Loggers(
        stdout=to_stdout(logcfg._loggers, logopt),
        file_=to_file(logcfg._loggers)
    )
def test_basestring(self):
    """
    The 2to3 basestring fixer breaks working Py2 code that uses basestring.
    This tests whether something sensible is done instead.
    """
    before = """
    assert isinstance('hello', basestring)
    assert isinstance(u'hello', basestring)
    assert isinstance(b'hello', basestring)
    """
    after = """
    from past.builtins import basestring
    assert isinstance('hello', basestring)
    assert isinstance(u'hello', basestring)
    assert isinstance(b'hello', basestring)
    """
    self.convert_check(before, after)
def test_basestring_issue_156(self):
    before = """
    x = str(3)
    allowed_types = basestring, int
    assert isinstance('', allowed_types)
    assert isinstance(u'', allowed_types)
    assert isinstance(u'foo', basestring)
    """
    after = """
    from builtins import str
    from past.builtins import basestring
    x = str(3)
    allowed_types = basestring, int
    assert isinstance('', allowed_types)
    assert isinstance(u'', allowed_types)
    assert isinstance(u'foo', basestring)
    """
    self.convert_check(before, after)
def test_basestring(self):
    """
    In conservative mode, futurize would not modify "basestring" but merely
    import it from ``past``, and the following code would still run on both
    Py2 and Py3.
    """
    before = """
    assert isinstance('hello', basestring)
    assert isinstance(u'hello', basestring)
    assert isinstance(b'hello', basestring)
    """
    after = """
    from past.builtins import basestring
    assert isinstance('hello', basestring)
    assert isinstance(u'hello', basestring)
    assert isinstance(b'hello', basestring)
    """
    self.convert_check(before, after, conservative=True)
def abs_url(self, url):
    """Given a relative or absolute URL; return an absolute URL.

    Args:
        url(basestring): A relative or absolute URL.

    Returns:
        str: An absolute URL.

    """
    parsed_url = urllib.parse.urlparse(url)
    if not parsed_url.scheme and not parsed_url.netloc:
        # url is a relative URL; combine with base_url
        return urllib.parse.urljoin(str(self.base_url), str(url))
    else:
        # url is already an absolute URL; return as is
        return url
def get(self, url, params=None, **kwargs):
    """Sends a GET request.

    Args:
        url(basestring): The URL of the API endpoint.
        params(dict): The parameters for the HTTP GET request.
        **kwargs:
            erc(int): The expected (success) response code for the request.
            others: Passed on to the requests package.

    Raises:
        SparkApiError: If anything other than the expected response code is
            returned by the Cisco Spark API endpoint.

    """
    assert isinstance(url, basestring)
    assert params is None or isinstance(params, dict)

    # Expected response code
    erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['GET'])

    response = self.request('GET', url, erc, params=params, **kwargs)
    return extract_and_parse_json(response)
def post(self, url, json=None, data=None, **kwargs):
    """Sends a POST request.

    Args:
        url(basestring): The URL of the API endpoint.
        json: Data to be sent in JSON format in the body of the request.
        data: Data to be sent in the body of the request.
        **kwargs:
            erc(int): The expected (success) response code for the request.
            others: Passed on to the requests package.

    Raises:
        SparkApiError: If anything other than the expected response code is
            returned by the Cisco Spark API endpoint.

    """
    assert isinstance(url, basestring)

    # Expected response code
    erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['POST'])

    response = self.request('POST', url, erc, json=json, data=data, **kwargs)
    return extract_and_parse_json(response)
def put(self, url, json=None, data=None, **kwargs):
    """Sends a PUT request.

    Args:
        url(basestring): The URL of the API endpoint.
        json: Data to be sent in JSON format in the body of the request.
        data: Data to be sent in the body of the request.
        **kwargs:
            erc(int): The expected (success) response code for the request.
            others: Passed on to the requests package.

    Raises:
        SparkApiError: If anything other than the expected response code is
            returned by the Cisco Spark API endpoint.

    """
    assert isinstance(url, basestring)

    # Expected response code
    erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['PUT'])

    response = self.request('PUT', url, erc, json=json, data=data, **kwargs)
    return extract_and_parse_json(response)
def delete(self, url, **kwargs):
    """Sends a DELETE request.

    Args:
        url(basestring): The URL of the API endpoint.
        **kwargs:
            erc(int): The expected (success) response code for the request.
            others: Passed on to the requests package.

    Raises:
        SparkApiError: If anything other than the expected response code is
            returned by the Cisco Spark API endpoint.

    """
    assert isinstance(url, basestring)

    # Expected response code
    erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['DELETE'])

    self.request('DELETE', url, erc, **kwargs)
def get(self, personId):
    """Get a person's details, by ID.

    Args:
        personId(basestring): The ID of the person to be retrieved.

    Returns:
        Person: A Person object with the details of the requested person.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(personId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('people/' + personId)

    # Return a Person object created from the response JSON data
    return Person(json_data)
def delete(self, personId):
    """Remove a person from the system.

    Only an admin can remove a person.

    Args:
        personId(basestring): The ID of the person to be deleted.

    Raises:
        AssertionError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(personId, basestring, may_be_none=False)

    # API request
    self._session.delete('people/' + personId)
def __init__(self, base_url, timeout=None):
    """Initialize an AccessTokensAPI object with the provided RestSession.

    Args:
        base_url(basestring): The base URL of the API endpoints.
        timeout(int): Timeout in seconds for the API requests.

    Raises:
        TypeError: If the parameter types are incorrect.

    """
    check_type(base_url, basestring, may_be_none=False)
    check_type(timeout, int)

    super(AccessTokensAPI, self).__init__()

    self._base_url = str(validate_base_url(base_url))
    self._timeout = timeout
    self._endpoint_url = urllib.parse.urljoin(self.base_url, API_ENDPOINT)
    self._request_kwargs = {"timeout": timeout}
def get(self, membershipId):
    """Get details for a membership, by ID.

    Args:
        membershipId(basestring): The membership ID.

    Returns:
        Membership: A Membership object with the details of the requested
            membership.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(membershipId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('memberships/' + membershipId)

    # Return a Membership object created from the response JSON data
    return Membership(json_data)
def get(self, orgId):
    """Get the details of an Organization, by ID.

    Args:
        orgId(basestring): The ID of the Organization to be retrieved.

    Returns:
        Organization: An Organization object with the details of the
            requested organization.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(orgId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('organizations/' + orgId)

    # Return an Organization object created from the returned JSON object
    return Organization(json_data)
def get(self, roleId):
    """Get the details of a Role, by ID.

    Args:
        roleId(basestring): The ID of the Role to be retrieved.

    Returns:
        Role: A Role object with the details of the requested Role.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(roleId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('roles/' + roleId)

    # Return a Role object created from the returned JSON object
    return Role(json_data)
def get(self, webhookId):
    """Get the details of a webhook, by ID.

    Args:
        webhookId(basestring): The ID of the webhook to be retrieved.

    Returns:
        Webhook: A Webhook object with the details of the requested webhook.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(webhookId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('webhooks/' + webhookId)

    # Return a Webhook object created from the response JSON data
    return Webhook(json_data)
def get(self, roomId):
    """Get the details of a room, by ID.

    Args:
        roomId(basestring): The ID of the room to be retrieved.

    Returns:
        Room: A Room object with the details of the requested room.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(roomId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('rooms/' + roomId)

    # Return a Room object created from the response JSON data
    return Room(json_data)
def get(self, licenseId):
    """Get the details of a License, by ID.

    Args:
        licenseId(basestring): The ID of the License to be retrieved.

    Returns:
        License: A License object with the details of the requested License.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(licenseId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('licenses/' + licenseId)

    # Return a License object created from the returned JSON object
    return License(json_data)
def get(self, teamId):
    """Get the details of a team, by ID.

    Args:
        teamId(basestring): The ID of the team to be retrieved.

    Returns:
        Team: A Team object with the details of the requested team.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(teamId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('teams/' + teamId)

    # Return a Team object created from the response JSON data
    return Team(json_data)
def get(self, messageId):
    """Get the details of a message, by ID.

    Args:
        messageId(basestring): The ID of the message to be retrieved.

    Returns:
        Message: A Message object with the details of the requested message.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(messageId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('messages/' + messageId)

    # Return a Message object created from the response JSON data
    return Message(json_data)
def get(self, membershipId):
    """Get details for a team membership, by ID.

    Args:
        membershipId(basestring): The team membership ID.

    Returns:
        TeamMembership: A TeamMembership object with the details of the
            requested team membership.

    Raises:
        TypeError: If the parameter types are incorrect.
        SparkApiError: If the Cisco Spark cloud returns an error.

    """
    check_type(membershipId, basestring, may_be_none=False)

    # API request
    json_data = self._session.get('team/memberships/' + membershipId)

    # Return a TeamMembership object created from the response JSON data
    return TeamMembership(json_data)
def _exec_adb_cmd(self, name, args, shell, timeout):
    if shell:
        # Add quotes around "adb" in case the ADB path contains spaces. This
        # is pretty common on Windows (e.g. Program Files).
        if self.serial:
            adb_cmd = '"%s" -s "%s" %s %s' % (ADB, self.serial, name, args)
        else:
            adb_cmd = '"%s" %s %s' % (ADB, name, args)
    else:
        adb_cmd = [ADB]
        if self.serial:
            adb_cmd.extend(['-s', self.serial])
        adb_cmd.append(name)
        if args:
            if isinstance(args, basestring):
                adb_cmd.append(args)
            else:
                adb_cmd.extend(args)
    return self._exec_cmd(adb_cmd, shell=shell, timeout=timeout)
def _set_text_property(self, name, value, allow_utf8=True):
    atom = xlib.XInternAtom(self._x_display, asbytes(name), False)
    if not atom:
        raise XlibException('Undefined atom "%s"' % name)
    assert isinstance(value, basestring)
    property = xlib.XTextProperty()
    if _have_utf8 and allow_utf8:
        buf = create_string_buffer(value.encode('utf8'))
        result = xlib.Xutf8TextListToTextProperty(self._x_display,
                                                  cast(pointer(buf), c_char_p),
                                                  1, xlib.XUTF8StringStyle,
                                                  byref(property))
        if result < 0:
            raise XlibException('Could not create UTF8 text property')
    else:
        buf = create_string_buffer(value.encode('ascii', 'ignore'))
        result = xlib.XStringListToTextProperty(
            cast(pointer(buf), c_char_p), 1, byref(property))
        if result < 0:
            raise XlibException('Could not create text property')
    xlib.XSetTextProperty(self._x_display,
                          self._window, byref(property), atom)
    # XXX <rj> Xlib doesn't like us freeing this
    #xlib.XFree(property.value)
def add_file(font):
    """Add a font to pyglet's search path.

    In order to load a font that is not installed on the system, you must
    call this method to tell pyglet that it exists.  You can supply
    either a filename or any file-like object.

    The font format is platform-dependent, but is typically a TrueType font
    file containing a single font face.  Note that to use a font added with
    this method, you should pass the face name (not the file name) to
    :meth:`pyglet.font.load` or any other place where you normally specify
    a font.

    :Parameters:
        `font` : str or file
            Filename or file-like object to load fonts from.

    """
    if isinstance(font, basestring):
        font = open(font, 'rb')
    if hasattr(font, 'read'):
        font = font.read()
    _font_class.add_font_data(font)
def __add__(self, move):
    if isinstance(move, (str, basestring)):
        return self + Move(move)
    elif move is None:
        return self
    elif isinstance(move, Move):
        if self.face != move.face:
            raise ValueError("Only same faces can be added")
        if self.clockwise and move.counterclockwise:
            return None
        if self.double and move.double:
            return None

        offset = (
            (self.clockwise + (self.double * 2) + (self.counterclockwise * 3)) +
            (move.clockwise + (move.double * 2) + (move.counterclockwise * 3))
        ) % 4

        if offset == 0:
            return None
        return Move(self.face + [None, "", "2", "'"][offset])
    else:
        raise ValueError("Unable to add %s and %s" % (self.raw, str(move)))
def __init__(self, size=3, init=None, check=True):
    self.size = size
    if init:
        init = init.replace(' ', '')
        if check and not isinstance(init, (str, basestring)):
            raise ValueError("Init configuration must be a string")
        if check and int(math.sqrt(len(init))) != math.sqrt(len(init)):
            raise ValueError(
                "Init configuration length must be a power of 2")
        self.size = int(math.sqrt(self.size))
        self.squares = init
    else:
        self.squares = '.' * (self.size * self.size)
def _check_valid_cube(cube):
    '''Checks if cube is one of str, NaiveCube or Cubie.Cube and returns
    an instance of Cubie.Cube'''
    if isinstance(cube, basestring):
        c = NaiveCube()
        c.set_cube(cube)
        cube = c

    if isinstance(cube, NaiveCube):
        c = Cube()
        c.from_naive_cube(cube)
        cube = c

    if not isinstance(cube, Cube):
        raise ValueError('Cube is not one of (str, NaiveCube or Cubie.Cube)')

    return cube
def solve(cube, method=Beginner.BeginnerSolver, *args, **kwargs):
    if isinstance(method, basestring):
        if not method in METHODS:
            raise ValueError('Invalid method name, must be one of (%s)' %
                             ', '.join(METHODS.keys()))
        method = METHODS[method]

    if not issubclass(method, Solver):
        raise ValueError('Method %s is not a valid Solver subclass' %
                         method.__class__.__name__)

    cube = _check_valid_cube(cube)

    solver = method(cube)

    return solver.solution(*args, **kwargs)
def _preprocess(argument):
    """Receives the argument (from the constructor), and normalizes it
    into a list of Pattern objects."""
    pattern_set = PatternSet()
    if argument is not None:
        if isinstance(argument, basestring):
            argument = [argument]
        for glob in argument:
            if isinstance(glob, basestring):
                patterns = Pattern.create(glob)
                pattern_set.extend(patterns)
            elif isinstance(glob, Pattern):
                pattern_set.append(glob)
    return pattern_set
def is_number(s):
    if isinstance(s, list) and not isinstance(s, basestring):
        try:
            for x in s:
                if isinstance(x, basestring) and ' ' in x:
                    raise ValueError
            [float(x) for x in s]
            return True
        except ValueError:
            return False
    else:
        try:
            if isinstance(s, basestring) and ' ' in s:
                raise ValueError
            float(s)
            return True
        except ValueError:
            return False
def _get_task_priority(tasks, task_priority):
    """Get the task `priority` corresponding to the given `task_priority`.

    If `task_priority` is an integer or 'None', return it.
    If `task_priority` is a str, return the priority of the task it matches.
    Otherwise, raise `ValueError`.
    """
    if task_priority is None:
        return None
    if is_integer(task_priority):
        return task_priority
    if isinstance(task_priority, basestring):
        if task_priority in tasks:
            return tasks[task_priority].priority
    raise ValueError("Unrecognized task priority '{}'".format(task_priority))
def retrieve_file(self, remote_full_path, local_full_path_or_buffer):
    conn = self.get_conn()

    is_path = isinstance(local_full_path_or_buffer, basestring)

    if is_path:
        output_handle = open(local_full_path_or_buffer, 'wb')
    else:
        output_handle = local_full_path_or_buffer

    logging.info('Retrieving file from FTP: {}'.format(remote_full_path))
    conn.getfo(remote_full_path, output_handle)
    logging.info('Finished retrieving file from FTP: {}'.format(
        remote_full_path))

    if is_path:
        output_handle.close()
def _multi_dataurl_op(self, urls, ops, model=None, local_ids=None, meta=None,
                      payload=None, **kwargs):
    """ If sending image_url or image_file strings, then we can send as json
    directly instead of the multipart form. """
    if urls is not None:  # for feedback, this might not be required.
        if not isinstance(urls, list):
            urls = [urls]
        self._check_batch_size(urls)
        if not isinstance(urls[0], basestring):
            raise Exception("urls must be strings")

    data = self._setup_multi_data(ops, len(urls), model, local_ids, meta, **kwargs)

    # Add some addition url specific stuff to data dict:
    if urls is not None:
        data['url'] = urls

    if payload:
        assert isinstance(payload, dict), "Addition payload must be a dict"
        for (k, v) in iteritems(payload):
            data[k] = v

    url = self._url_for_op(ops)
    kwargs = {'data': data}
    raw_response = self._get_raw_response(
        self._get_json_headers, self._get_json_response, url, kwargs)
    return self._parse_response(raw_response)
def _compare_type(expected, result):
    if isinstance(expected, basestring) and isinstance(result, basestring):
        return True
    if isinstance(expected, bool) and isinstance(result, bool):
        return True
    # bool is instance of int. return False if one type is a boolean
    elif isinstance(expected, bool) != isinstance(result, bool):
        return False
    if isinstance(expected, (int, float)) and isinstance(result, (int, float)):
        return True
    if isinstance(expected, list) and isinstance(result, list):
        return True
    if isinstance(expected, dict) and isinstance(result, dict):
        return True
    if isinstance(expected, tuple) and isinstance(result, tuple):
        return True
    return False
def __init__(
        self,
        partition_names,
        metastore_conn_id='metastore_default',
        poke_interval=60 * 3,
        *args,
        **kwargs):
    super(NamedHivePartitionSensor, self).__init__(
        poke_interval=poke_interval, *args, **kwargs)

    if isinstance(partition_names, basestring):
        raise TypeError('partition_names must be an array of strings')

    self.metastore_conn_id = metastore_conn_id
    self.partition_names = partition_names
    self.next_poke_idx = 0
def _get_json_to_kwargs(self, json_data):
    """
    Augments json data before passing to the handler script.
    Prefixes all keys with cbot_ value to avoid clashes + serializes
    itself to JSON - for JSON parsing stuff.
    :param json_data:
    :return:
    """
    n_data = OrderedDict()
    for k in json_data:
        val = json_data[k]
        if k == 'command':
            continue
        if isinstance(val, float):
            val = str(math.ceil(val))
        if not isinstance(val, (str, basestring)):
            val = str(val)
        if val is not None:
            n_data[k] = val
            n_data['cbot_' + k] = val

    n_data['cbot_json'] = self._json_dumps(json_data)
    return n_data
def test_until_next_flags(self):
    for name, (flag, builder) in lower_until_next_flags.items():
        if builder is None:
            continue
        document = self.parse(r"\p \{} hello \p world".format(flag))
        elements = document.elements
        self.assertEqual(len(elements), 2)
        self.assertIsInstance(elements[0], Paragraph)
        self.assertEqual(len(elements[0].children), 1)
        child = elements[0].children[0]
        self.assertIsInstance(child, FormattedText)
        self.assertEqual(len(child.children), 1)
        text = child.children[0]
        self.assertIsInstance(text, Text)
        self.assertIsInstance(text.content, basestring)
        self.assertIsInstance(elements[1], Paragraph)
def has_user_group(self, user_group_name):
    """Checks if a user group exists in the commcell with the input user group name.

        Args:
            user_group_name (str)  --  name of the user group

        Returns:
            bool - boolean output whether the user group exists in the commcell or not

        Raises:
            SDKException:
                if type of the user group name argument is not string

    """
    if not isinstance(user_group_name, basestring):
        raise SDKException('UserGroup', '101')

    return self._user_groups and user_group_name.lower() in self._user_groups
def has_client(self, client_name):
    """Checks if a client exists in the commcell with the input client name.

        Args:
            client_name (str)  --  name of the client

        Returns:
            bool - boolean output whether the client exists in the commcell or not

        Raises:
            SDKException:
                if type of the client name argument is not string

    """
    if not isinstance(client_name, basestring):
        raise SDKException('Client', '101')

    return self._clients and client_name.lower() in self._clients