我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用__builtin__._。
def init_pprint(self): """Activates pretty-printing of output values. """ keys_re = re.compile(r'([\'\("]+(.*?[\'\)"]: ))+?') color_dict = partial(keys_re.sub, lambda m: purple(m.group())) format_func = pprint.pformat if sys.version_info.major >= 3 and sys.version_info.minor > 3: format_func = partial(pprint.pformat, compact=True) def pprint_callback(value): if value is not None: try: rows, cols = os.get_teminal_size() except AttributeError: try: rows, cols = map(int, subprocess.check_output(['stty', 'size']).split()) except: cols = 80 builtins._ = value formatted = format_func(value, width=cols) print(color_dict(formatted) if issubclass(type(value), dict) else blue(formatted)) sys.displayhook = pprint_callback
def test_original_displayhook(self): import __builtin__ savestdout = sys.stdout out = cStringIO.StringIO() sys.stdout = out dh = sys.__displayhook__ self.assertRaises(TypeError, dh) if hasattr(__builtin__, "_"): del __builtin__._ dh(None) self.assertEqual(out.getvalue(), "") self.assertTrue(not hasattr(__builtin__, "_")) dh(42) self.assertEqual(out.getvalue(), "42\n") self.assertEqual(__builtin__._, 42) del sys.stdout self.assertRaises(RuntimeError, dh, 42) sys.stdout = savestdout
def gettext_translate( s ): """ Thread-safe version of _(). We look up the thread-local translation function. """ return catalogs.translate(s) # Inject the _() function in the builtins. You could also inject a N_() # function for noop translation markers.
def handle_request(): """ Treat a request, with i18n strings. """ # Fetch and return a translated string. # This is the interesting bit, from a client's point-of-view. print _('bli'), _('bla'), _('blo') # Do something else. time.sleep(random.random()) # A thread class. This would be provided by the web framework, # normally.
def init_prompt(self): """Activates color on the prompt based on python version. Also adds the hosts IP if running on a remote host over a ssh connection. """ prompt_color = green if sys.version_info.major == 2 else yellow sys.ps1 = prompt_color('>>> ', readline_workaround=True) sys.ps2 = red('... ', readline_workaround=True) # - if we are over a remote connection, modify the ps1 if os.getenv('SSH_CONNECTION'): _, _, this_host, _ = os.getenv('SSH_CONNECTION').split() sys.ps1 = prompt_color('[{}]>>> '.format(this_host), readline_workaround=True) sys.ps2 = red('[{}]... '.format(this_host), readline_workaround=True)
def improved_rlcompleter(self): """Enhances the default rlcompleter The function enhances the default rlcompleter by also doing pathname completion and module name completion for import statements. Additionally, it inserts a tab instead of attempting completion if there is no preceding text. """ completer = rlcompleter.Completer(namespace=self.locals) # - remove / from the delimiters to help identify possibility for path completion readline.set_completer_delims(readline.get_completer_delims().replace('/', '')) modlist = frozenset(name for _, name, _ in pkgutil.iter_modules()) def complete_wrapper(text, state): line = readline.get_line_buffer().strip() if line == '': return None if state > 0 else self.tab if state == 0: if line.startswith(('import', 'from')): completer.matches = [name for name in modlist if name.startswith(text)] else: match = completer.complete(text, state) if match is None and '/' in text: completer.matches = glob.glob(text+'*') try: match = completer.matches[state] return '{}{}'.format(match, ' ' if keyword.iskeyword(match) else '') except IndexError: return None return complete_wrapper
def testVerify(spec, pub, priv, closed, proxy): """ Runs a single test case against verify.verifyDEP() and returns the result and potentially an error message. In addition to the elements understood by run_test.runTest(), this function also understands the "expectedException" element, which indicates the name of the exception the verifyDEP() function is expected to throw, and the "exceptionReceipt" element, which indicates the receipt at which an exception is expected to occur. If "exceptionReceipt" is omitted, the expected exception may occur anywhere in the generated DEP. If "expectedException" is omitted, verifyDEP() must not throw any exception. :param spec: The test case specification as a dict structure. :param pub: The public key or certificate. For a closed system a public key must be used, for an open system a certificate must be used. :param priv: The private key used to sign the generated receipts. :param closed: Indicates whether the system is a closed system (True) or an open system (False). :param proxy: An object implementing RKSVVerificationProxyI. This will be used to do the actual verification. :return: A TestVerifyResult indicating the result of the test and an error message. If the result is OK, the message is None. """ try: keymat = [(pub, priv)] * spec['numberOfSignatureDevices'] deps, cc = run_test.runTest(spec, keymat, closed) except Exception as e: return TestVerifyResult.ERROR, e rN, mN = _testVerify(spec, deps, cc, False, proxy) rP, mP = _testVerify(spec, deps, cc, True, proxy) if rN == rP and str(mN) == str(mP): return rN, mN r = TestVerifyResult.FAIL if rN == TestVerifyResult.ERROR or rP == TestVerifyResult.ERROR: r = TestVerifyResult.ERROR return r, Exception( _('Result mismatch: without parsing {}:>{}<, with parsing {}:>{}<').format( rN.name, mN, rP.name, mP))
def testVerifyMulti(specs, groupLabel, crt, pub, priv, tcDefaultSize, proxy): """ Runs all the given test cases against verify.verifyDEP. In addition to the elements understood by TestVerify(), this function also understands the "closedSystem" element in the root dictionary, which indicates whether the system is a closed system (True) or an open system (False). This function is a generator to facilitate more responsive output when used with many test cases. :param specs: A list or generator with test specifications as dict structures. :param groupLabel: A label to indicate which group the tests belong to as a string. :param crt: The certificate used to sign the generated receipts if an open system is used. :param pub: The public key used to sign the generated receipts if a closed system is used. :param priv: The private key belonging to the given certificate and public key. :param tcDefaultSize: The turnover counter size in bytes to use if no size is given in the test specification. :param proxy: An object implementing RKSVVerificationProxyI. This will be used to do the actual verification. :yield: A tuple containing (in order) the test cases name, the group label, a boolean indicating whether the system is a closed (True) or an open (False) system, the used turnover counter size in bytes, the result of the test as a TestVerifyResult and the generated error message or None if no error occurred. """ for s in specs: label = s.get('simulationRunLabel', 'Unknown') tc_size = s.get('turnoverCounterSize', tcDefaultSize) closed = s.get('closedSystem', False) if label == 'Unknown': result = TestVerifyResult.ERROR msg = _('No run label') else: pc = pub if closed else crt result, msg = testVerify(s, pc, priv, closed, proxy) yield (label, groupLabel, closed, tc_size, result, msg)
def printTestVerifySummary(results): nFails = sum(r[4] == TestVerifyResult.FAIL for r in results) nErrors = sum(r[4] == TestVerifyResult.ERROR for r in results) print(_('{} tests run, {} failed, {} errors').format(len(results), nFails, nErrors))
def verify(self, fd, keyStore, aesKey, inState, registerIdx, chunksize): # Save the _() function. trvec = ( __builtin__._ , depparser._, key_store._, receipt._, verification_state._, verify._, verify_receipt._, ) # Temporarily disable translations to make sure error # messages match. ( __builtin__._ , depparser._, key_store._, receipt._, verification_state._, verify._, verify_receipt._, ) = [lambda x: x] * len(trvec) try: parser = depparser.IncrementalDEPParser.fromFd(fd, True) outState = verify.verifyParsedDEP(parser, keyStore, aesKey, inState, registerIdx, self.pool, self.nprocs, chunksize) finally: ( __builtin__._ , depparser._, key_store._, receipt._, verification_state._, verify._, verify_receipt._, ) = trvec return outState
def process_edit_cmd(self, arg=''): """{EDIT_CMD} [object|filename] Open {EDITOR} with session history, provided filename or object's source file. - without arguments, a temporary file containing session history is created and opened in {EDITOR}. On quitting the editor, all the non commented lines in the file are executed. - with a filename argument, the file is opened in the editor. On close, you are returned bay to the interpreter. - with an object name argument, an attempt is made to lookup the source file of the object and it is opened if found. Else the argument is treated as a filename. """ line_num_opt = '' if arg: obj = self.lookup(arg) try: if obj: filename = inspect.getsourcefile(obj) _, line_no = inspect.getsourcelines(obj) line_num_opt = config['LINE_NUM_OPT'].format(line_no=line_no) else: filename = arg except (IOError, TypeError, NameError) as e: return self.writeline(e) else: # - make a list of all lines in session history, commenting # any non-blank lines. filename = self._mktemp_buffer("# {}".format(line) if line else '' for line in (line.strip('\n') for line in self.session_history)) # - shell out to the editor os.system('{} {} {}'.format(config['EDITOR'], line_num_opt, filename)) # - if arg was not provided (we edited session history), execute # it in the current namespace if not arg: self._exec_from_file(filename) os.unlink(filename)
def process_sh_cmd(self, cmd): """{SH_EXEC} [cmd [args ...] | {{fmt string}}] Escape to {SHELL} or execute `cmd` in {SHELL} - without arguments, the current interpreter will be suspended and you will be dropped in a {SHELL} prompt. Use fg to return. - with arguments, the text will be executed in {SHELL} and the output/error will be displayed. Additionally '_' will contain a named tuple with the (<stdout>, <stderror>, <return_code>) for the execution of the command. You may pass strings from the global namespace to the command line using the `.format()` syntax. for example: >>> filename = '/does/not/exist' >>> !ls {{filename}} ls: cannot access /does/not/exist: No such file or directory >>> _ CmdExec(out='', err='ls: cannot access /does/not/exist: No such file or directory\n', rc=2) """ if cmd: try: cmd = cmd.format(**self.locals) cmd = shlex.split(cmd) if cmd[0] == 'cd': os.chdir(os.path.expanduser(os.path.expandvars(' '.join(cmd[1:]) or '${HOME}'))) else: cmd_exec = namedtuple('CmdExec', ['out', 'err', 'rc']) process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = process.communicate() rc = process.returncode print (red(err.decode('utf-8')) if err else green(out.decode('utf-8'), bold=False)) builtins._ = cmd_exec(out, err, rc) del cmd_exec except: self.showtraceback() else: if os.getenv('SSH_CONNECTION'): # I use the bash function similar to the one below in my # .bashrc to directly open a python prompt on remote # systems I log on to. # function rpython { ssh -t $1 -- "python" } # Unfortunately, suspending this ssh session, does not place me # in a shell, so I need to create one: os.system(config['SHELL']) else: os.kill(os.getpid(), signal.SIGSTOP)