我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用builtins._。
def init_pprint(self):
    """Activates pretty-printing of output values.

    Installs a ``sys.displayhook`` that stores every non-``None`` result in
    ``builtins._`` and prints it through ``pprint.pformat``, wrapped to the
    current terminal width. ``dict`` results are passed through a regex
    substitution that colors the matched spans with ``purple`` (the pattern
    appears intended to match quoted keys followed by ``': '``); every other
    value is printed via ``blue``.
    """
    keys_re = re.compile(r'([\'\("]+(.*?[\'\)"]: ))+?')
    color_dict = partial(keys_re.sub, lambda m: purple(m.group()))
    format_func = pprint.pformat
    # pformat() accepts ``compact`` from Python 3.4 on; comparing tuples also
    # stays correct for any later major version.
    if sys.version_info >= (3, 4):
        format_func = partial(pprint.pformat, compact=True)

    def terminal_width():
        """Return the terminal width in columns, falling back to 80."""
        try:
            # os.terminal_size is ordered (columns, lines), so read the
            # named field instead of unpacking positionally.
            return os.get_terminal_size().columns
        except (AttributeError, OSError):
            # AttributeError: interpreter without os.get_terminal_size.
            # OSError: stdout is not attached to a terminal.
            pass
        try:
            # `stty size` prints "<rows> <cols>".
            _, cols = map(int, subprocess.check_output(['stty', 'size']).split())
        except Exception:
            # Best effort only: any failure here means "use the default".
            cols = 80
        return cols

    def pprint_callback(value):
        if value is None:
            return
        builtins._ = value
        formatted = format_func(value, width=terminal_width())
        print(color_dict(formatted) if isinstance(value, dict) else blue(formatted))

    sys.displayhook = pprint_callback
def test_original_displayhook(self):
    """Check the behaviour of ``sys.__displayhook__``.

    Covers: calling it without arguments, displaying ``None`` (no output and
    no ``builtins._``), displaying ``42`` (output plus ``builtins._``), and
    calling it after ``sys.stdout`` has been deleted.
    """
    import builtins

    captured = io.StringIO()
    sys.stdout = captured

    original_hook = sys.__displayhook__

    # The hook requires exactly one argument.
    self.assertRaises(TypeError, original_hook)

    # Start from a state where ``_`` is not defined at all.
    if hasattr(builtins, "_"):
        del builtins._

    # None produces no output and does not create ``_``.
    original_hook(None)
    self.assertEqual(captured.getvalue(), "")
    self.assertTrue(not hasattr(builtins, "_"))

    # Any other value is echoed and remembered in ``_``.
    original_hook(42)
    self.assertEqual(captured.getvalue(), "42\n")
    self.assertEqual(builtins._, 42)

    # Without sys.stdout the hook is expected to raise RuntimeError.
    del sys.stdout
    self.assertRaises(RuntimeError, original_hook, 42)
def _test():
    """Run doctests for the files named on the command line.

    Arguments that start with ``-`` are skipped when collecting file names.
    A file name ending in ``.py`` is imported as a module and handed to
    ``testmod``; anything else is handed to ``testfile``.

    Returns 2 after printing a usage line when no file was given, 1 as soon
    as one file reports failures, and 0 otherwise.
    """
    testfiles = [arg for arg in sys.argv[1:] if arg and not arg.startswith('-')]

    if not testfiles:
        prog = os.path.basename(sys.argv[0])
        if '__loader__' in globals():  # python -m
            prog = os.path.splitext(prog)[0]
        print("usage: {0} [-v] file ...".format(prog))
        return 2

    for path in testfiles:
        if path.endswith(".py"):
            # It is a module -- insert its dir into sys.path and try to
            # import it. If it is part of a package, that possibly
            # won't work because of package imports.
            dirname, module_file = os.path.split(path)
            sys.path.insert(0, dirname)
            m = __import__(module_file[:-3])
            del sys.path[0]
            failures, _attempted = testmod(m)
        else:
            failures, _attempted = testfile(path, module_relative=False)
        if failures:
            return 1
    return 0
def displayhook(value):
    """Display hook that falls back to ASCII with backslash escapes.

    ``None`` is ignored. Any other value has its ``repr`` written to
    ``sys.stdout`` followed by a newline and is then stored in
    ``builtins._``. If the stream cannot encode the text, the text is
    re-encoded as ASCII with ``backslashreplace`` and written again.
    """
    if value is None:
        return

    # Set '_' to None to avoid recursion
    builtins._ = None

    text = repr(value)
    try:
        sys.stdout.write(text)
    except UnicodeEncodeError:
        # let's use ascii while utf8-bmp codec doesn't present
        fallback = 'ascii'
        escaped = text.encode(fallback, 'backslashreplace').decode(fallback, 'strict')
        sys.stdout.write(escaped)
    sys.stdout.write("\n")

    builtins._ = value
def _verify_pre_check(filepath):
    """Check student code for certain issues.

    Returns False, after printing an error, when a comment in the file
    matches ``pylint.utils.OPTION_RGX`` or when tokenizing the file raises
    ``IndentationError`` or ``tokenize.TokenError``. Returns True otherwise.
    """
    # Make sure the program doesn't crash for students.
    # Could use some improvement for better logging and error reporting.
    try:
        with tokenize.open(os.path.expanduser(filepath)) as source:
            for token in tokenize.generate_tokens(source.readline):
                tok_type, content = token[0], token[1]
                if tok_type != tokenize.COMMENT:
                    continue
                # An inline "pylint:" comment may indicate a student
                # trying to disable a check.
                if pylint.utils.OPTION_RGX.search(content) is not None:
                    print('ERROR: string "pylint:" found in comment. ' +
                          'No check run on file `{}`\n'.format(filepath))
                    return False
    except IndentationError as e:
        print('ERROR: python_ta could not check your code due to an ' +
              'indentation error at line {}'.format(e.lineno))
        return False
    except tokenize.TokenError:
        print('ERROR: python_ta could not check your code due to a ' +
              'syntax error in your file')
        return False
    return True
def init_prompt(self):
    """Activates color on the prompt based on python version.

    ``sys.ps1`` is colored with ``green`` on Python 2 and ``yellow``
    otherwise; ``sys.ps2`` is always colored with ``red``. When the
    ``SSH_CONNECTION`` environment variable is set, its third
    whitespace-separated field is shown in brackets in front of both prompts.
    """
    prompt_color = yellow if sys.version_info.major != 2 else green

    sys.ps1 = prompt_color('>>> ', readline_workaround=True)
    sys.ps2 = red('... ', readline_workaround=True)

    # - if we are over a remote connection, modify the ps1
    if not os.getenv('SSH_CONNECTION'):
        return

    # The value must split into exactly four fields; only the third is used.
    _, _, this_host, _ = os.getenv('SSH_CONNECTION').split()
    sys.ps1 = prompt_color('[{}]>>> '.format(this_host), readline_workaround=True)
    sys.ps2 = red('[{}]... '.format(this_host), readline_workaround=True)
def improved_rlcompleter(self):
    """Enhances the default rlcompleter

    Builds and returns a readline completer function that, on top of
    ``rlcompleter.Completer``, completes module names on lines starting with
    ``import``/``from`` and path names when the text contains a ``/``.
    On an empty line it returns ``self.tab`` instead of attempting
    completion.
    """
    completer = rlcompleter.Completer(namespace=self.locals)

    # - remove / from the delimiters to help identify possibility for path completion
    delimiters = readline.get_completer_delims()
    readline.set_completer_delims(delimiters.replace('/', ''))

    modlist = frozenset(module_info[1] for module_info in pkgutil.iter_modules())

    def complete_wrapper(text, state):
        line = readline.get_line_buffer().strip()

        if not line:
            # Nothing typed yet: offer a single "completion", the tab itself.
            return self.tab if state <= 0 else None

        if state == 0:
            # The candidate list is computed once and cached on the
            # completer; later states only index into it.
            # NOTE(review): this prefix test also matches identifiers such
            # as "importlib" or "frozen" -- confirm that is intended.
            if line.startswith(('import', 'from')):
                completer.matches = [name for name in modlist if name.startswith(text)]
            else:
                match = completer.complete(text, state)
                if match is None and '/' in text:
                    completer.matches = glob.glob(text + '*')

        try:
            match = completer.matches[state]
        except IndexError:
            return None
        suffix = ' ' if keyword.iskeyword(match) else ''
        return '{}{}'.format(match, suffix)

    return complete_wrapper
def id(self):
    """Return the wrapped doctest's name with every '.' replaced by '_'."""
    dotted_name = self._dt_test.name
    return dotted_name.replace('.', '_')
def _test():
    """Command-line doctest runner built on argparse.

    Accepts ``-v/--verbose``, repeatable ``-o/--option`` flags whose names
    come from ``OPTIONFLAGS_BY_NAME``, ``-f/--fail-fast`` and one or more
    file names. A file ending in ``.py`` is imported and passed to
    ``testmod``; any other file is passed to ``testfile``.

    Returns 1 as soon as one file reports failures, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="doctest runner")
    parser.add_argument(
        '-v', '--verbose', action='store_true', default=False,
        help='print very verbose output for all tests')
    parser.add_argument(
        '-o', '--option', action='append',
        choices=OPTIONFLAGS_BY_NAME.keys(), default=[],
        help=('specify a doctest option flag to apply'
              ' to the test run; may be specified more'
              ' than once to apply multiple options'))
    parser.add_argument(
        '-f', '--fail-fast', action='store_true',
        help=('stop running tests after first failure (this'
              ' is a shorthand for -o FAIL_FAST, and is'
              ' in addition to any other -o options)'))
    parser.add_argument(
        'file', nargs='+',
        help='file containing the tests to run')
    args = parser.parse_args()

    # Verbose used to be handled by the "inspect argv" magic in DocTestRunner,
    # but since we are using argparse we are passing it manually now.
    verbose = args.verbose

    # Fold every requested option flag into a single bit mask.
    options = 0
    for flag_name in args.option:
        options |= OPTIONFLAGS_BY_NAME[flag_name]
    if args.fail_fast:
        options |= FAIL_FAST

    for path in args.file:
        if path.endswith(".py"):
            # It is a module -- insert its dir into sys.path and try to
            # import it. If it is part of a package, that possibly
            # won't work because of package imports.
            dirname, module_file = os.path.split(path)
            sys.path.insert(0, dirname)
            m = __import__(module_file[:-3])
            del sys.path[0]
            failures, _attempted = testmod(m, verbose=verbose, optionflags=options)
        else:
            failures, _attempted = testfile(path, module_relative=False,
                                            verbose=verbose, optionflags=options)
        if failures:
            return 1
    return 0
def testVerify(spec, pub, priv, closed, proxy):
    """
    Runs a single test case against verify.verifyDEP() and returns the
    result and potentially an error message.

    Besides the elements understood by run_test.runTest(), the specification
    may carry "expectedException" (name of the exception verifyDEP() is
    expected to throw; if omitted, no exception may be thrown) and
    "exceptionReceipt" (the receipt at which that exception is expected; if
    omitted, it may occur anywhere in the generated DEP).

    The generated DEP is verified twice, once with the fourth argument of
    _testVerify() set to False and once with True; both runs must agree.

    :param spec: The test case specification as a dict structure.
    :param pub: The public key or certificate. For a closed system a public
    key must be used, for an open system a certificate must be used.
    :param priv: The private key used to sign the generated receipts.
    :param closed: Indicates whether the system is a closed system (True) or
    an open system (False).
    :param proxy: An object implementing RKSVVerificationProxyI. This will
    be used to do the actual verification.
    :return: A TestVerifyResult indicating the result of the test and an
    error message. If the result is OK, the message is None.
    """
    try:
        # The same key pair is used for every signature device.
        keymat = [(pub, priv)] * spec['numberOfSignatureDevices']
        deps, cc = run_test.runTest(spec, keymat, closed)
    except Exception as e:
        return TestVerifyResult.ERROR, e

    plainResult, plainMsg = _testVerify(spec, deps, cc, False, proxy)
    parsedResult, parsedMsg = _testVerify(spec, deps, cc, True, proxy)

    if plainResult == parsedResult and str(plainMsg) == str(parsedMsg):
        return plainResult, plainMsg

    # The two runs disagree: report ERROR if either run errored, FAIL
    # otherwise, together with a message describing both outcomes.
    if plainResult == TestVerifyResult.ERROR or parsedResult == TestVerifyResult.ERROR:
        combined = TestVerifyResult.ERROR
    else:
        combined = TestVerifyResult.FAIL

    mismatch = _('Result mismatch: without parsing {}:>{}<, with parsing {}:>{}<').format(
        plainResult.name, plainMsg, parsedResult.name, parsedMsg)
    return combined, Exception(mismatch)
def testVerifyMulti(specs, groupLabel, crt, pub, priv, tcDefaultSize, proxy):
    """
    Runs all the given test cases against verify.verifyDEP.

    On top of what testVerify() understands, each specification may carry
    "closedSystem" in its root dictionary to select a closed (True) or open
    (False) system. A specification without "simulationRunLabel" is not run
    and is reported as an error. This is a generator so that output stays
    responsive with many test cases.

    :param specs: A list or generator with test specifications as dict
    structures.
    :param groupLabel: A label to indicate which group the tests belong to
    as a string.
    :param crt: The certificate used to sign the generated receipts if an
    open system is used.
    :param pub: The public key used to sign the generated receipts if a
    closed system is used.
    :param priv: The private key belonging to the given certificate and
    public key.
    :param tcDefaultSize: The turnover counter size in bytes to use if no
    size is given in the test specification.
    :param proxy: An object implementing RKSVVerificationProxyI. This will
    be used to do the actual verification.
    :yield: A tuple containing (in order) the test case's name, the group
    label, a boolean indicating whether the system is closed (True) or open
    (False), the used turnover counter size in bytes, the result of the test
    as a TestVerifyResult and the generated error message or None if no
    error occurred.
    """
    for s in specs:
        label = s.get('simulationRunLabel', 'Unknown')
        tc_size = s.get('turnoverCounterSize', tcDefaultSize)
        closed = s.get('closedSystem', False)

        if label != 'Unknown':
            # Closed systems verify against the public key, open ones
            # against the certificate.
            keyOrCert = pub if closed else crt
            result, msg = testVerify(s, keyOrCert, priv, closed, proxy)
        else:
            result = TestVerifyResult.ERROR
            msg = _('No run label')

        yield (label, groupLabel, closed, tc_size, result, msg)
def printTestVerifySummary(results):
    """
    Prints a one-line summary of the given test results.

    :param results: An iterable of result tuples whose element at index 4
    is the TestVerifyResult of the test. It may be a one-shot iterable such
    as a generator; it is consumed exactly once.
    """
    # Materialize first: the results are counted, then scanned twice, which
    # a generator cannot support.
    results = list(results)
    nFails = sum(1 for r in results if r[4] == TestVerifyResult.FAIL)
    nErrors = sum(1 for r in results if r[4] == TestVerifyResult.ERROR)
    print(_('{} tests run, {} failed, {} errors').format(
        len(results), nFails, nErrors))
def verify(self, fd, keyStore, aesKey, inState, registerIdx, chunksize):
    """Verify the DEP read from ``fd`` with translations switched off.

    The ``_`` attribute of ``__builtin__`` and of every listed module is
    replaced by a pass-through function for the duration of the call so
    that error messages match, and is restored afterwards even when
    verification raises.

    Returns whatever ``verify.verifyParsedDEP`` returns.
    """
    translated = (
        __builtin__,
        depparser,
        key_store,
        receipt,
        verification_state,
        verify,
        verify_receipt,
    )

    # Save the _() function.
    saved = [module._ for module in translated]

    def untranslated(x):
        return x

    # Temporarily disable translations to make sure error
    # messages match.
    for module in translated:
        module._ = untranslated

    try:
        parser = depparser.IncrementalDEPParser.fromFd(fd, True)
        outState = verify.verifyParsedDEP(parser, keyStore, aesKey, inState,
                                          registerIdx, self.pool, self.nprocs,
                                          chunksize)
    finally:
        for module, translator in zip(translated, saved):
            module._ = translator

    return outState
def __call__(self, obj):
    """Record an object that should be displayed.

    ``None`` is ignored. Otherwise the object is stored in ``builtins._``,
    stdout and stderr are flushed, and, when ``self.result`` is set, the
    object's ``repr`` is appended to ``self.result.result``.

    @param obj: object to be displayed
    """
    if obj is None:
        return

    builtins._ = obj

    sys.stdout.flush()
    sys.stderr.flush()

    collector = self.result
    if collector is None:
        return
    collector.result.append(repr(obj))
def _pprint_displayhook(value):
    """Display hook that pretty-prints results.

    ``None`` is ignored; any other value is stored in ``builtins._`` and
    then printed with ``pprint``.
    """
    if value is None:
        return
    builtins._ = value
    pprint(value)
def get_file_paths(rel_path):
    """A generator for iterating python files within a directory.

    `rel_path` is a path to a file or directory. If it is not a directory
    it is yielded unchanged. Otherwise the directory tree is walked and the
    path of every file whose name ends in '.py' is yielded.
    """
    if os.path.isdir(rel_path):
        for root, _dirs, files in os.walk(rel_path):
            for filename in files:
                if filename.endswith('.py'):
                    # Format path, from root.
                    yield os.path.join(root, filename)
    else:
        # Don't do anything; return the file name.
        yield rel_path
def process_edit_cmd(self, arg=''):
    """{EDIT_CMD} [object|filename]

    Open {EDITOR} with session history, provided filename or
    object's source file.

    - without arguments, a temporary file containing session history is
      created and opened in {EDITOR}. On quitting the editor, all
      the non commented lines in the file are executed.

    - with a filename argument, the file is opened in the editor. On
      close, you are returned back to the interpreter.

    - with an object name argument, an attempt is made to lookup the
      source file of the object and it is opened if found. Else the
      argument is treated as a filename.
    """
    # NOTE(review): the {EDIT_CMD}/{EDITOR} fields above look like
    # str.format placeholders, presumably filled in where the help text is
    # shown -- keep them intact when editing the docstring.
    line_num_opt = ''
    if arg:
        obj = self.lookup(arg)
        try:
            if obj:
                filename = inspect.getsourcefile(obj)
                _, line_no = inspect.getsourcelines(obj)
                line_num_opt = config['LINE_NUM_OPT'].format(line_no=line_no)
            else:
                filename = arg
        except (IOError, TypeError, NameError) as e:
            return self.writeline(e)
    else:
        # - make a list of all lines in session history, commenting
        # any non-blank lines.
        filename = self._mktemp_buffer("# {}".format(line) if line else ''
                                       for line in (line.strip('\n') for line in
                                                    self.session_history))

    try:
        # - shell out to the editor
        os.system('{} {} {}'.format(config['EDITOR'], line_num_opt, filename))

        # - if arg was not provided (we edited session history), execute
        # it in the current namespace
        if not arg:
            self._exec_from_file(filename)
    finally:
        # Only the temporary history buffer is ours to delete, and it must
        # go away even when executing the edited history raises.
        if not arg:
            os.unlink(filename)
def process_sh_cmd(self, cmd):
    """{SH_EXEC} [cmd [args ...] | {{fmt string}}]

    Escape to {SHELL} or execute `cmd` in {SHELL}

    - without arguments, the current interpreter will be suspended
      and you will be dropped in a {SHELL} prompt. Use fg to return.

    - with arguments, the text will be executed in {SHELL} and the
      output/error will be displayed. Additionally '_' will contain
      a named tuple with the (<stdout>, <stderror>, <return_code>)
      for the execution of the command.

      You may pass strings from the global namespace to the command
      line using the `.format()` syntax. for example:

    >>> filename = '/does/not/exist'
    >>> !ls {{filename}}
    ls: cannot access /does/not/exist: No such file or directory
    >>> _
    CmdExec(out='', err='ls: cannot access /does/not/exist: No such file or directory\n', rc=2)
    """
    if not cmd:
        if os.getenv('SSH_CONNECTION'):
            # I use the bash function similar to the one below in my
            # .bashrc to directly open a python prompt on remote
            # systems I log on to.
            #   function rpython { ssh -t $1 -- "python" }
            # Unfortunately, suspending this ssh session, does not place me
            # in a shell, so I need to create one:
            os.system(config['SHELL'])
        else:
            os.kill(os.getpid(), signal.SIGSTOP)
        return

    try:
        # Interpolate names from the interpreter namespace, then tokenize.
        argv = shlex.split(cmd.format(**self.locals))
        if argv[0] == 'cd':
            # `cd` has to change this process' directory, so it is handled
            # here; with no target it falls back to ${HOME}.
            target = ' '.join(argv[1:]) or '${HOME}'
            os.chdir(os.path.expanduser(os.path.expandvars(target)))
        else:
            process = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            out, err = process.communicate()
            rc = process.returncode
            # Only one stream is echoed: stderr when it is non-empty,
            # stdout otherwise.
            if err:
                print(red(err.decode('utf-8')))
            else:
                print(green(out.decode('utf-8'), bold=False))
            builtins._ = namedtuple('CmdExec', ['out', 'err', 'rc'])(out, err, rc)
    except BaseException:
        # Deliberately catches everything: any failure is reported as a
        # traceback instead of propagating.
        self.showtraceback()