我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 sys.__stdout__(注意:sys.__stdout__ 是保存解释器原始标准输出流的属性,而不是可调用的函数)。
def test_and(self): code = "(and (do (print \"a\") 1) (do (print \"b\") 0) (do (print \"c\") 1))" tree = self.parser.parse_line(code) io = StringIO() sys.stdout = io result = self.runner.evaluate(tree[0]) sys.stdout = sys.__stdout__ self.assertEqual(result, 0) self.assertEqual(io.getvalue(), "a\nb\n") code = "(and True False True)" tree = self.parser.parse_line(code) result = self.runner.evaluate(tree[0]) self.assertEqual(result, False) code = "(and True 1 4)" tree = self.parser.parse_line(code) result = self.runner.evaluate(tree[0]) self.assertEqual(result, 4)
def _check_docs(self, module):
    """Run the doctests of ``module`` unless doctests are being skipped.

    When ``self._skip`` is set only a warning is written and nothing is run.
    Otherwise the module is passed to ``doctest.testmod`` with
    ``raise_on_error`` enabled; a failing example has its actual output
    printed before the failure is re-raised, and an unexpected exception is
    re-raised as the original exception object.
    """
    if self._skip:
        # Written straight to __stdout__ so that nose's output capture
        # does not swallow the warning.
        warning = ("Warning: Skipping doctests for %s because "
                   "pdbpp is installed." % module.__name__)
        print(warning, file=sys.__stdout__)
        return

    try:
        doctest.testmod(module, verbose=True, raise_on_error=True,
                        optionflags=self.flags)
    except doctest.DocTestFailure as failure:
        print("Got:")
        print(failure.got)
        raise
    except doctest.UnexpectedException as unexpected:
        raise unexpected.exc_info[1]
def configure_logging(debug):
    '''Sets the data kennel logger to appropriate levels of chattiness.

    The root logger runs at DEBUG (``debug`` truthy) or INFO; the
    ``datadog.api`` and ``requests`` loggers are kept one step quieter.
    A stream handler writing to the original stdout is attached to the root
    logger on every call.
    '''
    if debug:
        root_level, library_level = logging.DEBUG, logging.INFO
    else:
        root_level, library_level = logging.INFO, logging.WARNING

    root_logger = logging.getLogger('')
    root_logger.setLevel(root_level)
    for library in ('datadog.api', 'requests'):
        logging.getLogger(library).setLevel(library_level)

    # The handler itself lets everything through; filtering is done by the
    # logger levels chosen above.
    console = logging.StreamHandler(sys.__stdout__)
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    root_logger.addHandler(console)
def doItConsolicious(opt):
    """Run the unpack and/or byte-compile steps with progress on the console.

    ``opt`` is indexed with the keys 'zipfile', 'ziptargetdir' and
    'compiledir'; a falsy 'zipfile' or 'compiledir' skips that step.
    NOTE(review): this block uses Python 2 print statements.
    """
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        # presumably n is a running/remaining item count yielded by
        # unzipIter -- verify against zipstream
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                # Trailing comma: stay on the same output line.
                print n,
            if n % 1000 == 0:
                # Start a new progress line every 1000 items.
                print
        print 'Done unpacking.'
    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
def test_main_function(self):
    """Compare the generated diagram with the saved reference.

    The diagram is produced twice: once written to stdout (captured in
    memory) and once written to a file, and both results must equal
    ``self._diag_saved_ref``.
    """
    # List files
    file_list = [os.path.join(test_fold, f) for f in self._input_files]

    # Output to string
    with io.StringIO() as io_stream:
        sys.stdout = io_stream
        try:
            hpp2plantuml.CreatePlantUMLFile(file_list)
        finally:
            # Restore before the buffer is closed and even on failure, so
            # stdout never points at a dead or stale stream.
            sys.stdout = sys.__stdout__
        io_stream.seek(0)
        # Read string output, exclude final line return
        output_str = io_stream.read()[:-1]
    nt.assert_equal(self._diag_saved_ref, output_str)

    # Output to file
    output_fname = 'output.puml'
    try:
        hpp2plantuml.CreatePlantUMLFile(file_list, output_fname)
        with open(output_fname, 'rt') as fid:
            output_fcontent = fid.read()
        nt.assert_equal(self._diag_saved_ref, output_fcontent)
    finally:
        # Remove the generated file even when the comparison fails.
        if os.path.exists(output_fname):
            os.unlink(output_fname)
def get_terminal_size(fallback=(80, 24)):
    """
    Return tuple containing columns and rows of controlling terminal, trying
    harder than shutil.get_terminal_size to find a tty before returning
    fallback.

    Theoretically, stdout, stderr, and stdin could all be different ttys that
    could cause us to get the wrong measurements (instead of using the
    fallback) but the much more common case is that IO is piped.

    ``fallback`` is a ``(columns, lines)`` pair used when none of the original
    standard streams answers the window-size query. Streams that are missing
    (``None``) or closed are skipped rather than raising.
    """
    for stream in (sys.__stdout__, sys.__stderr__, sys.__stdin__):
        if stream is None:
            # The interpreter was started without this standard stream.
            continue
        try:
            # Make WINSIZE call to terminal
            data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\x00\x00")
        except (IOError, OSError, ValueError):
            # Not a tty, no underlying descriptor, or the stream is closed.
            continue
        # Unpack two shorts from ioctl call
        lines, columns = struct.unpack("hh", data)
        return columns, lines

    columns, lines = fallback
    return columns, lines
def become_daemon(self, root_dir='/'):
    """Detach the current process by forking twice.

    After the first fork the child starts a new session, changes directory to
    ``root_dir`` and clears the umask; after the second fork the standard
    streams are closed and replaced, and file descriptors 0-1023 are closed.
    The parent of each fork exits immediately via ``os._exit(0)``.
    """
    if os.fork() != 0:  # launch child and ...
        os._exit(0)  # kill off parent
    os.setsid()
    os.chdir(root_dir)
    os.umask(0)
    if os.fork() != 0:  # fork again so we are not a session leader
        os._exit(0)
    sys.stdin.close()
    # NOTE(review): __stdin__ now refers to the stdin object closed above.
    sys.__stdin__ = sys.stdin
    sys.stdout.close()
    # presumably _NullDevice discards everything written to it -- it is
    # defined outside this block, verify.
    sys.stdout = sys.__stdout__ = _NullDevice()
    sys.stderr.close()
    sys.stderr = sys.__stderr__ = _NullDevice()
    # Close every descriptor in the range; descriptors that are not open
    # raise OSError, which is ignored on purpose.
    for fd in range(1024):
        try:
            os.close(fd)
        except OSError:
            pass
def find_pep8_errors(cls, filename=None, lines=None):
    """Run the pep8 checker and return its findings as formatted strings.

    The checker's report is captured from stdout. Every report line that
    splits into ``location``, ``code`` and ``description`` becomes
    ``'<code> ln:<line number> <description>'``; other lines are dropped.
    """
    try:
        sys.stdout = cStringIO.StringIO()
        # Ignore long lines on test files, as the test names can get long
        # when following our test naming standards.
        checker_options = {'ignore': ['E501']} if cls._is_test(filename) else {}
        pep8.Checker(filename=filename, lines=lines, **checker_options).check_all()
        report = sys.stdout.getvalue()
    finally:
        sys.stdout = sys.__stdout__

    errors = []
    for report_line in report.split('\n'):
        fields = report_line.split(' ', 2)
        if len(fields) != 3:
            continue
        location, code, description = fields
        # The line number is the second colon-separated part of location.
        errors.append('%s ln:%s %s' % (code, location.split(':')[1], description))
    return errors
def emit_func(func, o=sys.__stdout__, d = init()):
    """Emits all items in the data store in a format such that it can be sourced by a shell."""
    # Plain variables first: skip names starting with "__" and anything
    # flagged as a function.
    for key in d.keys():
        if key.startswith("__") or d.getVarFlag(key, "func", False):
            continue
        emit_var(key, o, d, False)

    o.write('\n')
    if emit_var(func, o, d, False):
        o.write('\n')

    # Seed the work list with what the function's shell body references plus
    # its declared "vardeps".
    pending = bb.codeparser.ShellParser(func, logger).parse_shell(d.getVar(func, True))
    pending |= set((d.getVarFlag(func, "vardeps", True) or "").split())
    visited = set()
    while pending:
        current = pending
        visited |= current
        pending = set()
        for dep in current:
            is_shell_func = (d.getVarFlag(dep, "func", False)
                             and not d.getVarFlag(dep, "python", False))
            if not is_shell_func:
                continue
            if emit_var(dep, o, d, False):
                o.write('\n')
            pending |= bb.codeparser.ShellParser(dep, logger).parse_shell(d.getVar(dep, True))
            pending |= set((d.getVarFlag(dep, "vardeps", True) or "").split())
        # Never revisit a dependency that has already been processed.
        pending -= visited
def pkt_verify(parent, rcv_pkt, exp_pkt):
    """Assert that the received packet equals the expected one.

    The packets are compared through ``str()``. On a mismatch the length, a
    hex dump and the ``show()`` rendering of both packets are logged before
    ``parent.assertEqual`` reports the failure. Returns ``rcv_pkt``.
    """
    if str(exp_pkt) != str(rcv_pkt):
        logging.error("ERROR: Packet match failed.")
        logging.debug("Expected (" + str(len(exp_pkt)) + ")")
        logging.debug(str(exp_pkt).encode('hex'))
        logging.debug(_capture_show(exp_pkt))
        logging.debug("Received (" + str(len(rcv_pkt)) + ")")
        logging.debug(str(rcv_pkt).encode('hex'))
        # The received data is wrapped in Ether() before being rendered.
        logging.debug(_capture_show(Ether(rcv_pkt)))
    parent.assertEqual(str(exp_pkt), str(rcv_pkt), "Packet match error")
    return rcv_pkt


def _capture_show(pkt):
    """Return the text that ``pkt.show()`` writes to stdout."""
    sys.stdout = tmpout = StringIO()
    try:
        pkt.show()
    finally:
        # Restore even if show() raises so stdout is never left captured.
        sys.stdout = sys.__stdout__
    return tmpout.getvalue()
def quit(self, *args):
    """Restore the standard streams, shut the display down and destroy the root.

    Extra positional arguments are accepted and ignored. Errors raised while
    closing the display are ignored so that the root is always destroyed.
    """
    # restore stdout before signaling the run thread to exit!
    # it can get stuck in trying to dump to the redirected text message area
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    try:
        self.display.quit()
    except Exception:
        # Best effort only; KeyboardInterrupt/SystemExit are no longer
        # swallowed here.
        pass
    self.root.destroy()
    # Parenthesised single-argument form prints the same text on Python 2
    # and is valid syntax on Python 3.
    print("* All done!")
    # sys.exit() # force quit!
    return
# ----------------------------------------------------------
def shutdown(self, c):
    '''
    Shutdown this process

    Sends a ``('#RETURN', None)`` reply on ``c``, puts the original standard
    streams back if stdout was replaced, runs finalizers, terminates and then
    joins the active child processes, and finally exits with status 0.
    '''
    try:
        try:
            util.debug('manager received shutdown message')
            # Answer the caller before anything is torn down.
            c.send(('#RETURN', None))
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            util._run_finalizers(0)
            for p in active_children():
                util.debug('terminating a child process of manager')
                p.terminate()
            # NOTE(review): this loop joins the children; the debug message
            # text is the same as in the terminate loop above.
            for p in active_children():
                util.debug('terminating a child process of manager')
                p.join()
            util._run_finalizers()
            util.info('manager exiting with exitcode 0')
        except:
            # Any failure is only reported; the exit below still happens.
            import traceback
            traceback.print_exc()
    finally:
        exit(0)
def __dir__(self):
    """List the attribute names of the interpreter's original stdout."""
    real_stdout = sys.__stdout__
    return dir(real_stdout)
def __getattribute__(self, name):
    """Delegate attribute access to the current stream.

    ``'__members__'`` yields the attribute names of the original stdout.
    Every other name is looked up on ``_local.stream`` when that attribute
    exists, and on the original stdout otherwise.
    """
    if name == '__members__':
        return dir(sys.__stdout__)
    # Fall back to the real stdout when no stream has been set on _local.
    target = getattr(_local, 'stream', sys.__stdout__)
    return getattr(target, name)
def __repr__(self):
    """Use the representation of the interpreter's original stdout."""
    real_stdout = sys.__stdout__
    return repr(real_stdout)
# add the threaded stream as display hook
def execute(self, context):
    """Fill in missing ``WowMaterial.Texture1`` paths for the selected objects.

    For every material of every selected object that has an image texture but
    no ``Texture1`` yet, the image path (with a ``.blp`` extension) is
    shortened one leading directory at a time until ``game_data.read_file``
    accepts the remaining path, which is then stored as ``Texture1``.
    Returns ``{'FINISHED'}``.
    """
    if not hasattr(bpy, "wow_game_data"):
        print("\n\n### Loading game data ###")
        bpy.ops.scene.load_wow_filesystem()

    game_data = bpy.wow_game_data

    for ob in bpy.context.selected_objects:
        mesh = ob.data
        for material in mesh.materials:
            texture = material.active_texture
            if texture is None \
                    or material.WowMaterial.Texture1 \
                    or texture.type != 'IMAGE' \
                    or texture.image is None:
                continue

            image_path = texture.image.filepath
            path = (os.path.splitext(bpy.path.abspath(image_path))[0] + ".blp", "")
            rest_path = ""

            while True:
                # Move the last remaining component of the path onto the
                # front of the candidate path.
                path = os.path.split(path[0])

                if not path[1]:
                    print("\nTexture <<{}>> not found.".format(image_path))
                    break

                rest_path = os.path.join(path[1], rest_path)
                rest_path = rest_path[:-1] if rest_path.endswith("\\") else rest_path

                # Silence whatever read_file prints. The devnull handle is
                # closed and stdout restored on every path (match, miss or
                # exception), so later messages are not lost.
                with open(os.devnull, 'w') as devnull:
                    sys.stdout = devnull
                    try:
                        found = game_data.read_file(rest_path)
                    finally:
                        sys.stdout = sys.__stdout__

                if found:
                    material.WowMaterial.Texture1 = rest_path
                    break

    self.report({'INFO'}, "Done filling texture paths")
    return {'FINISHED'}
def __init__(self, log, prefix = ""):
    """Store the log target and prefix; the console is the original stdout."""
    self.log, self.prefix = log, prefix
    self.console = sys.__stdout__
def init(ctx):
    """Duplicate stdout and stderr into the build log file.

    The directory of ``LOGFILE`` is created if needed, the file is truncated
    and opened, and both standard streams are replaced with ``log_to_file``
    wrappers around the original streams (terminal streams are additionally
    wrapped in ``ansiterm.AnsiTerm``). Log handlers that were writing to the
    previous stderr are pointed at the new one.
    """
    global LOGFILE
    log_path = os.path.abspath(LOGFILE)
    log_dir = os.path.dirname(os.path.abspath(log_path))
    try:
        os.makedirs(log_dir)
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
        fileobj = os.fdopen(os.open(LOGFILE, flags), 'w')
    else:
        fileobj = open(LOGFILE, 'w')

    def wrap(stream):
        return ansiterm.AnsiTerm(stream) if stream.isatty() else stream

    previous_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, log_path)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, log_path)

    # now mess with the logging module...
    for handler in Logs.log.handlers:
        try:
            stream = handler.stream
        except AttributeError:
            continue
        if stream is previous_stderr:
            handler.stream = sys.stderr
def init(ctx):
    """Duplicate stdout and stderr into the build log file.

    The directory of ``LOGFILE`` is created if needed, the file is truncated
    and opened, and both standard streams are replaced with ``log_to_file``
    wrappers around the original streams. Log handlers that were writing to
    the previous stderr are pointed at the new one.
    """
    global LOGFILE
    log_path = os.path.abspath(LOGFILE)
    log_dir = os.path.dirname(os.path.abspath(log_path))
    try:
        os.makedirs(log_dir)
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
        fileobj = os.fdopen(os.open(LOGFILE, flags), 'w')
    else:
        fileobj = open(LOGFILE, 'w')

    previous_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    sys.stdout = log_to_file(sys.__stdout__, fileobj, log_path)
    sys.stderr = log_to_file(sys.__stderr__, fileobj, log_path)

    # now mess with the logging module...
    for handler in Logs.log.handlers:
        try:
            stream = handler.stream
        except AttributeError:
            continue
        if stream is previous_stderr:
            handler.stream = sys.stderr
def handle(self, *args, **options):
    """Generate random grammars and report how many are stored.

    Options read: ``reset_db`` (clear the database first), ``num`` (number of
    randomly generated grammars per run) and ``silent`` (suppress the output
    produced while generating).
    """
    if options['reset_db']:
        call_command('cleardatabase')

    print("Grammar objects initially in database: {}".format(Grammar.objects.count()))

    # Number of randomly generated grammars
    num = options['num']

    # Number variables this run will include.
    # For example [2,3] will run the script to generate
    # grammars with 2 variables and 3 variables
    nVariables = [2, 3]

    nonTerminals = ['A','B','C','D']
    terminals = ['x','y','z','w']

    silent = options['silent']
    devnull = None
    if silent:
        devnull = open(os.devnull, "w")
        sys.stdout = devnull
    try:
        for n in nVariables:
            start_time = time.time()
            mg = MassGrammarGenerator.MassGrammarGenerator(n)
            mg.run(num,nonTerminals[:n],terminals)
            print("{}Variables: {} seconds---".format(n,(time.time() - start_time)))
    finally:
        # Restore stdout and release the devnull handle even when a
        # generator run fails, so the final report is always visible.
        if silent:
            sys.stdout = sys.__stdout__
            devnull.close()

    print("Grammar objects finally in database: {}".format(Grammar.objects.count()))
def quiet():
    """Generator that silences stdout and stderr around its single ``yield``.

    Both streams are pointed at the null device while the caller's code runs
    and are put back to whatever they were before, also when that code
    raises. The null-device handle is closed afterwards.
    """
    # save stdout/stderr
    # Jupyter doesn't support setting it back to
    # sys.__stdout__ and sys.__stderr__
    _sys_stdout = sys.stdout
    _sys_stderr = sys.stderr

    with open(os.devnull, "w") as devnull:
        # Divert stdout and stderr to devnull
        sys.stdout = sys.stderr = devnull
        try:
            yield
        finally:
            # Revert back to standard stdout/stderr (before the handle is
            # closed by the enclosing ``with``).
            sys.stdout = _sys_stdout
            sys.stderr = _sys_stderr
def _finalize(self):
    """Restore the original stdout and detach this object's log handler."""
    sys.stdout = sys.__stdout__
    handler = self._log_handler
    # Unregister from the root logger first, then release the handler.
    logging.root.removeHandler(handler)
    handler.close()
    logger.info('Jupyter window finalized successfully')
def plot_compute_node(self):
    """Emit the graph description of the compute node.

    When stdout is still the original stream, output is redirected into
    ``self.compute_dot_file`` and wrapped in its own digraph; otherwise the
    clusters are written to the already-redirected stdout. The file is closed
    and stdout restored even if one of the plot steps raises.
    """
    tag = 'compute'
    redirected = False
    if sys.stdout == sys.__stdout__:
        self.outfile = open(self.compute_dot_file, "w")
        sys.stdout = self.outfile
        redirected = True

    try:
        if redirected:
            self.__digraph_open(tag)

        # Title
        self.__cluster_open('ComputeNode', 'red')
        self.__cluster_name('Compute Node', 1, 'yellow')
        self.__cluster_close()

        # Plot nodes
        self.__cluster_open_plain('Nova')
        self.__plot_vms()
        self.__plot_linux_bridge()
        self.__cluster_close_plain()

        self.__cluster_open_plain('OVS')
        self.__plot_br_int_compute()
        self.__plot_br_tun(tag)
        self.__cluster_close_plain()

        # Plot edges
        self.__plot_title_edges(tag)
        self.__plot_vms_to_linuxbridge()
        self.__plot_linuxbridge_to_br_int()
        self.__plot_br_int_to_br_tun(tag)

        if redirected:
            self.__digraph_close()
    finally:
        # Only undo a redirection that this call set up itself.
        if redirected:
            self.outfile.close()
            sys.stdout = sys.__stdout__
def plot_network_node(self):
    """Emit the graph description of the network node.

    When stdout is still the original stream, output is redirected into
    ``self.network_dot_file`` and wrapped in its own digraph; otherwise the
    clusters are written to the already-redirected stdout. The file is closed
    and stdout restored even if one of the plot steps raises.
    """
    tag = 'network'
    redirected = False
    if sys.stdout == sys.__stdout__:
        self.outfile = open(self.network_dot_file, "w")
        sys.stdout = self.outfile
        redirected = True

    try:
        if redirected:
            self.__digraph_open(tag)

        # Title
        self.__cluster_open('NetworkNode', 'red')
        self.__cluster_name('Network Node', 1, 'yellow')
        self.__cluster_close()

        # Plot nodes
        self.__cluster_open_plain('OVS')
        self.__plot_br_ex_network()
        self.__plot_br_int_network()
        self.__plot_br_tun(tag)
        self.__cluster_close_plain()

        # Plot edges
        self.__plot_title_edges(tag)
        self.__plot_br_int_to_br_tun(tag)
        self.__plot_br_ex_to_br_int()

        if redirected:
            self.__digraph_close()
    finally:
        # Only undo a redirection that this call set up itself.
        if redirected:
            self.outfile.close()
            sys.stdout = sys.__stdout__
def debug(msg):
    """Print ``msg`` with a ``DEBUG: `` prefix on the original stdout.

    Nothing happens unless ``settings['debug']`` is truthy. Any active
    redirection of ``sys.stdout`` is bypassed for this one message and is
    back in place afterwards, also when printing fails.
    """
    if not settings['debug']:
        return
    # Build the text before touching sys.stdout so a non-string msg cannot
    # leave the stream swapped.
    text = 'DEBUG: ' + msg
    previous = sys.stdout
    sys.stdout = sys.__stdout__
    try:
        print(text)
    finally:
        sys.stdout = previous
def error(msg):
    """Print ``msg`` with an ``ERROR: `` prefix on the original stdout.

    Any active redirection of ``sys.stdout`` is bypassed for this one message
    and is back in place afterwards, also when printing fails.
    """
    # Build the text before touching sys.stdout so a non-string msg cannot
    # leave the stream swapped.
    text = 'ERROR: ' + msg
    previous = sys.stdout
    sys.stdout = sys.__stdout__
    try:
        print(text)
    finally:
        sys.stdout = previous
def warning(msg):
    """Print ``msg`` with a ``WARNING: `` prefix on the original stdout.

    Any active redirection of ``sys.stdout`` is bypassed for this one message
    and is back in place afterwards, also when printing fails.
    """
    # Build the text before touching sys.stdout so a non-string msg cannot
    # leave the stream swapped.
    text = 'WARNING: ' + msg
    previous = sys.stdout
    sys.stdout = sys.__stdout__
    try:
        print(text)
    finally:
        sys.stdout = previous
def set_trace():
    """Start pdb in the caller's frame with sys.stdout pointing at the real
    output stream.

    sys.stdout is deliberately left on the real stream afterwards; it is NOT
    put back to whatever it was before this call.
    """
    import pdb
    import sys

    # The frame of whoever called set_trace(), not this helper's own frame.
    caller_frame = sys._getframe().f_back
    # Swap the stream before the debugger object is created.
    sys.stdout = sys.__stdout__
    debugger = pdb.Pdb()
    debugger.set_trace(caller_frame)
def parseArgs(self, argv):
    """Parse argv and env and configure running environment.

    Handles the early-exit options (version, plugin listing), makes sure a
    test loader instance exists and lets plugins replace it, selects the test
    names to run, switches to the configured working directory and finally
    creates the tests.
    """
    self.config.configure(argv, doc=self.usage())
    log.debug("configured %s", self.config)

    # quick outs: version, plugins (optparse would have already
    # caught and exited on help)
    if self.config.options.version:
        from nose import __version__
        sys.stdout = sys.__stdout__
        program = os.path.basename(sys.argv[0])
        print("%s version %s" % (program, __version__))
        sys.exit(0)

    if self.config.options.showPlugins:
        self.showPlugins()
        sys.exit(0)

    # Accept no loader at all, a loader class, or a ready-made instance.
    if self.testLoader is None:
        self.testLoader = defaultTestLoader(config=self.config)
    elif isclass(self.testLoader):
        self.testLoader = self.testLoader(config=self.config)

    replacement_loader = self.config.plugins.prepareTestLoader(self.testLoader)
    if replacement_loader is not None:
        self.testLoader = replacement_loader
    log.debug("test loader is %s", self.testLoader)

    # FIXME if self.module is a string, add it to self.testNames? not sure

    if not self.config.testNames:
        self.testNames = tolist(self.defaultTest)
    else:
        self.testNames = self.config.testNames
    log.debug('defaultTest %s', self.defaultTest)
    log.debug('Test names are %s', self.testNames)

    working_dir = self.config.workingDir
    if working_dir is not None:
        os.chdir(working_dir)
    self.createTests()
def __init__(self, string):
    """Bind to the original stdout and start with empty append/prepend text.

    ``string`` is accepted but not used here.
    """
    self.appends = self.prepends = ''
    self.stdout = sys.__stdout__
    # Cached once: whether the original stdout is attached to a terminal.
    self.isatty = self.stdout.isatty()
def test_task(self):
    """Exercise a small Task subclass: status reporting, result and errors."""

    class AddTask(Task):
        addend_a = properties.Float('First add argument')
        addend_b = properties.Float('Second add argument')

        class Result(BaseResult):
            value = properties.Float('Result of add operation')

        def __call__(self):
            self.report_status({'progress': 0., 'message': 'Starting'})
            if self.addend_a == self.addend_b:
                raise PermanentTaskFailure()
            return self.Result(value=self.addend_a + self.addend_b)

    add = AddTask(addend_a=0., addend_b=10.)
    sys.stdout = temp_out = StringIO()
    try:
        result = add()
    finally:
        # Restore the stream even when the task raises, so a failure here
        # cannot leave stdout captured for the rest of the run.
        sys.stdout = sys.__stdout__
    assert temp_out.getvalue() == 'AddTask | 0% | Starting\n'
    assert result.value == 10.

    # Equal addends make this task fail permanently.
    add = AddTask(addend_a=5., addend_b=5.)
    with self.assertRaises(PermanentTaskFailure):
        add()

    with self.assertRaises(NotImplementedError):
        Task()()

    with self.assertRaises(ValueError):
        Task().report_status(.5)
def write(self, data):
    """Forward ``data`` to the wrapped file.

    If the write fails for any reason, the original stdout and stderr are
    reinstated (stderr also on ``log.DefaultObserver``) and the error is
    re-raised.
    """
    try:
        self.inner_file.write(data)
    except:
        sys.stdout = sys.__stdout__
        real_stderr = sys.__stderr__
        log.DefaultObserver.stderr = real_stderr
        sys.stderr = real_stderr
        raise
def tearDown(self):
    """Put the interpreter's original stdout and stdin back."""
    sys.stdout, sys.stdin = sys.__stdout__, sys.__stdin__