Python past.builtins 模块,unicode() 实例源码


项目:notebook-molecular-visualization    作者:Autodesk    | 项目源码 | 文件源码
def test_generating_cubefile_works(wfn_viewer):
    """Check that the viewer's grid-to-cube conversion returns a ``unicode`` value.

    The viewer is configured with 64 grid points, the orbital at index 1 of
    the canonical set is evaluated, and the resulting grid/values pair is
    passed to ``_grid_to_cube``.
    """
    wfn_viewer.numpoints = 64
    orbital = wfn_viewer.mol.wfn.orbitals.canonical[1]
    grid_and_values = wfn_viewer._calc_orb_grid(orbital)
    grid, values = grid_and_values
    cube_text = wfn_viewer._grid_to_cube(grid, values)
    assert isinstance(cube_text, unicode)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def get_conn(self):
        """Return the Cloudant database described by ``self.cloudant_conn_id``.

        The connection's ``host``, ``password`` and ``schema`` fields are
        mandatory; ``login`` is optional.

        :return: the result of ``account.database(<schema>)``.
        :raises AirflowException: if ``host``, ``password`` or ``schema`` is
            missing or empty on the connection.

        NOTE(review): this excerpt had lost several lines (the ``log.debug``
        call head, closing parentheses, the ``conn.host`` arguments and the
        login call). They are restored below from the surrounding comments and
        the sibling hook; confirm against the upstream hook before relying on
        the exact calls.
        """
        def _str(s):
            # cloudant-python doesn't support unicode.
            if isinstance(s, unicode):
                log = LoggingMixin().log
                log.debug(
                    'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
                    s
                )
                return s.encode('ascii', 'ignore')

            return s

        conn = self.get_connection(self.cloudant_conn_id)

        for conn_param in ['host', 'password', 'schema']:
            # A missing attribute and an empty value are treated the same way.
            if not getattr(conn, conn_param, None):
                raise AirflowException(
                    'missing connection parameter {0}'.format(conn_param)
                )

        # In the connection form:
        # - 'host' is renamed to 'Account'
        # - 'login' is renamed 'Username (or API Key)'
        # - 'schema' is renamed to 'Database'
        # So, use the 'host' attribute as the account name, and, if login is
        # defined, use that as the username.
        account = cloudant.Account(_str(conn.host))

        # NOTE(review): falling back to the account name when no login is set
        # is inferred from the comment above -- confirm.
        username = _str(conn.login or conn.host)

        # NOTE(review): the login call was missing from the excerpt; without
        # it ``username`` and the validated password are never used. Presumably
        # the response exposes ``raise_for_status()`` -- confirm.
        account.login(
            username,
            _str(conn.password)).raise_for_status()

        return account.database(_str(conn.schema))
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def log(self, session=None):
        """Render the log page for the task instance named in the request args.

        Reads ``dag_id``, ``task_id`` and ``execution_date`` from
        ``request.args``, looks the task instance up through ``session`` and
        asks the configured ``core.task_log_reader`` handler of the
        ``airflow.task`` logger for its logs.

        :param session: database session used to query ``TaskInstance``.
        :return: the result of ``self.render(...)``.

        NOTE(review): the excerpt had lost the ``else:``/``try:`` lines, the
        ``handler.name`` operand, the ``handler.read(ti)`` call and the
        template argument; they are restored here and should be confirmed
        against upstream.
        """
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dttm = pendulum.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        dag = dagbag.get_dag(dag_id)
        ti = session.query(models.TaskInstance).filter(
            models.TaskInstance.dag_id == dag_id,
            models.TaskInstance.task_id == task_id,
            models.TaskInstance.execution_date == dttm).first()
        if ti is None:
            logs = ["*** Task instance did not exist in the DB\n"]
        else:
            logger = logging.getLogger('airflow.task')
            task_log_reader = conf.get('core', 'task_log_reader')
            # Pick the handler whose name matches the configured reader; this
            # is None when no handler matches.
            handler = next((handler for handler in logger.handlers
                            if handler.name == task_log_reader), None)
            try:
                ti.task = dag.get_task(ti.task_id)
                logs = handler.read(ti)
            except AttributeError as e:
                # Also reached when ``handler`` is None (no ``read`` attribute).
                logs = ["Task log handler {} does not support read logs.\n{}\n" \
                            .format(task_log_reader, str(e))]

        # On Python 2 the template expects unicode, so decode byte strings.
        for i, log in enumerate(logs):
            if PY2 and not isinstance(log, unicode):
                logs[i] = log.decode('utf-8')

        return self.render(
            # NOTE(review): template name restored from memory of upstream.
            'airflow/ti_log.html',
            logs=logs, dag=dag, title="Log by attempts", task_id=task_id,
            execution_date=execution_date, form=form)
项目:Vector-Tiles-Reader-QGIS-Plugin    作者:geometalab    | 项目源码 | 文件源码
def _can_handle_key(self, k):
        """Return True when ``k`` is a ``str`` or ``unicode`` instance."""
        accepted_types = (str, unicode)
        return isinstance(k, accepted_types)
项目:Vector-Tiles-Reader-QGIS-Plugin    作者:geometalab    | 项目源码 | 文件源码
def _can_handle_val(self, v):
        """Return True when ``v`` is a string, bool, integer or float.

        Any other type (including ``None`` and containers) yields False.
        """
        # ``bool`` is listed explicitly to mirror the supported value kinds,
        # even though it is already covered by ``int``.
        supported_types = (str, unicode, bool, int, long, float)
        return isinstance(v, supported_types)
项目:Vector-Tiles-Reader-QGIS-Plugin    作者:geometalab    | 项目源码 | 文件源码
def _handle_attr(self, layer, feature, props):
        """Record the supported properties of one feature on ``layer``/``feature``.

        Every key and value accepted by ``self._can_handle_attr`` gets an
        index the first time it is seen (tracked in ``self.seen_keys_idx`` /
        ``self.seen_values_idx``); new values are also stored in
        ``layer.values`` using the typed field that matches the Python type.

        :param layer: layer message exposing ``keys`` and ``values``.
        :param feature: feature message exposing ``tags``.
        :param props: mapping of property name to property value.

        NOTE(review): the excerpt computed the indices but never wrote them
        anywhere and left ``feature`` unused; the ``layer.keys.append`` and
        ``feature.tags.append`` calls below are restored on the assumption
        that tags are (key index, value index) pairs -- confirm against the
        upstream encoder.
        """
        for k, v in props.items():
            if not self._can_handle_attr(k, v):
                continue

            # Python 2 byte-string keys are normalised to unicode.
            if not PY3 and isinstance(k, str):
                k = k.decode('utf-8')

            if k not in self.seen_keys_idx:
                layer.keys.append(k)
                self.seen_keys_idx[k] = self.key_idx
                self.key_idx += 1

            feature.tags.append(self.seen_keys_idx[k])

            if v not in self.seen_values_idx:
                self.seen_values_idx[v] = self.val_idx
                self.val_idx += 1

                val = layer.values.add()
                # bool must be tested before int: bool is a subclass of int.
                if isinstance(v, bool):
                    val.bool_value = v
                elif isinstance(v, str):
                    if PY3:
                        val.string_value = v
                    else:
                        # Python 2 ``str`` is bytes and needs decoding. Without
                        # this ``else`` the decode also ran on Python 3, where
                        # decoding a text ``str`` raises TypeError.
                        val.string_value = unicode(v, 'utf-8')
                elif isinstance(v, unicode):
                    val.string_value = v
                elif isinstance(v, (int, long)):
                    val.int_value = v
                elif isinstance(v, float):
                    val.double_value = v

            feature.tags.append(self.seen_values_idx[v])

项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_source_coding_utf8(self):
        """
        Tests to ensure that the source coding line is not corrupted or
        removed. It must be left as the first line in the file (including
        before any __future__ imports). Also tests whether the unicode
        characters in this encoding are parsed correctly and left alone.
        """
        # NOTE(review): the "?" entries look like non-ASCII glyphs that were
        # replaced during extraction; restore the real characters from
        # upstream. No assertion on ``code`` is visible in this excerpt --
        # confirm whether upstream checks it.
        code = """
        # -*- coding: utf-8 -*-
        icons = [u"?", u"?", u"?", u"?"]
        """
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_literal_prefixes_are_not_stripped(self):
        """
        Tests to ensure that the u'' and b'' prefixes on unicode strings and
        byte strings are not removed by the futurize script.  Removing the
        prefixes on Py3.3+ is unnecessary and loses some information -- namely,
        that the strings have explicitly been marked as unicode or bytes,
        rather than just e.g. a guess by some automated tool about what they
        are.
        """
        code = '''
        s = u'unicode string'
        b = b'byte string'
        '''
        # NOTE(review): the excerpt ended before any check was made; an
        # "unchanged" assertion matches the docstring's intent -- confirm the
        # helper name against the test base class.
        self.unchanged(code)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_Py2_StringIO_module(self):
        """
        This requires that the argument to io.StringIO be made a
        unicode string explicitly if we're not using unicode_literals:

        Ideally, there would be a fixer for this. For now:

        TODO: add the Py3 equivalent for this to the docs. Also add back
        a test for the unicode_literals case.
        """
        before = """
        import cStringIO
        import StringIO
        s1 = cStringIO.StringIO('my string')
        s2 = StringIO.StringIO('my other string')
        assert isinstance(s1, cStringIO.InputType)
        """

        # There is no io.InputType in Python 3. futurize should change this to
        # something like this. But note that the input to io.StringIO
        # must be a unicode string on both Py2 and Py3.
        after = """
        import io
        import io
        s1 = io.StringIO(u'my string')
        s2 = io.StringIO(u'my other string')
        assert isinstance(s1, io.StringIO)
        """
        self.convert_check(before, after)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_open(self):
        """
        In conservative mode, futurize would not import io.open because
        this changes the default return type from bytes to text.
        """
        # NOTE(review): the ``f.write(contents)`` / ``f.read()`` lines inside
        # both literals were missing from the excerpt (leaving empty ``with``
        # bodies and a dangling ``data =``); they are restored to match the
        # ``data == contents`` assertions -- confirm against upstream.
        before = """
        filename = 'temp_file_open.test'
        contents = 'Temporary file contents. Delete me.'
        with open(filename, 'w') as f:
            f.write(contents)

        with open(filename, 'r') as f:
            data = f.read()
        assert isinstance(data, str)
        assert data == contents
        """
        after = """
        from past.builtins import open, str as oldbytes, unicode
        filename = oldbytes(b'temp_file_open.test')
        contents = oldbytes(b'Temporary file contents. Delete me.')
        with open(filename, oldbytes(b'w')) as f:
            f.write(contents)

        with open(filename, oldbytes(b'r')) as f:
            data = f.read()
        assert isinstance(data, oldbytes)
        assert data == contents
        assert isinstance(oldbytes(b'hello'), basestring)
        assert isinstance(unicode(u'hello'), basestring)
        assert isinstance(oldbytes(b'hello'), basestring)
        """
        self.convert_check(before, after, conservative=True)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_import_builtin_types(self):
        """Import a Py2-style module and check the types of its string literals.

        The snippet is written out and imported through
        ``self.write_and_import``; plain and ``b''`` literals are expected to
        be ``oldstr`` instances and the ``u''`` literal a ``unicode`` instance.
        """
        # The closing delimiter of this literal was missing in the excerpt.
        code = """
        s1 = 'abcd'
        s2 = u'abcd'
        b1 = b'abcd'
        b2 = s2.encode('utf-8')
        d1 = {}
        d2 = dict((i, i**2) for i in range(10))
        i1 = 1923482349324234L
        i2 = 1923482349324234
        """
        module = self.write_and_import(code, 'test_builtin_types')
        self.assertTrue(isinstance(module.s1, oldstr))
        self.assertTrue(isinstance(module.s2, unicode))
        self.assertTrue(isinstance(module.b1, oldstr))
项目:airflow    作者:apache-airflow    | 项目源码 | 文件源码
def get_conn(self):
        """Return the Cloudant database described by ``self.cloudant_conn_id``.

        The connection's ``host``, ``password`` and ``schema`` fields are
        mandatory; ``login`` is optional.

        :return: the result of ``account.database(<schema>)``.
        :raises AirflowException: if ``host``, ``password`` or ``schema`` is
            missing or empty on the connection.

        NOTE(review): this excerpt had lost the final ``logging.debug``
        argument, closing parentheses, the ``conn.host`` arguments and the
        login call. They are restored below from the surrounding comments;
        confirm against the upstream hook before relying on the exact calls.
        """
        def _str(s):
            # cloudant-python doesn't support unicode.
            if isinstance(s, unicode):
                logging.debug(('cloudant-python does not support unicode. '
                               'Encoding %s as ascii using "ignore".'),
                              s)
                return s.encode('ascii', 'ignore')

            return s

        conn = self.get_connection(self.cloudant_conn_id)

        for conn_param in ['host', 'password', 'schema']:
            # A missing attribute and an empty value are treated the same way.
            if not getattr(conn, conn_param, None):
                raise AirflowException(
                    'missing connection parameter {0}'.format(conn_param)
                )

        # In the connection form:
        # - 'host' is renamed to 'Account'
        # - 'login' is renamed 'Username (or API Key)'
        # - 'schema' is renamed to 'Database'
        # So, use the 'host' attribute as the account name, and, if login is
        # defined, use that as the username.
        account = cloudant.Account(_str(conn.host))

        # NOTE(review): falling back to the account name when no login is set
        # is inferred from the comment above -- confirm.
        username = _str(conn.login or conn.host)

        # NOTE(review): the login call was missing from the excerpt; without
        # it ``username`` and the validated password are never used. Presumably
        # the response exposes ``raise_for_status()`` -- confirm.
        account.login(
            username,
            _str(conn.password)).raise_for_status()

        return account.database(_str(conn.schema))
项目:go2mapillary    作者:enricofer    | 项目源码 | 文件源码
def _can_handle_key(self, k):
        """Tell whether ``k`` is usable as a key: a ``str`` or ``unicode`` instance."""
        key_types = (str, unicode)
        is_string_key = isinstance(k, key_types)
        return is_string_key
项目:go2mapillary    作者:enricofer    | 项目源码 | 文件源码
def _can_handle_val(self, v):
        """Tell whether ``v`` has a supported type.

        Supported: ``str``/``unicode``, ``bool``, ``int``/``long`` and
        ``float``. Everything else returns False.
        """
        for accepted in ((str, unicode), bool, (int, long), float):
            if isinstance(v, accepted):
                return True
        return False
项目:go2mapillary    作者:enricofer    | 项目源码 | 文件源码
def _handle_attr(self, layer, feature, props):
        """Record the supported properties of one feature on ``layer``/``feature``.

        Every key and value accepted by ``self._can_handle_attr`` gets an
        index the first time it is seen (tracked in ``self.seen_keys_idx`` /
        ``self.seen_values_idx``); new values are also stored in
        ``layer.values`` using the typed field that matches the Python type.

        :param layer: layer message exposing ``keys`` and ``values``.
        :param feature: feature message exposing ``tags``.
        :param props: mapping of property name to property value.

        NOTE(review): the excerpt computed the indices but never wrote them
        anywhere and left ``feature`` unused; the ``layer.keys.append`` and
        ``feature.tags.append`` calls below are restored on the assumption
        that tags are (key index, value index) pairs -- confirm against the
        upstream encoder.
        """
        for k, v in props.items():
            if not self._can_handle_attr(k, v):
                continue

            # Python 2 byte-string keys are normalised to unicode.
            if not PY3 and isinstance(k, str):
                k = k.decode('utf-8')

            if k not in self.seen_keys_idx:
                layer.keys.append(k)
                self.seen_keys_idx[k] = self.key_idx
                self.key_idx += 1

            feature.tags.append(self.seen_keys_idx[k])

            if v not in self.seen_values_idx:
                self.seen_values_idx[v] = self.val_idx
                self.val_idx += 1

                val = layer.values.add()
                # bool must be tested before int: bool is a subclass of int.
                if isinstance(v, bool):
                    val.bool_value = v
                elif isinstance(v, str):
                    if PY3:
                        val.string_value = v
                    else:
                        # Python 2 ``str`` is bytes and needs decoding. Without
                        # this ``else`` the decode also ran on Python 3, where
                        # decoding a text ``str`` raises TypeError.
                        val.string_value = unicode(v, 'utf-8')
                elif isinstance(v, unicode):
                    val.string_value = v
                elif isinstance(v, (int, long)):
                    val.int_value = v
                elif isinstance(v, float):
                    val.double_value = v

            feature.tags.append(self.seen_values_idx[v])

项目:mdk    作者:datawire    | 项目源码 | 文件源码
def test_nestedInteractions(self, values):
        Nested interactions operate independently of parent interactions.

        :param values: a two-tuple composed of:
           - a recursive list of unicode and other recursive lists - list start
             means begin interaction, string means node resolve, list end means
             finish interaction.
           - list of False/True; True means failed interaction
        requested_interactions, failures = values
        failures = iter(failures)
        assume(not isinstance(requested_interactions, unicode))
        ws_actor = self.connector.expectSocket()

        failures = iter(failures)
        created_services = {}
        expected_success_nodes = Counter()
        expected_failed_nodes = Counter()

        def run_interaction(children):
            should_fail = next(failures)
            failed = []
            succeeded = []
            for child in children:
                if isinstance(child, unicode):
                    # Make sure disco knows about the node:
                    if child in created_services:
                        node = created_services[child]
                        node = create_node(child, child)
                        created_services[child] = node
                    self.disco.onMessage(None, NodeActive(node))
                    # Make sure the child Node is resolved in the interaction
                    self.session.resolve(node.service, "1.0")
                    if should_fail:
                        expected_failed_nodes[node] += 1
                        expected_success_nodes[node] += 1
            if should_fail:
            self.connector.advance_time(5.0) # Make sure interaction is sent
                self, ws_actor, self.session, failed, succeeded)

        for node in set(expected_failed_nodes) | set(expected_success_nodes):
            policy = self.disco.failurePolicy(node)
            self.assertEqual((policy.successes, policy.failures),