Python sys 模块:__stdout__ 属性实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 sys.__stdout__(注意:它是保存解释器启动时原始标准输出流的属性,而不是可调用的函数)。

项目:clopure    作者:vbkaisetsu    | 项目源码 | 文件源码
def test_and(self):
        """Check that ``and`` stops at the first falsy operand.

        The first expression prints from every operand it evaluates, so the
        captured output shows which operands ran before evaluation stopped.
        The remaining expressions check the value returned for all-boolean
        and mixed operands.
        """
        code = "(and (do (print \"a\") 1) (do (print \"b\") 0) (do (print \"c\") 1))"
        tree = self.parser.parse_line(code)
        captured = StringIO()
        previous_stdout = sys.stdout
        sys.stdout = captured
        try:
            result = self.runner.evaluate(tree[0])
        finally:
            # Undo the redirection even when evaluation raises.  Putting back
            # the stream that was active before (rather than sys.__stdout__)
            # keeps any capture installed by the test runner intact.
            sys.stdout = previous_stdout
        self.assertEqual(result, 0)
        self.assertEqual(captured.getvalue(), "a\nb\n")
        code = "(and True False True)"
        tree = self.parser.parse_line(code)
        result = self.runner.evaluate(tree[0])
        self.assertEqual(result, False)
        code = "(and True 1 4)"
        tree = self.parser.parse_line(code)
        result = self.runner.evaluate(tree[0])
        self.assertEqual(result, 4)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _check_docs(self, module):
        """Run the doctests of ``module`` unless doctests are being skipped.

        When ``self._skip`` is set, a warning is written to the original
        stdout and nothing is run.  Otherwise the module's doctests run with
        ``self.flags``; an unexpected exception inside a doctest is re-raised
        as the underlying exception, and a doctest failure prints the actual
        output before propagating.
        """
        if not self._skip:
            try:
                doctest.testmod(
                    module,
                    verbose=True,
                    raise_on_error=True,
                    optionflags=self.flags,
                )
            except doctest.UnexpectedException as unexpected:
                # Surface the exception raised inside the doctest itself.
                raise unexpected.exc_info[1]
            except doctest.DocTestFailure as failure:
                print("Got:")
                print(failure.got)
                raise
            return

        # Printing this directly to __stdout__ so that it doesn't get
        # captured by nose.
        notice = ("Warning: Skipping doctests for %s because "
                  "pdbpp is installed." % module.__name__)
        print(notice, file=sys.__stdout__)
        return
项目:data_kennel    作者:amplify-education    | 项目源码 | 文件源码
def configure_logging(debug):
    '''Set logger verbosity and attach a handler writing to the original stdout.

    With ``debug`` truthy the root logger is set to DEBUG and the
    ``datadog.api`` / ``requests`` loggers to INFO; otherwise the root logger
    is set to INFO and the two library loggers to WARNING.  Every call adds a
    new stream handler to the root logger.
    '''
    if debug:
        root_level, library_level = logging.DEBUG, logging.INFO
    else:
        root_level, library_level = logging.INFO, logging.WARNING

    default_logger = logging.getLogger('')
    default_logger.setLevel(root_level)
    for library_name in ('datadog.api', 'requests'):
        logging.getLogger(library_name).setLevel(library_level)

    # The handler passes everything through; filtering is left to the loggers.
    console = logging.StreamHandler(sys.__stdout__)
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    default_logger.addHandler(console)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def doItConsolicious(opt):
    """Run the requested unpack/compile steps with plain console output.

    ``opt`` is indexed with the keys ``'zipfile'``, ``'ziptargetdir'`` and
    ``'compiledir'``.  A truthy ``opt['zipfile']`` triggers the unpack step
    and a truthy ``opt['compiledir']`` triggers byte-compilation of that
    directory.  This is Python 2 code (print statements).
    """
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        # unzipIter yields a number per step -- presumably a progress
        # counter; verify against zipstream.
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                # Trailing comma: keep the counters on one line.
                print n,
            if n % 1000 == 0:
                # Bare print ends the current progress line.
                print
        print 'Done unpacking.'

    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
项目:hpp2plantuml    作者:thibaultmarin    | 项目源码 | 文件源码
def test_main_function(self):
        """Compare CreatePlantUMLFile output (stdout and file) with the reference.

        The diagram is generated twice from ``self._input_files``: once
        printed to stdout and captured, once written to ``output.puml``.
        Both results must equal ``self._diag_saved_ref``.
        """
        # List files
        file_list = [os.path.join(test_fold, f) for f in self._input_files]

        # Output to string
        previous_stdout = sys.stdout
        with io.StringIO() as io_stream:
            sys.stdout = io_stream
            try:
                hpp2plantuml.CreatePlantUMLFile(file_list)
            finally:
                # Restore before the buffer is closed so that a failure never
                # leaves sys.stdout pointing at a closed stream.
                sys.stdout = previous_stdout
            # Read string output, exclude final line return
            output_str = io_stream.getvalue()[:-1]
        nt.assert_equal(self._diag_saved_ref, output_str)

        # Output to file
        output_fname = 'output.puml'
        try:
            hpp2plantuml.CreatePlantUMLFile(file_list, output_fname)
            with open(output_fname, 'rt') as fid:
                output_fcontent = fid.read()
            nt.assert_equal(self._diag_saved_ref, output_fcontent)
        finally:
            # Do not leave the generated file behind when the check fails.
            if os.path.exists(output_fname):
                os.unlink(output_fname)
项目:style50    作者:cs50    | 项目源码 | 文件源码
def get_terminal_size(fallback=(80, 24)):
    """
    Return tuple containing columns and rows of controlling terminal, trying harder
    than shutil.get_terminal_size to find a tty before returning fallback.

    Theoretically, stdout, stderr, and stdin could all be different ttys that could
    cause us to get the wrong measurements (instead of using the fallback) but the much more
    common case is that IO is piped.

    A standard stream that is missing (``None``), closed, or has no file
    descriptor is skipped like any stream that is not a terminal.

    :param fallback: ``(columns, lines)`` returned when no stream answers the
        window-size query.
    :return: ``(columns, lines)`` tuple.
    """
    for stream in [sys.__stdout__, sys.__stderr__, sys.__stdin__]:
        try:
            # Make WINSIZE call to terminal
            data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\00\x00")
        except (IOError, OSError, AttributeError, ValueError):
            # OSError/IOError: not a terminal or no descriptor;
            # AttributeError: the stream is None; ValueError: it is closed.
            continue
        # Unpack two shorts from ioctl call
        lines, columns = struct.unpack("hh", data)
        break
    else:
        columns, lines = fallback

    return columns, lines
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def become_daemon(self, root_dir='/'):
        """Detach the current process by forking twice and dropping its streams.

        The calling process exits in both forks; the surviving grandchild
        starts a new session, changes directory to ``root_dir``, clears the
        umask, replaces stdout/stderr with ``_NullDevice`` instances and then
        closes file descriptors 0 through 1023.

        :param root_dir: directory to change into after the first fork.
        """
        if os.fork() != 0:  # launch child and ...
            os._exit(0)  # kill off parent
        os.setsid()
        os.chdir(root_dir)
        os.umask(0)
        if os.fork() != 0: # fork again so we are not a session leader
            os._exit(0)
        sys.stdin.close()
        # NOTE(review): stdin is left pointing at the stream that was just
        # closed (no _NullDevice replacement, unlike stdout/stderr) -- confirm
        # this is intended.
        sys.__stdin__ = sys.stdin
        sys.stdout.close()
        sys.stdout = sys.__stdout__ = _NullDevice()
        sys.stderr.close()
        sys.stderr = sys.__stderr__ = _NullDevice()
        # Close every descriptor in a fixed range; descriptors that are not
        # open raise OSError, which is ignored.
        for fd in range(1024):
            try:
                os.close(fd)
            except OSError:
                pass
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def find_pep8_errors(cls, filename=None, lines=None):
        """Run the pep8 checker and return its findings as formatted strings.

        The checker reports by printing, so stdout is captured while it runs.
        Each reported line of the form ``location code description`` becomes
        ``'<code> ln:<line number> <description>'``.

        :param filename: file to check (passed to ``pep8.Checker``).
        :param lines: source lines to check (passed to ``pep8.Checker``).
        :return: list of formatted error strings.
        """
        captured = cStringIO.StringIO()
        previous_stdout = sys.stdout
        sys.stdout = captured
        try:
            config = {}

            # Ignore long lines on test files, as the test names can get long
            # when following our test naming standards.
            if cls._is_test(filename):
                config['ignore'] = ['E501']

            checker = pep8.Checker(filename=filename, lines=lines,
                                   **config)
            checker.check_all()
        finally:
            # Put back whatever stream was active before, so an outer
            # redirection (e.g. a test runner's capture) is not discarded.
            sys.stdout = previous_stdout
        # Read from our own buffer rather than through sys.stdout, which the
        # checked code could have replaced in the meantime.
        output = captured.getvalue()

        errors = []
        for line in output.split('\n'):
            parts = line.split(' ', 2)
            if len(parts) == 3:
                location, error, desc = parts
                line_no = location.split(':')[1]
                errors.append('%s ln:%s %s' % (error, line_no, desc))
        return errors
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def emit_func(func, o=sys.__stdout__, d = init()):
    """Write the data store, ``func`` and the shell functions it depends on to ``o``.

    All variables whose name does not start with ``__`` and that are not
    flagged ``func`` are emitted first, then ``func`` itself, then every
    non-python function reachable through parsed shell dependencies and
    ``vardeps`` flags, each one at most once.
    """
    for key in d.keys():
        if key.startswith("__") or d.getVarFlag(key, "func", False):
            continue
        emit_var(key, o, d, False)

    o.write('\n')
    if emit_var(func, o, d, False):
        o.write('\n')

    pending = bb.codeparser.ShellParser(func, logger).parse_shell(d.getVar(func, True))
    pending |= set((d.getVarFlag(func, "vardeps", True) or "").split())
    seen = set()
    while pending:
        current = pending
        seen |= current
        pending = set()
        for dep in current:
            # Only shell functions are emitted and followed further.
            if not d.getVarFlag(dep, "func", False) or d.getVarFlag(dep, "python", False):
                continue
            if emit_var(dep, o, d, False):
                o.write('\n')
            pending |= bb.codeparser.ShellParser(dep, logger).parse_shell(d.getVar(dep, True))
            pending |= set((d.getVarFlag(dep, "vardeps", True) or "").split())
        # Drop everything already handled so the walk terminates.
        pending -= seen
项目:fabric-oftest    作者:opencord    | 项目源码 | 文件源码
def _capture_show(pkt):
    """Return the text that ``pkt.show()`` prints to stdout."""
    buf = StringIO()
    previous_stdout = sys.stdout
    sys.stdout = buf
    try:
        pkt.show()
    finally:
        # Never leave stdout redirected, even if show() raises.
        sys.stdout = previous_stdout
    return buf.getvalue()


def pkt_verify(parent, rcv_pkt, exp_pkt):
    """Assert that the received packet equals the expected one.

    The packets are compared through ``str()``.  On a mismatch both packets
    are logged (length, hex dump and ``show()`` output) before the assertion
    on ``parent`` fails.

    :param parent: test case providing ``assertEqual``.
    :param rcv_pkt: packet that was received.
    :param exp_pkt: packet that was expected.
    :return: ``rcv_pkt`` when the packets match.
    """
    if str(exp_pkt) != str(rcv_pkt):
        logging.error("ERROR: Packet match failed.")
        logging.debug("Expected (" + str(len(exp_pkt)) + ")")
        logging.debug(str(exp_pkt).encode('hex'))
        logging.debug(_capture_show(exp_pkt))
        logging.debug("Received (" + str(len(rcv_pkt)) + ")")
        logging.debug(str(rcv_pkt).encode('hex'))
        # The received packet is wrapped in Ether before being shown.
        logging.debug(_capture_show(Ether(rcv_pkt)))
    parent.assertEqual(str(exp_pkt), str(rcv_pkt),
                       "Packet match error")

    return rcv_pkt
项目:pm-fiber-birefringence    作者:sid5432    | 项目源码 | 文件源码
def quit(self, *args):
        """Restore the standard streams, quit the display and destroy the root window.

        Extra positional arguments are accepted and ignored.  This is
        Python 2 code (print statement).
        """
        # restore stdout before signaling the run thread to exit!
        # it can get stuck in trying to dump to the redirected text message area
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__

        # Best effort: any failure while quitting the display is ignored so
        # that the root window below is still destroyed.
        try:
            self.display.quit()
        except:
            pass

        self.root.destroy()
        print "* All done!"
        # sys.exit() # force quit!

        return

    # ----------------------------------------------------------
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def doItConsolicious(opt):
    """Run the requested unpack/compile steps with plain console output.

    ``opt`` is indexed with the keys ``'zipfile'``, ``'ziptargetdir'`` and
    ``'compiledir'``.  A truthy ``opt['zipfile']`` triggers the unpack step
    and a truthy ``opt['compiledir']`` triggers byte-compilation of that
    directory.  This is Python 2 code (print statements).
    """
    # reclaim stdout/stderr from log
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    if opt['zipfile']:
        print 'Unpacking documentation...'
        # unzipIter yields a number per step -- presumably a progress
        # counter; verify against zipstream.
        for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
            if n % 100 == 0:
                # Trailing comma: keep the counters on one line.
                print n,
            if n % 1000 == 0:
                # Bare print ends the current progress line.
                print
        print 'Done unpacking.'

    if opt['compiledir']:
        print 'Compiling to pyc...'
        import compileall
        compileall.compile_dir(opt["compiledir"])
        print 'Done compiling.'
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def shutdown(self, c):
        '''
        Shut this process down and exit with status 0.

        Acknowledges the request on connection ``c``, restores the original
        stdout/stderr if stdout was replaced, runs finalizers, terminates and
        then joins all active child processes.  Any error is printed as a
        traceback; the process exits with status 0 regardless.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))

            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__

            util._run_finalizers(0)

            # First ask every child to stop, then wait for each of them.
            for child in active_children():
                util.debug('terminating a child process of manager')
                child.terminate()

            for child in active_children():
                util.debug('terminating a child process of manager')
                child.join()

            util._run_finalizers()
            util.info('manager exiting with exitcode 0')
        except:
            import traceback
            traceback.print_exc()
        finally:
            exit(0)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __getattribute__(self, name):
        """Forward attribute access to the active stream.

        ``__members__`` answers with the attribute names of the original
        stdout.  Every other name is looked up on ``_local.stream`` when that
        attribute exists, otherwise on the original stdout.
        """
        if name != '__members__':
            target = getattr(_local, 'stream', sys.__stdout__)
            return getattr(target, name)
        return dir(sys.__stdout__)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __repr__(self):
        """Return the repr of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return repr(original_stdout)


# add the threaded stream as display hook
项目:Blender-WMO-import-export-scripts    作者:WowDevTools    | 项目源码 | 文件源码
def execute(self, context):
        """Fill in missing WoW texture paths for materials of the selected objects.

        For every material that has an image texture but an empty
        ``WowMaterial.Texture1``, the image path (with a ``.blp`` extension)
        is tried against the game data with more and more leading directories
        included, until ``game_data.read_file`` accepts one.  A message is
        printed when no candidate is found.

        :return: ``{'FINISHED'}``.
        """
        if not hasattr(bpy, "wow_game_data"):
            print("\n\n### Loading game data ###")
            bpy.ops.scene.load_wow_filesystem()

        game_data = bpy.wow_game_data

        for ob in bpy.context.selected_objects:
            mesh = ob.data
            for material in mesh.materials:
                texture = material.active_texture
                if texture is None \
                        or material.WowMaterial.Texture1 \
                        or texture.type != 'IMAGE' \
                        or texture.image is None:
                    continue

                image_path = texture.image.filepath
                path = (os.path.splitext(bpy.path.abspath(image_path))[0] + ".blp", "")
                rest_path = ""

                while True:
                    # Peel one more trailing component off the remaining path.
                    path = os.path.split(path[0])
                    if not path[1]:
                        print("\nTexture <<{}>> not found.".format(image_path))
                        break

                    rest_path = os.path.join(path[1], rest_path)
                    rest_path = rest_path[:-1] if rest_path.endswith("\\") else rest_path

                    # Silence anything read_file prints.  The redirection is
                    # undone and the devnull handle closed on every path,
                    # including a successful lookup and an exception, so
                    # stdout is never left pointing at devnull.
                    previous_stdout = sys.stdout
                    with open(os.devnull, 'w') as devnull:
                        sys.stdout = devnull
                        try:
                            found = game_data.read_file(rest_path)
                        finally:
                            sys.stdout = previous_stdout

                    if found:
                        material.WowMaterial.Texture1 = rest_path
                        break

            self.report({'INFO'}, "Done filling texture paths")

        return {'FINISHED'}
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
def __init__(self, log, prefix = ""):
        """Remember the original stdout as console, plus the log and prefix given."""
        self.console, self.log, self.prefix = sys.__stdout__, log, prefix
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def init(ctx):
    """Open the build log file and route stdout/stderr through it.

    The log file ``LOGFILE`` is created (truncating any previous content),
    ``sys.stdout`` and ``sys.stderr`` are replaced by ``log_to_file``
    wrappers around the original streams, and logging handlers that were
    writing to the previous ``sys.stderr`` are pointed at the new one.
    ``ctx`` is not used.
    """
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(filename))
    except OSError:
        pass

    if not hasattr(os, 'O_NOINHERIT'):
        fileobj = open(LOGFILE, 'w')
    else:
        # Where available, add O_NOINHERIT to the open flags.
        open_flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
        fileobj = os.fdopen(os.open(LOGFILE, open_flags), 'w')
    previous_stderr = sys.stderr

    def wrap(stream):
        # Terminals get the AnsiTerm wrapper; everything else is used as is.
        return ansiterm.AnsiTerm(stream) if stream.isatty() else stream

    # sys.stdout has already been replaced, so wrap the original streams.
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)

    # now mess with the logging module...
    for handler in Logs.log.handlers:
        try:
            attached = handler.stream
        except AttributeError:
            continue
        if attached is previous_stderr:
            handler.stream = sys.stderr
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def init(ctx):
    """Open the build log file and route stdout/stderr through it.

    The log file ``LOGFILE`` is created (truncating any previous content),
    ``sys.stdout`` and ``sys.stderr`` are replaced by ``log_to_file``
    wrappers around the original streams, and logging handlers that were
    writing to the previous ``sys.stderr`` are pointed at the new one.
    ``ctx`` is not used.
    """
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(filename))
    except OSError:
        pass

    if not hasattr(os, 'O_NOINHERIT'):
        fileobj = open(LOGFILE, 'w')
    else:
        # Where available, add O_NOINHERIT to the open flags.
        open_flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
        fileobj = os.fdopen(os.open(LOGFILE, open_flags), 'w')
    previous_stderr = sys.stderr

    def wrap(stream):
        # Terminals get the AnsiTerm wrapper; everything else is used as is.
        return ansiterm.AnsiTerm(stream) if stream.isatty() else stream

    # sys.stdout has already been replaced, so wrap the original streams.
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)

    # now mess with the logging module...
    for handler in Logs.log.handlers:
        try:
            attached = handler.stream
        except AttributeError:
            continue
        if attached is previous_stderr:
            handler.stream = sys.stderr
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def init(ctx):
    """Open the build log file and route stdout/stderr through it.

    The log file ``LOGFILE`` is created (truncating any previous content),
    ``sys.stdout`` and ``sys.stderr`` are replaced by ``log_to_file``
    wrappers around the original streams, and logging handlers that were
    writing to the previous ``sys.stderr`` are pointed at the new one.
    ``ctx`` is not used.
    """
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(filename))
    except OSError:
        pass

    if not hasattr(os, 'O_NOINHERIT'):
        fileobj = open(LOGFILE, 'w')
    else:
        # Where available, add O_NOINHERIT to the open flags.
        open_flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
        fileobj = os.fdopen(os.open(LOGFILE, open_flags), 'w')
    previous_stderr = sys.stderr

    # sys.stdout has already been replaced, so wrap the original streams.
    sys.stdout = log_to_file(sys.__stdout__, fileobj, filename)
    sys.stderr = log_to_file(sys.__stderr__, fileobj, filename)

    # now mess with the logging module...
    for handler in Logs.log.handlers:
        try:
            attached = handler.stream
        except AttributeError:
            continue
        if attached is previous_stderr:
            handler.stream = sys.stderr
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __getattribute__(self, name):
        """Forward attribute access to the active stream.

        ``__members__`` answers with the attribute names of the original
        stdout.  Every other name is looked up on ``_local.stream`` when that
        attribute exists, otherwise on the original stdout.
        """
        if name != '__members__':
            target = getattr(_local, 'stream', sys.__stdout__)
            return getattr(target, name)
        return dir(sys.__stdout__)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __repr__(self):
        """Return the repr of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return repr(original_stdout)


# add the threaded stream as display hook
项目:LL1-Academy    作者:H-Huang    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """Generate random grammars and report how many are stored.

        Options read: ``reset_db`` (clear the database first), ``num``
        (number of randomly generated grammars per run) and ``silent``
        (suppress everything printed while generating).
        """
        if (options['reset_db']):
            call_command('cleardatabase')

        print("Grammar objects initially in database: {}".format(Grammar.objects.count()))
        #Number of randomly generated grammars
        num = options['num']

        #Number variables this run will include.
        #For example [2,3] will run the script to generate
        #grammars with 2 variables and 3 variables
        nVariables = [2, 3]

        nonTerminals = ['A','B','C','D']
        terminals = ['x','y','z','w']

        previous_stdout = sys.stdout
        devnull = open(os.devnull, "w") if options['silent'] else None
        if devnull is not None:
            sys.stdout = devnull
        try:
            for n in nVariables:
                start_time = time.time()
                mg = MassGrammarGenerator.MassGrammarGenerator(n)
                mg.run(num,nonTerminals[:n],terminals)
                print("{}Variables: {} seconds---".format(n,(time.time() - start_time)))
        finally:
            # Undo the redirection and release the devnull handle even when
            # generation fails, so later output is not silently discarded.
            if devnull is not None:
                sys.stdout = previous_stdout
                devnull.close()

        print("Grammar objects finally in database: {}".format(Grammar.objects.count()))
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __getattribute__(self, name):
        """Forward attribute access to the active stream.

        ``__members__`` answers with the attribute names of the original
        stdout.  Every other name is looked up on ``_local.stream`` when that
        attribute exists, otherwise on the original stdout.
        """
        if name != '__members__':
            target = getattr(_local, 'stream', sys.__stdout__)
            return getattr(target, name)
        return dir(sys.__stdout__)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __repr__(self):
        """Return the repr of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return repr(original_stdout)


# add the threaded stream as display hook
项目:scriptcwl    作者:NLeSC    | 项目源码 | 文件源码
def quiet():
    """Divert stdout and stderr to the null device around a single ``yield``.

    The streams that were active on entry are put back afterwards, and the
    null-device handle is closed.  The previous streams are saved instead of
    using ``sys.__stdout__`` / ``sys.__stderr__`` because Jupyter doesn't
    support setting the streams back to those.

    NOTE(review): this is a generator function; presumably it is wrapped with
    ``contextlib.contextmanager`` outside this view -- confirm.
    """
    # save stdout/stderr
    _sys_stdout = sys.stdout
    _sys_stderr = sys.stderr
    # Divert stdout and stderr to devnull
    devnull = open(os.devnull, "w")
    sys.stdout = sys.stderr = devnull
    try:
        yield
    finally:
        # Revert back to standard stdout/stderr
        sys.stdout = _sys_stdout
        sys.stderr = _sys_stderr
        # Release the handle opened above; it is no longer reachable.
        devnull.close()
项目:gui_tool    作者:UAVCAN    | 项目源码 | 文件源码
def _finalize(self):
        """Restore the original stdout and detach this object's log handler."""
        sys.stdout = sys.__stdout__
        handler = self._log_handler
        # Detach the handler from the root logger before closing it.
        logging.root.removeHandler(handler)
        handler.close()
        logger.info('Jupyter window finalized successfully')
项目:python-don    作者:openstack    | 项目源码 | 文件源码
def plot_compute_node(self):
        """Emit the compute-node graph by printing through the plot helpers.

        When stdout has not been redirected by a caller, output goes to
        ``self.compute_dot_file`` and the graph is opened and closed here;
        otherwise only the graph body is printed to the current stdout.
        The file is closed and stdout restored even if a plot step fails.
        """
        tag = 'compute'
        redirected = False
        if sys.stdout == sys.__stdout__:
            self.outfile = open(self.compute_dot_file, "w")
            sys.stdout = self.outfile
            redirected = True

        try:
            if redirected:
                self.__digraph_open(tag)

            # Title
            self.__cluster_open('ComputeNode', 'red')
            self.__cluster_name('Compute Node', 1, 'yellow')
            self.__cluster_close()

            # Plot nodes
            self.__cluster_open_plain('Nova')
            self.__plot_vms()
            self.__plot_linux_bridge()
            self.__cluster_close_plain()

            self.__cluster_open_plain('OVS')
            self.__plot_br_int_compute()
            self.__plot_br_tun(tag)
            self.__cluster_close_plain()

            # Plot edges
            self.__plot_title_edges(tag)
            self.__plot_vms_to_linuxbridge()
            self.__plot_linuxbridge_to_br_int()
            self.__plot_br_int_to_br_tun(tag)

            if redirected:
                self.__digraph_close()
        finally:
            # Only undo what this call set up.
            if redirected:
                self.outfile.close()
                sys.stdout = sys.__stdout__
项目:python-don    作者:openstack    | 项目源码 | 文件源码
def plot_network_node(self):
        """Emit the network-node graph by printing through the plot helpers.

        When stdout has not been redirected by a caller, output goes to
        ``self.network_dot_file`` and the graph is opened and closed here;
        otherwise only the graph body is printed to the current stdout.
        The file is closed and stdout restored even if a plot step fails.
        """
        tag = 'network'
        redirected = False
        if sys.stdout == sys.__stdout__:
            self.outfile = open(self.network_dot_file, "w")
            sys.stdout = self.outfile
            redirected = True

        try:
            if redirected:
                self.__digraph_open(tag)

            # Title
            self.__cluster_open('NetworkNode', 'red')
            self.__cluster_name('Network Node', 1, 'yellow')
            self.__cluster_close()

            # Plot nodes
            self.__cluster_open_plain('OVS')
            self.__plot_br_ex_network()
            self.__plot_br_int_network()
            self.__plot_br_tun(tag)
            self.__cluster_close_plain()

            # Plot edges
            self.__plot_title_edges(tag)
            self.__plot_br_int_to_br_tun(tag)
            self.__plot_br_ex_to_br_int()

            if redirected:
                self.__digraph_close()
        finally:
            # Only undo what this call set up.
            if redirected:
                self.outfile.close()
                sys.stdout = sys.__stdout__
项目:python-don    作者:openstack    | 项目源码 | 文件源码
def debug(msg):
    """Print ``'DEBUG: ' + msg`` on the original stdout when debugging is enabled.

    Nothing happens unless ``settings['debug']`` is truthy.  A redirection of
    ``sys.stdout`` that is active when this is called stays in place
    afterwards, even if building or printing the message fails.
    """
    if not settings['debug']:
        return
    previous_stdout = sys.stdout
    sys.stdout = sys.__stdout__
    try:
        print('DEBUG: ' + msg)
    finally:
        sys.stdout = previous_stdout
项目:python-don    作者:openstack    | 项目源码 | 文件源码
def error(msg):
    """Print ``'ERROR: ' + msg`` on the original stdout.

    A redirection of ``sys.stdout`` that is active when this is called stays
    in place afterwards, even if building or printing the message fails.
    """
    previous_stdout = sys.stdout
    sys.stdout = sys.__stdout__
    try:
        print('ERROR: ' + msg)
    finally:
        sys.stdout = previous_stdout
项目:python-don    作者:openstack    | 项目源码 | 文件源码
def warning(msg):
    """Print ``'WARNING: ' + msg`` on the original stdout.

    A redirection of ``sys.stdout`` that is active when this is called stays
    in place afterwards, even if building or printing the message fails.
    """
    previous_stdout = sys.stdout
    sys.stdout = sys.__stdout__
    try:
        print('WARNING: ' + msg)
    finally:
        sys.stdout = previous_stdout
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def set_trace():
    """Start pdb in the caller's frame with sys.stdout set to the real
    output stream first. sys.stdout is deliberately NOT put back to its
    previous value once pdb is done!
    """
    import sys
    import pdb
    # The frame of whoever called set_trace(), not of this helper.
    caller_frame = sys._getframe().f_back
    # Switch streams before the debugger object is created.
    sys.stdout = sys.__stdout__
    pdb.Pdb().set_trace(caller_frame)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def parseArgs(self, argv):
        """Parse argv and env and configure running environment.

        Handles the ``version`` and ``showPlugins`` options by exiting
        immediately with status 0.  Otherwise sets up ``self.testLoader``
        and ``self.testNames``, optionally changes into the configured
        working directory, and calls ``self.createTests()``.  This is
        Python 2 code (print statement).
        """
        self.config.configure(argv, doc=self.usage())
        log.debug("configured %s", self.config)

        # quick outs: version, plugins (optparse would have already
        # caught and exited on help)
        if self.config.options.version:
            from nose import __version__
            # Print on the original stdout in case sys.stdout was replaced.
            sys.stdout = sys.__stdout__
            print "%s version %s" % (os.path.basename(sys.argv[0]), __version__)
            sys.exit(0)

        if self.config.options.showPlugins:
            self.showPlugins()
            sys.exit(0)

        # Use the default loader when none was given; instantiate the loader
        # when a class was given instead of an instance.
        if self.testLoader is None:
            self.testLoader = defaultTestLoader(config=self.config)
        elif isclass(self.testLoader):
            self.testLoader = self.testLoader(config=self.config)
        # Plugins may substitute their own loader.
        plug_loader = self.config.plugins.prepareTestLoader(self.testLoader)
        if plug_loader is not None:
            self.testLoader = plug_loader
        log.debug("test loader is %s", self.testLoader)

        # FIXME if self.module is a string, add it to self.testNames? not sure

        # Configured test names take precedence over the default test.
        if self.config.testNames:
            self.testNames = self.config.testNames
        else:
            self.testNames = tolist(self.defaultTest)
        log.debug('defaultTest %s', self.defaultTest)
        log.debug('Test names are %s', self.testNames)
        if self.config.workingDir is not None:
            os.chdir(self.config.workingDir)
        self.createTests()
项目:ceph-medic    作者:ceph    | 项目源码 | 文件源码
def __init__(self, string):
        """Bind to the original stdout and start with empty affixes.

        ``string`` is accepted but not used here.  ``self.isatty`` records
        whether the original stdout reports being a terminal.
        """
        console = sys.__stdout__
        self.stdout = console
        self.appends = ''
        self.prepends = ''
        self.isatty = console.isatty()
项目:properties    作者:aranzgeo    | 项目源码 | 文件源码
def test_task(self):
        """Exercise a minimal Task subclass: status output, result and failures."""

        class AddTask(Task):

            addend_a = properties.Float('First add argument')
            addend_b = properties.Float('Second add argument')

            class Result(BaseResult):
                value = properties.Float('Result of add operation')

            def __call__(self):
                self.report_status({'progress': 0., 'message': 'Starting'})
                if self.addend_a == self.addend_b:
                    raise PermanentTaskFailure()
                return self.Result(value=self.addend_a + self.addend_b)

        add = AddTask(addend_a=0., addend_b=10.)

        temp_out = StringIO()
        previous_stdout = sys.stdout
        sys.stdout = temp_out
        try:
            result = add()
        finally:
            # Undo the redirection even if the task raises, and put back the
            # stream that was active before rather than sys.__stdout__ so a
            # test runner's own capture is kept.
            sys.stdout = previous_stdout
        assert temp_out.getvalue() == 'AddTask |   0% | Starting\n'
        assert result.value == 10.

        # Equal addends make the task fail permanently.
        add = AddTask(addend_a=5., addend_b=5.)
        with self.assertRaises(PermanentTaskFailure):
            add()

        with self.assertRaises(NotImplementedError):
            Task()()

        with self.assertRaises(ValueError):
            Task().report_status(.5)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __getattribute__(self, name):
        """Forward attribute access to the active stream.

        ``__members__`` answers with the attribute names of the original
        stdout.  Every other name is looked up on ``_local.stream`` when that
        attribute exists, otherwise on the original stdout.
        """
        if name != '__members__':
            target = getattr(_local, 'stream', sys.__stdout__)
            return getattr(target, name)
        return dir(sys.__stdout__)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __repr__(self):
        """Return the repr of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return repr(original_stdout)


# add the threaded stream as display hook
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __getattribute__(self, name):
        """Forward attribute access to the active stream.

        ``__members__`` answers with the attribute names of the original
        stdout.  Every other name is looked up on ``_local.stream`` when that
        attribute exists, otherwise on the original stdout.
        """
        if name != '__members__':
            target = getattr(_local, 'stream', sys.__stdout__)
            return getattr(target, name)
        return dir(sys.__stdout__)
项目:p2pool-bch    作者:amarian12    | 项目源码 | 文件源码
def write(self, data):
        """Write ``data`` to the wrapped file.

        If anything goes wrong, stdout, stderr and the log observer's stderr
        are pointed back at the original streams before the error propagates.
        """
        try:
            self.inner_file.write(data)
        except:
            sys.stdout = sys.__stdout__
            original_stderr = sys.__stderr__
            log.DefaultObserver.stderr = original_stderr
            sys.stderr = original_stderr
            raise
项目:sqf    作者:LordGolias    | 项目源码 | 文件源码
def tearDown(self):
        """Put the interpreter's original stdout and stdin back in place."""
        sys.stdout, sys.stdin = sys.__stdout__, sys.__stdin__
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __getattribute__(self, name):
        """Forward attribute access to the active stream.

        ``__members__`` answers with the attribute names of the original
        stdout.  Every other name is looked up on ``_local.stream`` when that
        attribute exists, otherwise on the original stdout.
        """
        if name != '__members__':
            target = getattr(_local, 'stream', sys.__stdout__)
            return getattr(target, name)
        return dir(sys.__stdout__)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __repr__(self):
        """Return the repr of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return repr(original_stdout)


# add the threaded stream as display hook
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __dir__(self):
        """List the attribute names of the interpreter's original stdout."""
        original_stdout = sys.__stdout__
        return dir(original_stdout)