我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用 six.moves.cStringIO()。
def readline(self):
    """Return the next line, advancing file_pos and crossing block
    boundaries by loading successive blocks until a newline or EOF."""
    if self.dirty:
        self.fix_dirty()
    if self.at_eof:
        return ""
    pieces = []
    while True:
        line = self.current_block.readline()
        self.file_pos += len(line)
        pieces.append(line)
        if line.endswith('\n'):
            break
        if self.current_block_index == self.nblocks - 1:
            # No more blocks to read: this partial line is the last one.
            self.at_eof = True
            break
        self.current_block_index += 1
        self.current_block = StringIO(self.load_block(self.current_block_index))
    return "".join(pieces)
def readline(self):
    """Return the next line, loading further blocks until a newline or EOF.

    NOTE(review): unlike the sibling variant, this one does not maintain
    a file_pos counter.
    """
    if self.dirty:
        self.fix_dirty()
    if self.at_eof:
        return ""
    pieces = []
    while True:
        line = self.current_block.readline()
        pieces.append(line)
        if line.endswith('\n'):
            break
        if self.current_block_index == self.nblocks - 1:
            # Last block exhausted: the partial line ends the stream.
            self.at_eof = True
            break
        self.current_block_index += 1
        self.current_block = StringIO(self.load_block(self.current_block_index))
    return "".join(pieces)
def __str__(self):
    """Render the order book as text: all bids, all asks, and up to the
    ten most recent trades from the tape.

    Fixes: `!= None` comparisons replaced with `is not None`; the local
    buffer no longer shadows the stdlib `tempfile` module name; the stale
    "last 5 entries" comment corrected (the loop takes 10).
    """
    out = StringIO()
    out.write("***Bids***\n")
    if self.bids is not None and len(self.bids) > 0:
        for key, value in self.bids.price_tree.items(reverse=True):
            out.write('%s' % value)
    out.write("\n***Asks***\n")
    if self.asks is not None and len(self.asks) > 0:
        for key, value in list(self.asks.price_tree.items()):
            out.write('%s' % value)
    out.write("\n***Trades***\n")
    if self.tape is not None and len(self.tape) > 0:
        num = 0
        for entry in self.tape:
            if num < 10:  # show at most the 10 most recent trades
                out.write(str(entry['quantity']) + " @ " +
                          str(entry['price']) + " (" +
                          str(entry['timestamp']) + ") " +
                          str(entry['party1'][0]) + "/" +
                          str(entry['party2'][0]) + "\n")
                num += 1
            else:
                break
    out.write("\n")
    return out.getvalue()
def test_new_query_mocked(self, N, Process):
    """A basic functionality test, in a case where everything is just normal."""
    _configure_mock(N, Process)
    stats = gpustat.new_query()
    buf = StringIO()
    stats.print_formatted(fp=buf, no_color=False, show_user=True,
                          show_cmd=True, show_pid=True, show_power=True)
    rendered = buf.getvalue()
    print(rendered)
    plain = remove_ansi_codes(rendered)
    # Drop the first (header) line before comparing.
    plain = '\n'.join(plain.split('\n')[1:])
    self.maxDiff = 4096
    self.assertEqual(plain, MOCK_EXPECTED_OUTPUT_FULL)
def test_args_endtoend(self, N, Process):
    """End-to-end testing given command line args."""
    _configure_mock(N, Process)

    def capture_output(*args):
        buf = StringIO()
        import contextlib
        with contextlib.redirect_stdout(buf):  # requires python 3.4+
            try:
                gpustat.main(*args)
            except SystemExit:
                raise AssertionError("Argparse failed (see above error message)")
        return buf.getvalue()

    # Default invocation: compare body (header line stripped).
    text = capture_output('gpustat', )
    plain = remove_ansi_codes(text)
    plain = '\n'.join(plain.split('\n')[1:])
    self.maxDiff = 4096
    self.assertEqual(plain, MOCK_EXPECTED_OUTPUT_DEFAULT)

    # --no-header: first line is already a GPU entry.
    text = capture_output('gpustat', '--no-header')
    self.assertIn("[0]", text.split('\n')[0])
def test_copy_dir(self):
    """A scaffold's copy_to should replicate its fixture tree at the destination."""
    from pecan.scaffolds import PecanScaffold

    class SimpleScaffold(PecanScaffold):
        _scaffold_dir = ('pecan', os.path.join(
            'tests', 'scaffold_fixtures', 'simple'
        ))

    dest = os.path.join(self.scaffold_destination, 'someapp')
    SimpleScaffold().copy_to(dest, out_=StringIO())

    assert os.path.isfile(os.path.join(dest, 'foo'))
    assert os.path.isfile(os.path.join(dest, 'bar', 'spam.txt'))
    with open(os.path.join(dest, 'foo'), 'r') as fh:
        assert fh.read().strip() == 'YAR'
def test_start_auto_refresher(self, mock_write_auto_session, mock_kill_all):
    """start_auto_refresher should record the auto-refresh session message.

    Fix: sys.stdout was redirected but never restored, leaking the
    redirection into subsequent tests; now restored in a finally block.
    """
    capturedOut = StringIO()
    sys.stdout = capturedOut
    try:
        args = argparse.Namespace()
        args.profile_name = 'profile-name'
        userSession = collections.OrderedDict()
        config = collections.OrderedDict()
        config['role_arn'] = 'the-role-arn'
        credentials = collections.OrderedDict()
        credentials['__name__'] = 'creds-name'
        mock_out_data = mock.Mock()
        mock_set_data = mock.Mock()
        mock_out_data.set_data = mock_set_data
        awsumepy.start_auto_refresher(args, userSession, config, credentials, mock_out_data)
        mock_set_data.assert_called_with('Auto auto-refresh-profile-name profile-name')
    finally:
        sys.stdout = sys.__stdout__
def _indented_print(f_locals, d, indent, excludes=('__init__',), file=sys.stdout):
    """ Print trace info, indenting based on call depth. """
    # `tab`, `addr_regex`, and `MAXLINE` are module-level names defined
    # elsewhere in the file -- TODO confirm their definitions.
    sindent = tab * indent
    # '=' when printing the frame's own locals, ':' for nested dicts.
    sep = '=' if d is f_locals else ':'

    for name in sorted(d, key=lambda a: str(a)):
        if name not in excludes:
            if isinstance(d[name], (dict, OrderedDict)):
                # Recurse into nested mappings, capturing their text so it
                # can be embedded in braces on one logical entry.
                f = cStringIO()
                _indented_print(f_locals, d[name], 0, file=f)
                s = " %s%s%s{%s}" % (sindent, name, sep, f.getvalue())
            else:
                s = " %s%s%s%s" % (sindent, name, sep, d[name])
            # Strip volatile "at 0x..." object addresses for stable output.
            if ' object at ' in s:
                s = addr_regex.sub('', s)
            linelen = len(s)
            leneq = len(s.split(sep, 1)[0])
            if linelen > MAXLINE:
                if '\n' in s:
                    # change indent
                    s = s.replace("\n", "\n%s" % (' '*leneq))
            print(s, file=file)
def test_list_inputs(self):
    """list_inputs should report all six component inputs with their values."""
    self.prob.run_model()

    sink = cStringIO()
    listed = self.prob.model.list_inputs(out_stream=sink)
    self.assertEqual(sorted(listed), [
        ('comp2.a', [1.]),
        ('comp2.b', [-4.]),
        ('comp2.c', [3.]),
        ('comp3.a', [1.]),
        ('comp3.b', [-4.]),
        ('comp3.c', [3.])
    ])

    report = sink.getvalue()
    self.assertEqual(report.count('comp2.'), 3)
    self.assertEqual(report.count('comp3.'), 3)
    self.assertEqual(report.count('value:'), 6)
def get_publisherinfo(self, header=None, ccancel=None):
    """Get publisher information from the repository."""
    try:
        buf = cStringIO()
        p5i.write(buf, self._frepo.get_publishers())
    except Exception as e:
        reason = ("Unable to retrieve publisher configuration "
                  "data:\n{0}".format(e))
        err = tx.TransportProtoError("file", errno.EPROTO,
                                     reason=reason, repourl=self._url)
        self.__record_proto_error(err)
        raise err
    buf.seek(0)
    return buf
def get_publisherinfo(self, header=None, ccancel=None):
    """Get publisher information from the repository."""
    try:
        buf = cStringIO()
        p5i.write(buf, self._arc.get_publishers())
    except Exception as e:
        reason = ("Unable to retrieve publisher configuration "
                  "data:\n{0}".format(e))
        err = tx.TransportProtoError("file", errno.EPROTO,
                                     reason=reason, repourl=self._url)
        self.__record_proto_error(err)
        raise err
    buf.seek(0)
    return buf
def get_versions(self, header=None, ccancel=None):
    """Query the repo for versions information.
    Returns a file-like object."""
    vops = {
        "catalog": ["1"],
        "file": ["0"],
        "manifest": ["0"],
        "publisher": ["0", "1"],
        "versions": ["0"],
        "status": ["0"]
    }
    buf = cStringIO()
    buf.write("pkg-server {0}\n".format(pkg.VERSION))
    lines = ["{0} {1}".format(op, " ".join(vers))
             for op, vers in six.iteritems(vops)]
    buf.write("\n".join(lines) + "\n")
    buf.seek(0)
    self.__stats.record_tx()
    return buf
def run_pylint():
    """Run pylint (errors only) over the cinder tree and return the
    parseable report text."""
    sink = StringIO()
    reporter = text.ParseableTextReporter(output=sink)
    lint.Run(["--include-ids=y", "-E", "cinder"], reporter=reporter, exit=False)
    report = sink.getvalue()
    sink.close()
    return report
def blank(cls, path, environ=None, base_url=None, headers=None, **kwargs):  # pragma: no cover
    """Adds parameters compatible with WebOb > 1.2: POST and **kwargs."""
    try:
        # Newer WebOb accepts **kwargs directly.
        request = super(Request, cls).blank(
            path,
            environ=environ,
            base_url=base_url,
            headers=headers,
            **kwargs
        )
        if cls._request_charset and not cls._request_charset == 'utf-8':
            return request.decode(cls._request_charset)
        return request
    except TypeError:
        # Older WebOb raised TypeError for the extra kwargs; fall back to
        # emulating POST/kwargs by hand below.  If there were no kwargs the
        # TypeError came from something else -- re-raise it.
        if not kwargs:
            raise
        data = kwargs.pop('POST', None)
        if data is not None:
            environ = environ or {}
            environ['REQUEST_METHOD'] = 'POST'
            if hasattr(data, 'items'):
                data = list(data.items())
            if not isinstance(data, str):
                data = urlencode(data)
            # Hand-build the form-encoded request body.
            environ['wsgi.input'] = cStringIO(data)
            environ['webob.is_body_seekable'] = True
            environ['CONTENT_LENGTH'] = str(len(data))
            environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
        base = super(Request, cls).blank(path, environ=environ,
                                         base_url=base_url, headers=headers)
        if kwargs:
            # Re-wrap with the remaining kwargs, keeping the built headers.
            obj = cls(base.environ, **kwargs)
            obj.headers.update(base.headers)
            return obj
        else:
            return base
def pp(value):
    """Pretty-print *value* and return the formatted text."""
    sink = cStringIO()
    printer = PrettyPrinter(stream=sink)
    printer.pprint(value)
    return sink.getvalue()
def __init__(self, delimiter):
    """Initializes the writer wrapper.

    Args:
      delimiter: A one-character string used to separate fields.
    """
    self._state = delimiter
    self._buffer = moves.cStringIO()
    # Rows are encoded one at a time via self._writer, so lineterminator=''
    # keeps it from appending a newline to each encoded row.
    self._writer = csv.writer(self._buffer,
                              lineterminator='',
                              delimiter=delimiter)
def fix_dirty(self):
    """Re-sync the loaded block and its read offset with file_pos."""
    block_idx, offset = self.get_block_and_offset(self.file_pos)
    if self.current_block_index == block_idx:
        # Correct block already loaded; just reposition within it.
        self.current_block.seek(offset)
    else:
        self.current_block = StringIO(self.load_block(block_idx))
        self.current_block.read(offset)
        self.current_block_index = block_idx
    self.dirty = False
def __init__(self, config, buildroot):
    """Set up the PM-request plugin: chroot run dir, socket path, and a
    dedicated logger whose output is captured into a StringIO buffer."""
    self.config = config
    self.buildroot = buildroot
    # RUNDIR and SOCKET_NAME are module-level constants -- TODO confirm.
    self.rundir = buildroot.make_chroot_path(RUNDIR)
    self.socket_path = os.path.join(self.rundir, SOCKET_NAME)
    self.executed_commands = []
    # util.do cannot return output when the command fails, we need to
    # capture it's logging
    self.log_buffer = StringIO()
    self.log = logging.getLogger("mockbuild.plugin.pm_request")
    self.log.level = logging.DEBUG
    self.log.addFilter(OutputFilter())
    # propagate=False keeps captured messages out of the root logger.
    self.log.propagate = False
    self.log.addHandler(logging.StreamHandler(self.log_buffer))
def __repr__(self):
    """Return the text produced by rendering self via print_to()."""
    buf = StringIO()
    return self.print_to(buf).getvalue()
def _JoinedStr(self, t):
    """Emit an f-string literal for a JoinedStr node (ast or typed_ast)."""
    self.write("f")
    parts = []
    for value in t.values:
        if isinstance(value, (ast.Str, typed_ast.ast3.Str)):
            parts.append(value.s)
            continue
        # Unparse the embedded expression with a fresh unparser, then strip
        # whichever quoting it produced so the pieces can be re-joined.
        sub = type(self)(value, cStringIO())
        text = sub.f.getvalue().rstrip()
        for quote in ('"""', "'''", '"', "'"):
            if text.startswith(quote) and text.endswith(quote):
                text = text[len(quote):-len(quote)]
                break
        parts.append(text)
    self.write(repr(''.join(parts)))
def unparse(tree: t.Union[ast.AST, typed_ast.ast3.AST]) -> str:
    """Unparse the abstract syntax tree into a str.

    Behave just like astunparse.unparse(tree), but handle trees which are
    typed, untyped, or mixed. In other words, a mixture of ast.AST-based
    and typed_ast.ast3-based nodes will be unparsed.
    """
    sink = cStringIO()
    Unparser(tree, file=sink)
    return sink.getvalue()
def dump(
        tree: t.Union[ast.AST, typed_ast.ast3.AST], annotate_fields: bool=True,
        include_attributes: bool=False) -> str:
    """Behave just like astunparse.dump(tree), but handle typed_ast.ast3-based trees."""
    sink = cStringIO()
    printer = Printer(file=sink, annotate_fields=annotate_fields,
                      include_attributes=include_attributes)
    printer.visit(tree)
    return sink.getvalue()
def __call__(self, environ, start_response):
    """WSGI debug middleware: delegate to the wrapped app; on any
    exception, render a debug page with the traceback and environment."""
    try:
        return self.app(environ, start_response)
    except Exception as exc:
        # get a formatted exception
        out = StringIO()
        print_exc(file=out)
        LOG.exception(exc)

        # get formatted WSGI environment
        formatted_environ = pformat(environ)

        # render our template
        result = debug_template.render(
            traceback=out.getvalue(),
            environment=formatted_environ
        )

        # construct and return our response; preserve the original HTTP
        # status when the exception carries one, otherwise use 500.
        response = Response()
        if isinstance(exc, HTTPException):
            response.status_int = exc.status
        else:
            response.status_int = 500
        response.unicode_body = result
        return response(environ, start_response)
def setUp(self):
    """Create a temp destination directory and silence stdout for the test."""
    super(TestScaffoldUtils, self).setUp()
    self.scaffold_destination = tempfile.mkdtemp()
    # Save the real stdout, then swap in a capture buffer.
    self.out, sys.stdout = sys.stdout, StringIO()
def test_destination_directory_levels_deep(self):
    """copy_dir should create nested destination directories as needed."""
    from pecan.scaffolds import copy_dir
    out = StringIO()
    src = ('pecan', os.path.join('tests', 'scaffold_fixtures', 'simple'))
    dest = os.path.join(self.scaffold_destination, 'some', 'app')
    copy_dir(src, dest, {}, out_=out)

    assert os.path.isfile(os.path.join(dest, 'foo'))
    assert os.path.isfile(os.path.join(dest, 'bar', 'spam.txt'))
    with open(os.path.join(dest, 'foo'), 'r') as fh:
        assert fh.read().strip() == 'YAR'
    with open(os.path.join(dest, 'bar', 'spam.txt'), 'r') as fh:
        assert fh.read().strip() == 'Pecan'
def test_destination_directory_already_exists(self):
    """Copying onto an existing directory should report 'already exists'."""
    from pecan.scaffolds import copy_dir
    out = StringIO()
    src = ('pecan', os.path.join('tests', 'scaffold_fixtures', 'simple'))
    copy_dir(src, os.path.join(self.scaffold_destination), {}, out_=out)
    assert 'already exists' in out.getvalue()
def test_copy_dir_with_file_content_substitution(self):
    """copy_dir should substitute template variables into copied files."""
    from pecan.scaffolds import copy_dir
    src = ('pecan', os.path.join('tests', 'scaffold_fixtures', 'content_sub'),)
    dest = os.path.join(self.scaffold_destination, 'someapp')
    copy_dir(src, dest, {'package': 'thingy'}, out_=StringIO())

    assert os.path.isfile(os.path.join(dest, 'foo'))
    assert os.path.isfile(os.path.join(dest, 'bar', 'spam.txt'))
    with open(os.path.join(dest, 'foo'), 'r') as fh:
        assert fh.read().strip() == 'YAR thingy'
    with open(os.path.join(dest, 'bar', 'spam.txt'), 'r') as fh:
        assert fh.read().strip() == 'Pecan thingy'
def test_logging_setup(self):
    """A logger configured via make_app's dict should write to its stream."""
    class RootController(object):
        @expose()
        def index(self):
            import logging
            logging.getLogger('pecantesting').info('HELLO WORLD')
            return "HELLO WORLD"

    stream = StringIO()
    log_config = {
        'loggers': {
            'pecantesting': {'level': 'INFO', 'handlers': ['memory']}
        },
        'handlers': {
            'memory': {
                'level': 'INFO',
                'class': 'logging.StreamHandler',
                'stream': stream
            }
        }
    }
    app = TestApp(make_app(RootController(), logging=log_config))
    app.get('/')
    assert stream.getvalue() == 'HELLO WORLD\n'
def test_logging_setup_with_config_obj(self):
    """Same as test_logging_setup, but passing a pecan Config object."""
    class RootController(object):
        @expose()
        def index(self):
            import logging
            logging.getLogger('pecantesting').info('HELLO WORLD')
            return "HELLO WORLD"

    stream = StringIO()
    from pecan.configuration import conf_from_dict
    log_config = conf_from_dict({
        'loggers': {
            'pecantesting': {'level': 'INFO', 'handlers': ['memory']}
        },
        'handlers': {
            'memory': {
                'level': 'INFO',
                'class': 'logging.StreamHandler',
                'stream': stream
            }
        }
    })
    app = TestApp(make_app(RootController(), logging=log_config))
    app.get('/')
    assert stream.getvalue() == 'HELLO WORLD\n'
def test_basic_single_default_hook(self):
    """RequestViewerHook with defaults should log every standard field.

    Fix: the original asserted 'method' in out twice; deduplicated.
    """
    _stdout = StringIO()

    class RootController(object):
        @expose()
        def index(self):
            return 'Hello, World!'

    app = TestApp(
        make_app(
            RootController(),
            hooks=lambda: [RequestViewerHook(writer=_stdout)]
        )
    )
    response = app.get('/')
    out = _stdout.getvalue()

    assert response.status_int == 200
    assert response.body == b_('Hello, World!')
    for item in ('path', 'method', 'status', 'params', 'hooks'):
        assert item in out
    assert '200 OK' in out
    assert "['RequestViewerHook']" in out
    assert '/' in out
def test_bad_response_from_app(self):
    """When exceptions are raised the hook deals with them properly.

    Fix: the original asserted 'method' in out twice; deduplicated.
    """
    _stdout = StringIO()

    class RootController(object):
        @expose()
        def index(self):
            return 'Hello, World!'

    app = TestApp(
        make_app(
            RootController(),
            hooks=lambda: [RequestViewerHook(writer=_stdout)]
        )
    )
    response = app.get('/404', expect_errors=True)
    out = _stdout.getvalue()

    assert response.status_int == 404
    for item in ('path', 'method', 'status', 'params', 'hooks'):
        assert item in out
    assert '404 Not Found' in out
    assert "['RequestViewerHook']" in out
    assert '/' in out
def test_single_blacklist_item(self):
    """A blacklisted path must produce no hook output at all."""
    _stdout = StringIO()

    class RootController(object):
        @expose()
        def index(self):
            return 'Hello, World!'

    hook = RequestViewerHook(config={'blacklist': ['/']}, writer=_stdout)
    app = TestApp(make_app(RootController(), hooks=lambda: [hook]))
    response = app.get('/')

    assert response.status_int == 200
    assert response.body == b_('Hello, World!')
    assert _stdout.getvalue() == ''
def test_item_not_in_defaults(self):
    """Only the configured 'items' should be logged; defaults must be absent.

    Fix: the original asserted 'method' not in out twice; deduplicated.
    """
    _stdout = StringIO()

    class RootController(object):
        @expose()
        def index(self):
            return 'Hello, World!'

    app = TestApp(
        make_app(
            RootController(),
            hooks=lambda: [
                RequestViewerHook(config={'items': ['date']}, writer=_stdout)
            ]
        )
    )
    response = app.get('/')
    out = _stdout.getvalue()

    assert response.status_int == 200
    assert response.body == b_('Hello, World!')
    assert 'date' in out
    for absent in ('method', 'status', 'params', 'hooks',
                   '200 OK', "['RequestViewerHook']", '/'):
        assert absent not in out
def test_print_version(self):
    """print_version writes 'Version <x>' plus newline to stdout."""
    captured = StringIO()
    sys.stdout = captured
    awsumepy.print_version()
    sys.stdout = sys.__stdout__
    expected = 'Version ' + awsumepy.__version__ + '\n'
    self.assertEqual(captured.getvalue(), expected)
def test_handle_profiles(self, mock_validate_profiles, mock_is_role_profile, mock_requires_mfa):
    """handle_profiles should validate once per call across all four
    MFA/role combinations.

    Fix: sys.stdout was redirected but never restored, leaking the
    redirection into subsequent tests; now restored in a finally block.
    """
    config = collections.OrderedDict()
    credentials = collections.OrderedDict()
    arguments = argparse.Namespace()
    arguments.profile_name = 'profile-admin'
    mock_requires_mfa.return_value = False
    mock_is_role_profile.return_value = False
    capturedOut = StringIO()
    sys.stdout = capturedOut
    try:
        mock_set_data = mock.Mock()
        mock_out_data = mock.Mock()
        mock_out_data.set_data = mock_set_data
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)
        mock_set_data.assert_called_with('Awsume None None None None profile-admin')
        mock_is_role_profile.return_value = True
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)
        mock_requires_mfa.return_value = True
        mock_is_role_profile.return_value = False
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)
        mock_is_role_profile.return_value = True
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)
        self.assertEqual(mock_validate_profiles.call_count, 4)
    finally:
        sys.stdout = sys.__stdout__
def test_print_formatted_data(self):
    """print_formatted_data should emit something on stderr."""
    captured = StringIO()
    sys.stderr = captured
    awsumepy.print_formatted_data('some-formatted-data')
    sys.stderr = sys.__stderr__
    self.assertNotEqual(captured.getvalue(), '')
def test_inp_inp_conn_no_src(self):
    """Disabled test for input-to-input connections with no source."""
    raise unittest.SkipTest("no setup testing yet")
    # NOTE(review): everything below is intentionally unreachable until the
    # SkipTest above is removed; kept as the test's eventual body.
    self.p.model.connect('G3.G4.C3.x', 'G3.G4.C4.x')
    stream = cStringIO()
    self.p.setup(out_stream=stream)
    self.p['G3.G4.C3.x'] = 999.
    self.assertEqual(self.p.model.G3.G4.C3._inputs['x'], 999.)
    self.assertEqual(self.p.model.G3.G4.C4._inputs['x'], 999.)
    content = stream.getvalue()
    self.assertTrue("The following parameters have no associated unknowns:\nG1.G2.C1.x\nG3.G4.C3.x\nG3.G4.C4.x" in content)
    self.assertTrue("The following components have no connections:\nG1.G2.C1\nG1.G2.C2\nG3.G4.C3\nG3.G4.C4\n" in content)
    self.assertTrue("No recorders have been specified, so no data will be saved." in content)
def test_list_explicit_outputs(self):
    """list_outputs(implicit=False) should report only explicit outputs."""
    self.prob.run_model()

    sink = cStringIO()
    listed = self.prob.model.list_outputs(implicit=False, out_stream=sink)
    self.assertEqual(sorted(listed), [
        ('comp1.a', [1.]),
        ('comp1.b', [-4.]),
        ('comp1.c', [3.])
    ])

    report = sink.getvalue()
    self.assertEqual(report.count('comp1.'), 3)
    self.assertEqual(report.count('value:'), 3)
    self.assertEqual(report.count('residual:'), 3)
def test_list_implicit_outputs(self):
    """list_outputs(explicit=False) should report only implicit states."""
    self.prob.run_model()

    sink = cStringIO()
    listed = self.prob.model.list_outputs(explicit=False, out_stream=sink)
    self.assertEqual(sorted(listed), [
        ('comp2.x', [3.]),
        ('comp3.x', [3.])
    ])

    report = sink.getvalue()
    self.assertEqual(report.count('comp2.x'), 1)
    self.assertEqual(report.count('comp3.x'), 1)
    self.assertEqual(report.count('value:'), 2)
    self.assertEqual(report.count('residual:'), 2)
def test_linesearch_vector_bound_enforcement(self):
    """BoundsEnforceLS in 'vector' mode: the state clips to its bounds and
    a warning is printed when the upper bound is exceeded."""
    top = self.top

    ls = top.model.nonlinear_solver.linesearch = BoundsEnforceLS(bound_enforcement='vector')
    ls.options['print_bound_enforce'] = True

    # Setup again because we assigned a new linesearch
    top.setup(check=False)

    # Test lower bounds: should go to the lower bound and stall
    top['px.x'] = 2.0
    top['comp.y'] = 0.
    top['comp.z'] = 1.6
    top.run_model()

    for ind in range(3):
        assert_rel_error(self, top['comp.z'][ind], [1.5], 1e-8)

    # Test upper bounds: should go to the minimum upper bound and stall
    top['px.x'] = 0.5
    top['comp.y'] = 0.
    top['comp.z'] = 2.4

    # Capture stdout so the bound-enforcement message can be asserted on.
    stdout = sys.stdout
    strout = StringIO()
    sys.stdout = strout
    try:
        top.run_model()
    finally:
        sys.stdout = stdout

    txt = strout.getvalue()
    self.assertTrue("'comp.z' exceeds upper bound" in txt)

    for ind in range(3):
        assert_rel_error(self, top['comp.z'][ind], [2.5], 1e-8)
def run_model(prob):
    """Call `run_model` on *prob* and return everything it printed to stdout."""
    captured = StringIO()
    saved = sys.stdout
    sys.stdout = captured
    try:
        prob.run_model()
    finally:
        # Always restore the real stdout, even if the model raises.
        sys.stdout = saved
    return captured.getvalue()
# NOTE(review): this test compares against a large triple-quoted citation
# string whose original line breaks were destroyed when the file was
# flattened; the body is left byte-identical because the exact literal
# cannot be reconstructed safely. Restore formatting from the upstream
# OpenMDAO test suite before editing.
def test_find_cite_with_write(self): p = self.prob p.setup() dest = StringIO() find_citations(p, out_stream=dest) expected = """Class: <class 'openmdao.core.problem.Problem'> @inproceedings{2014_openmdao_derivs, Author = {Justin S. Gray and Tristan A. Hearn and Kenneth T. Moore and John Hwang and Joaquim Martins and Andrew Ning}, Booktitle = {15th AIAA/ISSMO Multidisciplinary Analysis and Optimization Conference}, Doi = {doi:10.2514/6.2014-2042}, Month = {2014/07/08}, Publisher = {American Institute of Aeronautics and Astronautics}, Title = {Automatic Evaluation of Multidisciplinary Derivatives Using a Graph-Based Problem Formulation in OpenMDAO}, Year = {2014} } Class: <class 'openmdao.core.group.Group'> foobar model Class: <class 'openmdao.solvers.nonlinear.nonlinear_runonce.NonlinearRunOnce'> foobar nonlinear_solver Class: <class 'openmdao.solvers.linear.linear_runonce.LinearRunOnce'> foobar linear_solver Class: <class 'openmdao.components.exec_comp.ExecComp'> foobar exec comp""" self.assertEqual(expected, dest.getvalue().strip())
def publisher_0(self, *tokens):
    """Returns a pkg(7) information datastream based on the repository
    configuration's publisher information."""
    prefix = self._get_req_pub()
    pubs = [pub for pub in self.repo.get_publishers()
            if not prefix or pub.prefix == prefix]
    if prefix and not pubs:
        # Publisher specified in request is unknown.
        err = srepo.RepositoryUnknownPublisher(prefix)
        cherrypy.log("Request failed: {0}".format(str(err)))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(err))

    buf = cStringIO()
    try:
        p5i.write(buf, pubs)
    except Exception as e:
        # Treat any remaining error as a 404, but log it and
        # include the real failure information.
        cherrypy.log("Request failed: {0}".format(str(e)))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
    buf.seek(0)
    self.__set_response_expires("publisher", 86400*365, 86400*365)
    # Page handlers MUST return bytes.
    return misc.force_bytes(buf.getvalue())
def publisher_1(self, *tokens):
    """Returns a pkg(7) information datastream based on the the request's
    publisher or all if not specified."""
    prefix = self._get_req_pub()
    pubs = []
    if not prefix:
        pubs = self.repo.get_publishers()
    else:
        try:
            pubs.append(self.repo.get_publisher(prefix))
        except Exception as e:
            # If the Publisher object creation fails, return
            # a not found error to the client so it will
            # treat it as an unsupported operation.
            cherrypy.log("Request failed: {0}".format(str(e)))
            raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))

    buf = cStringIO()
    try:
        p5i.write(buf, pubs)
    except Exception as e:
        # Treat any remaining error as a 404, but log it and
        # include the real failure information.
        cherrypy.log("Request failed: {0}".format(str(e)))
        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
    buf.seek(0)
    self.__set_response_expires("publisher", 86400*365, 86400*365)
    return buf.getvalue()
def _parse_html_error(content):
    """Parse a html document that contains error information. Return the
    html as a plain text string."""
    if not content:
        return None
    from xml.dom.minidom import Document, parse
    dom = parse(cStringIO(content))
    if not isinstance(dom, Document):
        # Assume the output was the message.
        return content
    # XXX this is specific to the depot server's current error output style.
    msg = ""
    for p in dom.getElementsByTagName("p"):
        for child in p.childNodes:
            if child.nodeType == child.TEXT_NODE and child.nodeValue is not None:
                msg += "\n{0}".format(child.nodeValue)
    return msg
def get_versions(self, header=None, ccancel=None):
    """Query the repo for versions information.
    Returns a file-like object."""
    vops = {
        "abandon": ["0"],
        "add": ["0"],
        "admin": ["0"],
        "append": ["0"],
        "catalog": ["1"],
        "close": ["0"],
        "file": ["0", "1"],
        "manifest": ["0", "1"],
        "open": ["0"],
        "publisher": ["0", "1"],
        "search": ["1"],
        "status": ["0"],
        "versions": ["0"],
    }
    buf = cStringIO()
    buf.write("pkg-server {0}\n".format(pkg.VERSION))
    lines = ["{0} {1}".format(op, " ".join(vers))
             for op, vers in six.iteritems(vops)]
    buf.write("\n".join(lines) + "\n")
    buf.seek(0)
    self.__stats.record_tx()
    return buf
def intercept_log_messages():
    """Context-manager generator: attach a StringIO stream handler to the
    'nova' logger, yield the stream, and detach the handler on exit.

    Fix: handler setup moved out of the try block -- previously, if any
    setup line raised before `handler` was bound, the finally clause
    raised NameError on `handler` and masked the real error.
    """
    mylog = logging.getLogger('nova')
    stream = cStringIO()
    handler = logging.logging.StreamHandler(stream)
    handler.setFormatter(formatters.ContextFormatter())
    mylog.logger.addHandler(handler)
    try:
        yield stream
    finally:
        mylog.logger.removeHandler(handler)
def _check_error(self, args, code, expected, **kw):
    """Run shell.main(args), asserting it exits with *code* and that
    *expected* appears in the captured stderr."""
    original = sys.stderr
    try:
        captured = cStringIO()
        sys.stderr = captured
        try:
            shell.main(args, **kw)
        except SystemExit as e:
            self.assertEqual(code, e.args[0])
        else:
            self.fail('No exception raised')
    finally:
        # Restore stderr no matter what happened above.
        sys.stderr = original
    text = captured.getvalue()
    self.assertTrue(expected in text,
                    '%r not in:\n"""\n%s\n"""' % (expected, text))