我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 six.PY2。
def pip_execute(*args, **kwargs):
    """Run pip's ``main`` while keeping ``sys.path`` unchanged.

    Overridden pip_execute(): the act of importing ``main`` from the pip
    module seems to add wheels from /usr/share/python-wheels (installed by
    various tools) to ``sys.path``. A snapshot of ``sys.path`` is taken
    before the import and its entries are put back once the call has
    finished, whether or not it raised.

    If pip cannot be imported, the apt cache is updated, the pip package
    matching the running interpreter is installed, and the import is retried.

    :param args: positional arguments forwarded to pip's ``main``.
    :param kwargs: keyword arguments forwarded to pip's ``main``.
    """
    # Take a copy: a plain reference would alias the very list object that
    # the import mutates, so "restoring" it afterwards would restore nothing.
    saved_path = list(sys.path)
    try:
        try:
            from pip import main as _pip_execute
        except ImportError:
            apt_update()
            if six.PY2:
                apt_install('python-pip')
            else:
                apt_install('python3-pip')
            from pip import main as _pip_execute
        _pip_execute(*args, **kwargs)
    finally:
        # Slice-assign so that whichever list is bound to sys.path now gets
        # the original entries back.
        sys.path[:] = saved_path
def ns_query(address):
    """Look *address* up in DNS and return the first answer as a string.

    A ``dns.name.Name`` is queried as a PTR record and a string as an A
    record; any other argument type yields ``None``. ``None`` is also
    returned when the resolver reports NXDOMAIN or the answer is empty.

    dnspython is installed through apt on demand when it cannot be imported.
    """
    try:
        import dns.resolver
    except ImportError:
        package = 'python-dnspython' if six.PY2 else 'python3-dnspython'
        apt_install(package, fatal=True)
        import dns.resolver

    # Choose the record type from the kind of object we were handed.
    if isinstance(address, dns.name.Name):
        record_type = 'PTR'
    elif isinstance(address, six.string_types):
        record_type = 'A'
    else:
        return None

    try:
        result = dns.resolver.query(address, record_type)
    except dns.resolver.NXDOMAIN:
        return None

    return str(result[0]) if result else None
def test_keymap_new_from_buffer(self):
    """A keymap can be built from an array buffer, with or without a length."""
    context = xkb.Context()
    # The array typecode is spelled as a byte string on Python 2.
    code = b'b' if six.PY2 else 'b'
    buf = array.array(code, sample_keymap_bytes)

    keymap = context.keymap_new_from_buffer(buf)
    self.assertIsNotNone(keymap)
    self.assertEqual(keymap.load_method, "buffer")

    # Pad the buffer with zeros and pass the real keymap length explicitly.
    buf.extend([0] * 10)
    keymap = context.keymap_new_from_buffer(
        buf, length=len(sample_keymap_bytes))
    self.assertIsNotNone(keymap)

# This class makes use of the details of the sample keymap in
# sample_keymap_string.
def main(args=None):
    """Command-line entry point.

    Parses *args*, loads the AWS credentials file, runs ``one_mfa`` and,
    when requested, ``rotate`` and ``print_env_vars``.

    Returns ``USER_RECOVERABLE_ERROR`` if the credentials file does not
    exist, the first non-``OK`` code reported by ``one_mfa`` or ``rotate``,
    and ``OK`` otherwise.
    """
    args = parse_args(args)
    credentials_file = args.aws_credentials
    if not os.path.exists(credentials_file):
        print("%s does not exist. Please run 'aws configure' or specify an "
              "alternate credentials file with --aws-credentials."
              % credentials_file, file=sys.stderr)
        return USER_RECOVERABLE_ERROR

    # ``default_section`` is only passed to the parser outside Python 2.
    parser_options = {} if PY2 else {'default_section': None}
    credentials = configparser.ConfigParser(**parser_options)
    credentials.read(credentials_file)

    status = one_mfa(args, credentials)
    if status != OK:
        return status

    if args.rotate_identity_keys:
        status = rotate(args, credentials)
        if status != OK:
            return status

    if args.env:
        print_env_vars(credentials, args.target_profile)
    return OK
def _execute(self, idx, statement, params): self._exec_depth += 1 try: future = self.session.execute_async(statement, params, timeout=None) args = (future, idx) future.add_callbacks( callback=self._on_success, callback_args=args, errback=self._on_error, errback_args=args) except Exception as exc: # exc_info with fail_fast to preserve stack trace info when raising on the client thread # (matches previous behavior -- not sure why we wouldn't want stack trace in the other case) e = sys.exc_info() if self._fail_fast and six.PY2 else exc # If we're not failing fast and all executions are raising, there is a chance of recursing # here as subsequent requests are attempted. If we hit this threshold, schedule this result/retry # and let the event loop thread return. if self._exec_depth < self.max_error_recursion: self._put_result(e, idx, False) else: self.session.submit(self._put_result, e, idx, False) self._exec_depth -= 1
def apply_parameters(cls, subtypes, names=None):
    """
    Build a new subtype of this type parameterized by other CassandraTypes.
    This is how composite types are constructed.

        >>> MapType.apply_parameters([DateType, BooleanType])
        <class 'cassandra.cqltypes.MapType(DateType, BooleanType)'>

    `subtypes` will be a sequence of CassandraTypes. If provided, `names`
    will be an equally long sequence of column names or Nones.

    Raises ValueError when the class declares a fixed number of subtypes and
    a different number is supplied.
    """
    required = cls.num_subtypes
    if required != 'UNKNOWN' and len(subtypes) != required:
        raise ValueError("%s types require %d subtypes (%d given)"
                         % (cls.typename, required, len(subtypes)))

    type_name = cls.cass_parameterized_type_with(subtypes)
    # type() on Python 2 needs a byte-string class name.
    if six.PY2 and isinstance(type_name, unicode):  # noqa
        type_name = type_name.encode('utf-8')

    attributes = {
        'subtypes': subtypes,
        'cassname': cls.cassname,
        'fieldnames': names,
    }
    return type(type_name, (cls,), attributes)
def make_udt_class(cls, keyspace, udt_name, field_names, field_types):
    """Return the class for a user-defined type, creating it when needed.

    Classes are cached on ``cls._cache`` by ``(keyspace, udt_name)``. A
    cached class is reused only while its field names and field types still
    equal the ones requested; otherwise a new class is built and stored in
    its place.
    """
    assert len(field_names) == len(field_types)

    # type() on Python 2 needs a byte-string class name.
    if six.PY2 and isinstance(udt_name, unicode):  # noqa
        udt_name = udt_name.encode('utf-8')

    cache_key = (keyspace, udt_name)
    cached = cls._cache.get(cache_key)
    stale = (not cached
             or cached.fieldnames != field_names
             or cached.subtypes != field_types)
    if not stale:
        return cached

    udt_class = type(udt_name, (cls,), {
        'subtypes': field_types,
        'cassname': cls.cassname,
        'typename': udt_name,
        'fieldnames': field_names,
        'keyspace': keyspace,
        'mapped_class': None,
        'tuple_type': cls._make_registered_udt_namedtuple(
            keyspace, udt_name, field_names),
    })
    cls._cache[cache_key] = udt_class
    return udt_class
def __init__(self, templates_dir, openstack_release):
    """Set up the renderer state for *templates_dir* and *openstack_release*.

    Logs an error and raises OSConfigException when *templates_dir* is not a
    directory. Installs the jinja2 package through apt when any of the
    jinja2 names used by this module is still None.
    """
    if not os.path.isdir(templates_dir):
        log('Could not locate templates dir %s' % templates_dir,
            level=ERROR)
        raise OSConfigException

    self.templates_dir = templates_dir
    self.openstack_release = openstack_release
    self.templates = {}
    self._tmpl_env = None

    if None in (Environment, ChoiceLoader, FileSystemLoader):
        # if this code is running, the object is created pre-install hook.
        # jinja2 shouldn't get touched until the module is reloaded on next
        # hook execution, with proper jinja2 bits successfully imported.
        package = 'python-jinja2' if six.PY2 else 'python3-jinja2'
        apt_install(package)
def get(self, request, container, object_name):
    """Get the object contents.

    Returns a streaming response that carries the object's data as an
    ``application/octet-stream`` attachment.
    """
    swift_obj = api.swift.swift_get_object(request, container, object_name)

    # Add the original file extension back on if it wasn't preserved in the
    # name given to the object.
    download_name = object_name.rsplit(api.swift.FOLDER_DELIMITER)[-1]
    stored_ext = os.path.splitext(swift_obj.name)[1]
    if not stored_ext and swift_obj.orig_name:
        original_ext = os.path.splitext(swift_obj.orig_name)[1]
        download_name = "%s%s" % (download_name, original_ext)

    response = StreamingHttpResponse(swift_obj.data)

    # Commas are removed from the name before it goes into the header.
    safe = download_name.replace(",", "")
    if six.PY2:
        safe = safe.encode('utf-8')

    response['Content-Disposition'] = 'attachment; filename="%s"' % safe
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Length'] = swift_obj.bytes
    return response
def object_download(request, container_name, object_path):
    """Stream a Swift object back to the client as a file download.

    A failure to fetch the object is passed to ``exceptions.handle`` together
    with a redirect to the containers index. Otherwise the object's data is
    returned as an ``application/octet-stream`` attachment.
    """
    try:
        swift_obj = api.swift.swift_get_object(
            request, container_name, object_path,
            resp_chunk_size=swift.CHUNK_SIZE)
    except Exception:
        index_url = reverse("horizon:project:containers:index")
        exceptions.handle(request,
                          _("Unable to retrieve object."),
                          redirect=index_url)

    # Add the original file extension back on if it wasn't preserved in the
    # name given to the object.
    download_name = object_path.rsplit(swift.FOLDER_DELIMITER)[-1]
    stored_ext = os.path.splitext(swift_obj.name)[1]
    if not stored_ext and swift_obj.orig_name:
        original_ext = os.path.splitext(swift_obj.orig_name)[1]
        download_name = "%s%s" % (download_name, original_ext)

    response = http.StreamingHttpResponse(swift_obj.data)

    # Commas are removed from the name before it goes into the header.
    safe_name = download_name.replace(",", "")
    if six.PY2:
        safe_name = safe_name.encode('utf-8')

    response['Content-Disposition'] = 'attachment; filename="%s"' % safe_name
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Length'] = swift_obj.bytes
    return response
def _passthrough_interactive_check(self, method_name, mode): # type: (str, str) -> bool """Attempt to call the specified method on the wrapped stream and return the result. If the method is not found on the wrapped stream, returns False. .. note:: Special Case: If wrapped stream is a Python 2 file, inspect the file mode. :param str method_name: Name of method to call :param str mode: Python 2 mode character :rtype: bool """ try: method = getattr(self.__wrapped, method_name) except AttributeError: if six.PY2 and isinstance(self.__wrapped, file): # noqa pylint: disable=undefined-variable if mode in self.__wrapped.mode: return True return False else: return method()
def convert_into_with_meta(self, item, resp):
    """Wrap *item* in the ``*WithMeta`` class that matches its type.

    Text, bytes, lists and tuples each have their own wrapper; ``None`` is
    wrapped as an empty tuple; anything else is wrapped as a dict. *resp* is
    passed through to the wrapper.
    """
    if isinstance(item, six.string_types):
        # Only Python 2 distinguishes unicode text from plain str here.
        if six.PY2 and isinstance(item, six.text_type):
            return UnicodeWithMeta(item, resp)
        return StrWithMeta(item, resp)
    if isinstance(item, six.binary_type):
        return BytesWithMeta(item, resp)
    if isinstance(item, list):
        return ListWithMeta(item, resp)
    if isinstance(item, tuple):
        return TupleWithMeta(item, resp)
    if item is None:
        return TupleWithMeta((), resp)
    return DictWithMeta(item, resp)
def preprocess(self, x):
    """Load a single example using this field, tokenizing if necessary.

    A Python 2 byte `str` is first decoded to Unicode as UTF-8. With
    `sequential=True`, text input is stripped of trailing newlines and
    tokenized. The value is then optionally lowercased and, if a
    `preprocessing` Pipeline was provided, passed through it.
    """
    needs_decoding = (six.PY2
                      and isinstance(x, six.string_types)
                      and not isinstance(x, six.text_type))
    if needs_decoding:
        decode = Pipeline(lambda s: six.text_type(s, encoding='utf-8'))
        x = decode(x)

    if self.sequential and isinstance(x, six.text_type):
        x = self.tokenize(x.rstrip('\n'))

    if self.lower:
        lowercase = Pipeline(six.text_type.lower)
        x = lowercase(x)

    if self.preprocessing is None:
        return x
    return self.preprocessing(x)
def fromCSV(cls, data, fields):
    """Build an example from one CSV-formatted line.

    Trailing newlines are stripped, the line is parsed with the ``csv``
    module, and the resulting columns are handed to ``cls.fromlist`` along
    with *fields*.
    """
    line = data.rstrip("\n")
    py2 = six.PY2

    # Python 2's csv module does not take unicode input: feed it UTF-8 bytes.
    if py2:
        line = line.encode('utf-8')

    rows = csv.reader([line])

    if py2:
        # Decode the columns back to unicode (the original input format).
        for row in rows:
            columns = [six.text_type(cell, 'utf-8') for cell in row]
            break
    else:
        columns = list(rows)[0]

    return cls.fromlist(columns, fields)
def _copy_func_details(func, funcopy):
    """Copy identifying metadata from *func* onto *funcopy*.

    ``__name__`` and ``__doc__`` are always copied. The remaining attributes
    are copied on a best-effort basis: an AttributeError while reading or
    writing one of them is ignored. On Python 2 ``func_defaults`` is copied
    as well.
    """
    funcopy.__name__ = func.__name__
    funcopy.__doc__ = func.__doc__

    # we explicitly don't copy func.__dict__ into this copy as it would
    # expose original attributes that should be mocked
    optional_attributes = (
        '__text_signature__',
        '__module__',
        '__defaults__',
        '__kwdefaults__',
    )
    for attribute in optional_attributes:
        try:
            setattr(funcopy, attribute, getattr(func, attribute))
        except AttributeError:
            pass

    if six.PY2:
        funcopy.func_defaults = func.func_defaults
def assert_called_with(_mock_self, *args, **kwargs):
    """Assert that the mock's last call used the given arguments.

    Raises an AssertionError if the mock was never called, or if the args
    and keyword args passed in differ from those of the last call.
    """
    mock = _mock_self
    if mock.call_args is None:
        signature = mock._format_mock_call_signature(args, kwargs)
        raise AssertionError('Expected call: %s\nNot called' % (signature,))

    expected = mock._call_matcher((args, kwargs))
    actual = mock._call_matcher(mock.call_args)
    if expected != actual:
        cause = expected if isinstance(expected, Exception) else None
        message = mock._format_mock_failure_message(args, kwargs)
        if six.PY2 and cause is not None:
            # Tack on some diagnostics for Python without __cause__
            message = '%s\n%s' % (message, str(cause))
        six.raise_from(AssertionError(message), cause)
def _format_call_signature(name, args, kwargs): message = '%s(%%s)' % name formatted_args = '' args_string = ', '.join([repr(arg) for arg in args]) def encode_item(item): if six.PY2 and isinstance(item, unicode): return item.encode("utf-8") else: return item kwargs_string = ', '.join([ '%s=%r' % (encode_item(key), value) for key, value in sorted(kwargs.items()) ]) if args_string: formatted_args = args_string if kwargs_string: if formatted_args: formatted_args += ', ' formatted_args += kwargs_string return message % formatted_args
def test_stat_bin():
    """stat_bin warns about its default bins and rejects a y aesthetic."""
    df = pd.DataFrame({'x': [1, 2, 3], 'y': [1, 2, 3]})

    # About the default bins
    gg = ggplot(aes(x='x'), df) + stat_bin()
    if not six.PY2:
        # Test fails on PY2 when all the tests are run,
        # but not when only this test module is run
        with pytest.warns(None) as record:
            gg.draw_test()
        assert any('bins' in str(item.message).lower() for item in record)

    # About the ignoring the y aesthetic
    gg = ggplot(aes(x='x', y='y'), df) + stat_bin()
    with pytest.raises(PlotnineError):
        gg.draw_test()
def test_aesthetics():
    """Compare a plot stacking five differently mapped rug layers."""
    layers = (
        geom_rug(aes('x', 'y'), size=2),
        geom_rug(aes('x+2*n', 'y+2*n', alpha='z'), size=2, sides='tr'),
        geom_rug(aes('x+4*n', 'y+4*n', linetype='factor(z)'),
                 size=2, sides='t'),
        geom_rug(aes('x+6*n', 'y+6*n', color='factor(z)'),
                 size=2, sides='b'),
        geom_rug(aes('x+8*n', 'y+8*n', size='z'), sides='tblr'),
    )
    p = ggplot(df)
    for layer in layers:
        p = p + layer

    if six.PY2:
        # Small displacement in y-axis text
        expected = ('aesthetics', {'tol': 4})
    else:
        expected = 'aesthetics'
    assert p + _theme == expected
def decode_unicode(input):
    """Recursively convert message text to the native string type.

    Dicts, tuples and lists are walked recursively. On Python 2 unicode
    text is encoded to UTF-8; on Python 3 bytes are decoded from UTF-8.
    Anything else is returned untouched.
    """
    if isinstance(input, dict):
        # Rebuild the dict with a predictable insertion order to avoid
        # inconsistencies in the message signature computation for
        # equivalent payloads modulo ordering.
        converted = {}
        for name, item in sorted(six.iteritems(input)):
            converted[decode_unicode(name)] = decode_unicode(item)
        return converted

    if isinstance(input, (tuple, list)):
        # A JSON encode/decode round trip turns a tuple into a list, so the
        # value is always produced as a list here.
        return [decode_unicode(member) for member in input]

    if six.PY2 and isinstance(input, six.text_type):
        return input.encode('utf-8')
    if six.PY3 and isinstance(input, six.binary_type):
        return input.decode('utf-8')
    return input
def _some_str(value):
    """Return a printable string for *value* without ever raising.

    Bare ``except`` clauses are used, so any exception raised while
    converting leads to a fallback rather than propagating.
    """
    try:
        if not PY2:
            # For Python3, bytestrings don't implicit decode, so its trivial.
            return str(value)
        # If there is a working __unicode__, great.
        # Otherwise see if we can get a bytestring...
        # Otherwise we fallback to unprintable.
        try:
            return unicode(value)  # noqa
        except:
            return "b%s" % repr(str(value))
    except:
        return '<unprintable %s object>' % type(value).__name__

# --
def console(self):
    """Start an interactive console (something like code.InteractiveConsole).

    Each line read from the prompt is passed to ``self.eval`` and the result
    is printed. Errors raised by the evaluation are reported on stderr (with
    a full traceback when ``DEBUG`` is set) and the loop continues.

    The session ends on Ctrl-C or end-of-input, whether that happens at the
    prompt or while code is being evaluated.
    """
    # Use one prompt text for both major Python versions.
    read_line = raw_input if six.PY2 else input  # noqa
    while True:
        try:
            code = read_line('>>> ')
        except (EOFError, KeyboardInterrupt):
            # Leaving at the prompt is a normal way to end the session, so
            # exit quietly instead of surfacing a traceback.
            break
        try:
            print(self.eval(code))
        except KeyboardInterrupt:
            break
        except Exception as e:
            import traceback
            if DEBUG:
                sys.stderr.write(traceback.format_exc())
            else:
                sys.stderr.write('EXCEPTION: ' + str(e) + '\n')
            time.sleep(0.01)
def setUp(self):
    """Create ``self.parser`` and load the sample configuration into it."""
    # Example intentionally not in order
    # NOTE(review): the literal below appears with its line breaks and
    # indentation collapsed; it presumably had one section header or option
    # per line -- confirm against the upstream test before relying on it.
    example = """ [ACL] foo.baz: ACL foo.acl: ACL [ACL/Rule A] foo.bar: ACL-Rule-A [ACL/Rule B] foo.baz: ACL-Rule-B [DEFAULTS] foo.bar: Default foo.baz: Default [Deep/Subsection/Without/Parents] foo.baz: Deep [ACL/DEFAULTS] foo.bar: ACL-Default foo.baz: ACL-Default """
    self.parser = config.ConfigParser()
    # Python 2 feeds the dedented text through a byte stream; the other
    # branch hands the string to read_string as-is (not dedented).
    if six.PY2:
        self.parser.readfp(io.BytesIO(dedent(example)))
    else:
        self.parser.read_string(example)
def mixin_meta(item, resp):
    """Wrap *item* in the ``resource.*WithMeta`` class matching its type.

    Text, bytes, lists and tuples each have their own wrapper; ``None`` is
    wrapped as an empty tuple; anything else is wrapped as a dict. *resp* is
    passed through to the wrapper.
    """
    if isinstance(item, six.string_types):
        # Only Python 2 distinguishes unicode text from plain str here.
        if six.PY2 and isinstance(item, six.text_type):
            return resource.UnicodeWithMeta(item, resp)
        return resource.StrWithMeta(item, resp)
    if isinstance(item, six.binary_type):
        return resource.BytesWithMeta(item, resp)
    if isinstance(item, list):
        return resource.ListWithMeta(item, resp)
    if isinstance(item, tuple):
        return resource.TupleWithMeta(item, resp)
    if item is None:
        return resource.TupleWithMeta((), resp)
    return resource.DictWithMeta(item, resp)
def to_utf8(s):
    """Convert a string to utf8.

    If the argument is an iterable (list/tuple/set), each element is
    converted instead and the result is returned as a list. Outside
    Python 2 the argument is returned unchanged.

    >>> to_utf8('a')
    'a'
    >>> to_utf8(u'a')
    'a'
    >>> to_utf8([u'a', u'b', u'\u4f60'])
    ['a', 'b', '\\xe4\\xbd\\xa0']
    """
    if not six.PY2:
        return s

    if isinstance(s, str):
        return s
    if isinstance(s, unicode):  # noqa
        return s.encode('utf-8')
    if isinstance(s, (list, tuple, set)):
        return [to_utf8(element) for element in s]
    return s
def _get_home():
    """Find user's home directory if possible. Otherwise, returns None.

    ``~`` is expanded first; if that does not give an existing directory,
    the HOME, USERPROFILE and TMP environment variables are tried in order.

    :see: http://mail.python.org/pipermail/python-list/2005-February/325395.html

    This function is copied from matplotlib version 1.4.3, Jan 2016
    """
    home = None
    try:
        if six.PY2 and sys.platform == 'win32':
            encoding = sys.getfilesystemencoding()
            home = os.path.expanduser(b"~").decode(encoding)
        else:
            home = os.path.expanduser("~")
    except ImportError:
        # This happens on Google App Engine (pwd module is not present).
        pass

    if home is not None and os.path.isdir(home):
        return home

    for variable in ('HOME', 'USERPROFILE', 'TMP'):
        candidate = os.environ.get(variable)
        if candidate is not None and os.path.isdir(candidate):
            return candidate
    return None
def test_main_01_from_project(self):
    """Test the :func:`psyplot.__main__.main` function"""
    if not six.PY2:
        with self.assertRaisesRegex(ValueError, 'filename'):
            main.main(['-o', 'test.pdf'])

    sp, fname1 = self._create_and_save_test_project()
    # Only the generated name is kept; the temporary file object itself is
    # not held on to.
    fname2 = tempfile.NamedTemporaryFile(
        suffix='.pdf', prefix='test_psyplot_').name
    self._created_files.add(fname2)
    sp.save_project(fname1, use_rel_paths=False)
    psy.close('all')

    cli_args = ['-p', fname1, '-o', fname2]
    if six.PY2:
        main.main(cli_args)
    else:
        with self.assertWarnsRegex(UserWarning, 'ignored'):
            main.main(cli_args + ['-n', 't2m'])

    self.assertTrue(osp.exists(fname2), msg='Missing ' + fname2)
    self.assertEqual(len(psy.gcp(True)), 4)
def decode_b64jose(data, size=None, minimum=False):
    """Decode JOSE Base-64 field.

    :param unicode data:
    :param int size: Required length (after decoding).
    :param bool minimum: If ``True``, then `size` will be treated as
        minimum required length, as opposed to exact equality.

    :rtype: bytes
    :raises errors.DeserializationError: if decoding fails or the decoded
        length does not satisfy `size`.
    """
    # The decoding failure is caught as a different exception class on
    # Python 2.
    decode_error = TypeError if six.PY2 else binascii.Error
    try:
        decoded = b64.b64decode(data.encode())
    except decode_error as error:
        raise errors.DeserializationError(error)

    if size is not None:
        actual = len(decoded)
        bad_length = actual < size if minimum else actual != size
        if bad_length:
            raise errors.DeserializationError(
                "Expected at least or exactly {0} bytes".format(size))
    return decoded
def decode_hex16(value, size=None, minimum=False):
    """Decode hexlified field.

    :param unicode value:
    :param int size: Required length (after decoding).
    :param bool minimum: If ``True``, then `size` will be treated as
        minimum required length, as opposed to exact equality.

    :rtype: bytes
    :raises errors.DeserializationError: if the encoded length does not
        satisfy `size` or the value cannot be unhexlified.
    """
    encoded = value.encode()

    if size is not None:
        # Two hex digits encode one byte, so compare against twice the size.
        hex_length = size * 2
        actual = len(encoded)
        bad_length = actual < hex_length if minimum else actual != hex_length
        if bad_length:
            raise errors.DeserializationError()

    # The decoding failure is caught as a different exception class on
    # Python 2.
    decode_error = TypeError if six.PY2 else binascii.Error
    try:
        return binascii.unhexlify(encoded)
    except decode_error as error:
        raise errors.DeserializationError(error)