我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用past.builtins.unicode()。
def test_generating_cubefile_works(wfn_viewer): wfn_viewer.numpoints = 64 grid, values = wfn_viewer._calc_orb_grid(wfn_viewer.mol.wfn.orbitals.canonical[1]) cb = wfn_viewer._grid_to_cube(grid, values) assert isinstance(cb, unicode)
def get_conn(self): def _str(s): # cloudant-python doesn't support unicode. if isinstance(s, unicode): log = LoggingMixin().log log.debug( 'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".', s ) return s.encode('ascii', 'ignore') return s conn = self.get_connection(self.cloudant_conn_id) for conn_param in ['host', 'password', 'schema']: if not hasattr(conn, conn_param) or not getattr(conn, conn_param): raise AirflowException( 'missing connection parameter {0}'.format(conn_param) ) # In the connection form: # - 'host' is renamed to 'Account' # - 'login' is renamed 'Username (or API Key)' # - 'schema' is renamed to 'Database' # # So, use the 'host' attribute as the account name, and, if login is # defined, use that as the username. account = cloudant.Account(_str(conn.host)) username = _str(conn.login or conn.host) account.login( username, _str(conn.password)).raise_for_status() return account.database(_str(conn.schema))
def log(self, session=None): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) ti = session.query(models.TaskInstance).filter( models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id, models.TaskInstance.execution_date == dttm).first() if ti is None: logs = ["*** Task instance did not exist in the DB\n"] else: logger = logging.getLogger('airflow.task') task_log_reader = conf.get('core', 'task_log_reader') handler = next((handler for handler in logger.handlers if handler.name == task_log_reader), None) try: ti.task = dag.get_task(ti.task_id) logs = handler.read(ti) except AttributeError as e: logs = ["Task log handler {} does not support read logs.\n{}\n" \ .format(task_log_reader, str(e))] for i, log in enumerate(logs): if PY2 and not isinstance(log, unicode): logs[i] = log.decode('utf-8') return self.render( 'airflow/ti_log.html', logs=logs, dag=dag, title="Log by attempts", task_id=task_id, execution_date=execution_date, form=form)
def _can_handle_key(self, k): return isinstance(k, (str, unicode))
def _can_handle_val(self, v): if isinstance(v, (str, unicode)): return True elif isinstance(v, bool): return True elif isinstance(v, (int, long)): return True elif isinstance(v, float): return True return False
def _handle_attr(self, layer, feature, props): for k, v in props.items(): if self._can_handle_attr(k, v): if not PY3 and isinstance(k, str): k = k.decode('utf-8') if k not in self.seen_keys_idx: layer.keys.append(k) self.seen_keys_idx[k] = self.key_idx self.key_idx += 1 feature.tags.append(self.seen_keys_idx[k]) if v not in self.seen_values_idx: self.seen_values_idx[v] = self.val_idx self.val_idx += 1 val = layer.values.add() if isinstance(v, bool): val.bool_value = v elif isinstance(v, str): if PY3: val.string_value = v else: val.string_value = unicode(v, 'utf-8') elif isinstance(v, unicode): val.string_value = v elif isinstance(v, (int, long)): val.int_value = v elif isinstance(v, float): val.double_value = v feature.tags.append(self.seen_values_idx[v])
def test_source_coding_utf8(self): """ Tests to ensure that the source coding line is not corrupted or removed. It must be left as the first line in the file (including before any __future__ imports). Also tests whether the unicode characters in this encoding are parsed correctly and left alone. """ code = """ # -*- coding: utf-8 -*- icons = [u"?", u"?", u"?", u"?"] """
def test_literal_prefixes_are_not_stripped(self): """ Tests to ensure that the u'' and b'' prefixes on unicode strings and byte strings are not removed by the futurize script. Removing the prefixes on Py3.3+ is unnecessary and loses some information -- namely, that the strings have explicitly been marked as unicode or bytes, rather than just e.g. a guess by some automated tool about what they are. """ code = ''' s = u'unicode string' b = b'byte string' ''' self.unchanged(code)
def test_Py2_StringIO_module(self): """ This requires that the argument to io.StringIO be made a unicode string explicitly if we're not using unicode_literals: Ideally, there would be a fixer for this. For now: TODO: add the Py3 equivalent for this to the docs. Also add back a test for the unicode_literals case. """ before = """ import cStringIO import StringIO s1 = cStringIO.StringIO('my string') s2 = StringIO.StringIO('my other string') assert isinstance(s1, cStringIO.InputType) """ # There is no io.InputType in Python 3. futurize should change this to # something like this. But note that the input to io.StringIO # must be a unicode string on both Py2 and Py3. after = """ import io import io s1 = io.StringIO(u'my string') s2 = io.StringIO(u'my other string') assert isinstance(s1, io.StringIO) """ self.convert_check(before, after)
def test_open(self): """ In conservative mode, futurize would not import io.open because this changes the default return type from bytes to text. """ before = """ filename = 'temp_file_open.test' contents = 'Temporary file contents. Delete me.' with open(filename, 'w') as f: f.write(contents) with open(filename, 'r') as f: data = f.read() assert isinstance(data, str) assert data == contents """ after = """ from past.builtins import open, str as oldbytes, unicode filename = oldbytes(b'temp_file_open.test') contents = oldbytes(b'Temporary file contents. Delete me.') with open(filename, oldbytes(b'w')) as f: f.write(contents) with open(filename, oldbytes(b'r')) as f: data = f.read() assert isinstance(data, oldbytes) assert data == contents assert isinstance(oldbytes(b'hello'), basestring) assert isinstance(unicode(u'hello'), basestring) assert isinstance(oldbytes(b'hello'), basestring) """ self.convert_check(before, after, conservative=True)
def test_import_builtin_types(self): code = """ s1 = 'abcd' s2 = u'abcd' b1 = b'abcd' b2 = s2.encode('utf-8') d1 = {} d2 = dict((i, i**2) for i in range(10)) i1 = 1923482349324234L i2 = 1923482349324234 """ module = self.write_and_import(code, 'test_builtin_types') self.assertTrue(isinstance(module.s1, oldstr)) self.assertTrue(isinstance(module.s2, unicode)) self.assertTrue(isinstance(module.b1, oldstr))
def get_conn(self): def _str(s): # cloudant-python doesn't support unicode. if isinstance(s, unicode): logging.debug(('cloudant-python does not support unicode. ' 'Encoding %s as ascii using "ignore".'), s) return s.encode('ascii', 'ignore') return s conn = self.get_connection(self.cloudant_conn_id) for conn_param in ['host', 'password', 'schema']: if not hasattr(conn, conn_param) or not getattr(conn, conn_param): raise AirflowException( 'missing connection parameter {0}'.format(conn_param) ) # In the connection form: # - 'host' is renamed to 'Account' # - 'login' is renamed 'Username (or API Key)' # - 'schema' is renamed to 'Database' # # So, use the 'host' attribute as the account name, and, if login is # defined, use that as the username. account = cloudant.Account(_str(conn.host)) username = _str(conn.login or conn.host) account.login( username, _str(conn.password)).raise_for_status() return account.database(_str(conn.schema))
def test_nestedInteractions(self, values): """ Nested interactions operate independently of parent interactions. :param values: a two-tuple composed of: - a recursive list of unicode and other recursive lists - list start means begin interaction, string means node resolve, list end means finish interaction. - list of False/True; True means failed interaction """ requested_interactions, failures = values failures = iter(failures) assume(not isinstance(requested_interactions, unicode)) self.init() ws_actor = self.connector.expectSocket() self.connector.connect(ws_actor) failures = iter(failures) created_services = {} expected_success_nodes = Counter() expected_failed_nodes = Counter() def run_interaction(children): should_fail = next(failures) failed = [] succeeded = [] self.session.start_interaction() for child in children: if isinstance(child, unicode): # Make sure disco knows about the node: if child in created_services: node = created_services[child] else: node = create_node(child, child) created_services[child] = node self.disco.onMessage(None, NodeActive(node)) # Make sure the child Node is resolved in the interaction self.session.resolve(node.service, "1.0") if should_fail: expected_failed_nodes[node] += 1 failed.append(node) else: expected_success_nodes[node] += 1 succeeded.append(node) else: run_interaction(child) if should_fail: self.session.fail_interaction("OHNO") self.session.finish_interaction() self.connector.advance_time(5.0) # Make sure interaction is sent ws_actor.swallowLogMessages() self.connector.expectInteraction( self, ws_actor, self.session, failed, succeeded) run_interaction(requested_interactions) for node in set(expected_failed_nodes) | set(expected_success_nodes): policy = self.disco.failurePolicy(node) self.assertEqual((policy.successes, policy.failures), (expected_success_nodes[node], expected_failed_nodes[node]))