我们从 Python 开源项目中,提取了以下 49 个代码示例,用于说明如何使用 pprint.pformat()。
def pre_prepare_second_call(**kwargs):
    """
    Called before an issued prepare, but *after* the first pre_prepare()
    hook identified above.

    Returning False here skips the preparation entirely.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_prep()\n{kwargs}'.format(kwargs=details))

# Below is an example of how you can tag 1 function to 2 different
# kinds of hooks:
def take_action(self, parsed_args):
    """Fetch one session by id and return (columns, data) for display.

    Raises ApiClientException when the session does not exist.
    """
    session = self.app.client.sessions.get(parsed_args.session_id)
    if not session:
        raise exceptions.ApiClientException('Session not found')
    column = ('Session ID', 'Description', 'Status', 'Jobs')
    data = (
        session.get('session_id'),
        session.get('description'),
        session.get('status'),
        # jobs can be an arbitrary nested structure; pretty-print it
        pprint.pformat(session.get('jobs')),
    )
    return column, data
def _cached_roles(search):
    """
    Return the cached roles in a convenient structure.

    Trust the cached values from the master pillar since a downed minion
    will be absent from any dynamic query.  Also, do not worry about
    downed minions that are outside of the search criteria.
    """
    pillar_util = salt.utils.master.MasterPillarUtil(search, "compound",
                                                     use_cached_grains=True,
                                                     grains_fallback=False,
                                                     opts=__opts__)
    roles = {}
    # invert minion -> roles into role -> [minions]
    for minion, pillar in pillar_util.get_minion_pillar().items():
        for role in pillar.get('roles', []):
            roles.setdefault(role, []).append(minion)
    log.debug(pprint.pformat(roles))
    return roles.keys()
def _run(cmd):
    """
    Run *cmd* through the shell and return (returncode, stdout, stderr).

    NOTE: Taken from osd.py module.

    Fixes over the previous version:
    - ``_stderr`` was read from ``proc.stdout`` (wrong pipe), so stderr was
      always lost.
    - the extra ``proc.stdout.read()``/``proc.stderr.read()`` debug calls
      re-read already-exhausted pipes and always logged empty strings.
    - ``wait()`` before reading PIPEs can deadlock on large output;
      ``communicate()`` reads both streams fully and waits safely.
    """
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    _stdout, _stderr = proc.communicate()
    _stdout = _stdout.rstrip()
    _stderr = _stderr.rstrip()
    log.debug("return code: {}".format(proc.returncode))
    log.debug(_stdout)
    log.debug(_stderr)
    return proc.returncode, _stdout, _stderr
def __init__(self, _id, **kwargs):
    """
    Initialize settings, connect to Ceph cluster.

    Args:
        _id: OSD id, used in the weight filename.
        **kwargs: overrides for any default setting below.

    Raises:
        RuntimeError: when the cluster connection fails.
    """
    self.osd_id = _id
    self.settings = {
        'conf': "/etc/ceph/ceph.conf",
        # BUG FIX: previously '.format(id)' interpolated the *builtin*
        # ``id`` function into the path instead of the ``_id`` argument.
        'filename': '/var/run/ceph/osd.{}-weight'.format(_id),
        'timeout': 60,
        'keyring': '/etc/ceph/ceph.client.admin.keyring',
        'client': 'client.admin',
        'delay': 6
    }
    self.settings.update(kwargs)
    log.debug("settings: {}".format(pprint.pformat(self.settings)))
    self.cluster = rados.Rados(conffile=self.settings['conf'],
                               conf=dict(keyring=self.settings['keyring']),
                               name=self.settings['client'])
    try:
        self.cluster.connect()
    except Exception as error:
        raise RuntimeError("connection error: {}".format(error))
def readlink(device, follow=True):
    """
    Return the short name for a symlink device.

    NOTE(review): the device string is interpolated into a shell command
    (shell=True) — assumes trusted input; confirm against callers.
    """
    option = '-f' if follow else ''
    cmd = "readlink {} {}".format(option, device)
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    proc.wait()
    result = proc.stdout.read().rstrip()
    log.debug(pprint.pformat(result))
    log.debug(pprint.pformat(proc.stderr.read()))
    return result

# pylint: disable=too-many-instance-attributes
def __init__(self, device, **kwargs):
    """
    Set attributes for an OSD.

    Each attribute is computed by a dedicated set_* helper defined
    elsewhere in this class; the assignment order below is preserved
    as-is since later helpers may read earlier attributes.
    """
    # resolve symlinks (e.g. /dev/disk/by-id/...) to the short device name
    self.device = readlink(device)
    # top-level identifier
    self.tli = self._set_tli()
    self.capacity = self.set_capacity()
    self.size = self.set_bytes()
    self.small = self._set_small()
    self.disk_format = self.set_format()
    self.journal = self.set_journal()
    self.journal_size = self.set_journal_size()
    self.wal_size = self.set_wal_size()
    self.wal = self.set_wal()
    self.db_size = self.set_db_size()
    # pylint: disable=invalid-name
    self.db = self.set_db()
    # default for encryption can be retrieved from the global pillar
    self.encryption = self.set_encryption()
    self.types = self.set_types()
    log.debug("OSD config: \n{}".format(pprint.pformat(vars(self))))
def set_journal(self, default=False):
    """
    Return the journal device, if defined; *default* otherwise.
    """
    if self._config_version() == OSDConfig.V1:
        struct = self._convert_data_journals(__pillar__['storage']['data+journals'])
        log.debug("struct: \n{}".format(pformat(struct)))
        if self.device in struct:
            return struct[self.device]
        log.info("No journal specified for {}".format(self.device))
    if self._config_version() == OSDConfig.V2:
        entry = self.tli[self.device] if self.device in self.tli else None
        if entry and 'journal' in entry:
            return entry['journal']
        log.info("No journal specified for {}".format(self.device))
    return default

# pylint: disable=no-self-use
def _fsck(device, _partition):
    """
    Check filesystem on partition.

    Note: xfs_repair returns immediately on success, but takes 3m39s to
    fail on some broken filesystems.  Not good for automation, hence
    xfs_admin -u (print the UUID) is used as a cheap health probe.
    """
    # NVMe partitions are named e.g. nvme0n1p1 — insert the 'p' separator
    prefix = 'p' if 'nvme' in device else ''
    cmd = "/usr/sbin/xfs_admin -u {}{}{}".format(device, prefix, _partition)
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    proc.wait()
    log.debug(pprint.pformat(proc.stdout.read()))
    log.debug(pprint.pformat(proc.stderr.read()))
    log.debug("xfs_admin: {}".format(proc.returncode))
    return proc.returncode == 0
def link_internal(data, fields=('name', 'product', 'location', 'unit')):
    """Link internal exchanges by ``fields``.

    Creates the ``input`` field in newly-linked exchanges.

    Raises:
        ValueError: on an unlinked biosphere exchange.
        KeyError: when no producing activity matches an exchange.
    """
    # E731: named def instead of a lambda assignment
    def get_tuple(exc):
        return tuple(exc[f] for f in fields)

    # NOTE(review): the old ``input_databases = get_input_databases(data)``
    # binding was never used and has been dropped — confirm the helper call
    # had no intended side effect.
    products = {
        get_tuple(reference_product(ds)): (ds['database'], ds['code'])
        for ds in data
    }
    for ds in data:
        for exc in ds['exchanges']:
            if exc.get('input'):
                continue
            if exc['type'] == 'biosphere':
                raise ValueError("Unlinked biosphere exchange:\n{}".format(pformat(exc)))
            try:
                exc['input'] = products[get_tuple(exc)]
            except KeyError as err:
                # chain so the original lookup failure stays visible
                raise KeyError("Can't find linking activity for exchange:\n{}".format(pformat(exc))) from err
    return data
def meld(got, expected):
    """Dump *got*/*expected* to files named after the calling test and
    open them in the ``meld`` diff viewer.  No-op when they are equal."""
    if got == expected:
        return
    import inspect
    import os
    import subprocess
    from os import path
    from pprint import pformat

    # name the dump directory after the caller (the test function)
    frames = inspect.getouterframes(inspect.currentframe(), 2)
    test_name = frames[1][3]
    os.makedirs(test_name, exist_ok=True)
    got_fn = path.join(test_name, 'got')
    expected_fn = path.join(test_name, 'expected')
    with open(got_fn, 'w') as got_f, open(expected_fn, 'w') as expected_f:
        got_f.write(pformat(got))
        expected_f.write(pformat(expected))
    subprocess.run(['meld', got_fn, expected_fn])
def compare_with_expected_file(test, dirpath, results, basename=None):
    """Pretty-print *results*, save them under *dirpath*, and compare
    against the stored expected file using *test*'s multiline assertion."""
    formatted = pprint.pformat(results, width=120)
    if basename:
        results_fn = basename + '.results'
        expected_fn = basename + '.expected'
    else:
        results_fn, expected_fn = 'results', 'expected'
    # save results in a file
    with open(os.path.join(dirpath, results_fn), 'w') as out:
        print(formatted, file=out)
    # read expected from a file
    with open(os.path.join(dirpath, expected_fn)) as inp:
        expected = inp.read().rstrip()
    test.assertMultiLineEqual(expected, formatted)
def _compare_eq_iterable(left, right, verbose=False):
    """Build a human-readable full diff of two iterables (verbose mode only)."""
    if not verbose:
        return [u('Use -v to get the full diff')]
    # dynamic import to speed up pytest startup
    import difflib

    try:
        left_lines = pprint.pformat(left).splitlines()
        right_lines = pprint.pformat(right).splitlines()
        explanation = [u('Full diff:')]
    except Exception:
        # PrettyPrinter.pformat() on python 2 can fail on containers whose
        # items can't be sorted() (issue #718); fall back to per-item repr().
        left_lines = sorted(repr(x) for x in left)
        right_lines = sorted(repr(x) for x in right)
        explanation = [u('Full diff (fallback to calling repr on each item):')]
    explanation.extend(
        line.strip() for line in difflib.ndiff(left_lines, right_lines))
    return explanation
def on_open(self):
    """
    Once the connection is made, start the query off and start an
    event loop to wait for a signal to stop.  Results are yielded
    within receive().
    """
    def event_loop():
        logger.debug(pformat(self.query.request))
        self.send(json.dumps(self.query.request))
        # block until stop is signalled, waking at the socket timeout
        while not self.event.is_set():
            self.event.wait(self.gettimeout())
        logger.debug('Event loop terminating.')

    self.thread = threading.Thread(target=event_loop)
    # setDaemon() is deprecated; assign the attribute directly
    self.thread.daemon = True
    self.thread.start()
def get_wings_on_edge(self, pos1, side1_name, side2_name):
    """Return only the wings of *pos1* that sit on the edge between the
    two named sides, accepting either orientation."""
    wanted = {(side1_name, side2_name), (side2_name, side1_name)}
    kept = []
    for (wing_pos1, wing_pos2) in self.get_wings(pos1):
        name_a = self.get_side_for_index(wing_pos1).name
        name_b = self.get_side_for_index(wing_pos2).name
        if (name_a, name_b) in wanted:
            kept.append((wing_pos1, wing_pos2))
    return kept
def __repr__(self):
    """Value followed by the pretty-printed context."""
    context_text = pprint.pformat(self.context)
    return '{} {}'.format(self.value, context_text)
def context_repr(self):
    """Pretty-printed context, or the empty string when there is none."""
    if not self.context:
        return ''
    return pprint.pformat(self.context)
def populate(self):
    """Rebuild the list widget from the controller library on disk."""
    # This function will be used to populate the UI. Shocking. I know.
    # First lets clear all the items that are in the list to start fresh
    self.listWidget.clear()
    # Then we ask our library to find everything again in case things changed
    self.library.find()
    # Now we iterate through the dictionary
    # This is why I based our library on a dictionary, because it gives us all the nice tricks a dictionary has
    for name, info in self.library.items():
        # We create an item for the list widget and tell it to have our controller name as a label
        item = QtWidgets.QListWidgetItem(name)
        # We set its tooltip to be the info from the json
        # The pprint.pformat will format our dictionary nicely
        item.setToolTip(pprint.pformat(info))
        # Finally we check if there's a screenshot available
        screenshot = info.get('screenshot')
        # If there is, then we will load it
        if screenshot:
            # So first we make an icon with the path to our screenshot
            icon = QtGui.QIcon(screenshot)
            # then we set the icon onto our item
            item.setIcon(icon)
        # Finally we add our item to the list
        self.listWidget.addItem(item)

# This is a convenience function to display our UI
def pre_prepare(**kwargs):
    """
    Called before an issued prepare.

    Returning False here skips the preparation entirely.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_prep()\n{kwargs}'.format(kwargs=details))

# Below is an example of how you can define a function name that differs
# from the actual hook you want to tie it to
def pre_and_pre_prepare_call(**kwargs):
    """
    A demo of how you can hook-tag one function twice.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_and_post_prep()\n{kwargs}'.format(kwargs=details))
def post_prepare(**kwargs):
    """
    Called after an issued prepare completes.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_prep()\n{kwargs}'.format(kwargs=details))
def pre_stage(**kwargs):
    """
    Called before an issued stage.

    Returning False here skips the staging entirely.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_stage()\n{kwargs}'.format(kwargs=details))
def post_staged_segment(**kwargs):
    """
    Called after a segment has been staged for posting.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_staged_segment()\n{kwargs}'.format(kwargs=details))
def post_staged_nzb(**kwargs):
    """
    Called after an NZB-File has been staged for saving.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_staged_nzb()\n{kwargs}'.format(kwargs=details))
def post_stage(session, **kwargs):
    """
    Called after an issued stage.

    NOTE(review): the original docstring said "prior to", but the name and
    the log tag both say post — documented as "after"; confirm with caller.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_stage()\n{kwargs}'.format(kwargs=details))
def pre_upload(**kwargs):
    """
    Called before an issued upload.

    Returning False here skips the upload entirely.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_upload()\n{kwargs}'.format(kwargs=details))
def upload_article(**kwargs):
    """
    Called before an actual article upload.

    Returning False from this function *will* prevent the article from
    being uploaded.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK upload_article()\n{kwargs}'.format(kwargs=details))
def pre_verify(**kwargs):
    """
    Called before running a verification check.

    Returning False here skips the verify entirely.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_verify()\n{kwargs}'.format(kwargs=details))
def post_verify(**kwargs):
    """
    Called after running a verification check.
    """
    # Fixed copy-paste bug: the log tag previously read 'pre_upload()',
    # making these lines indistinguishable from the real pre_upload hook.
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_verify()\n{kwargs}'.format(kwargs=details))
def pre_clean(**kwargs):
    """
    Called before running a cleanup.

    Returning False here skips the clean-up entirely.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_clean()\n{kwargs}'.format(kwargs=details))
def post_clean(**kwargs):
    """
    Called after running a cleanup.
    """
    # Fixed copy-paste bug: the log tag previously read 'pre_upload()',
    # making these lines indistinguishable from the real pre_upload hook.
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_clean()\n{kwargs}'.format(kwargs=details))
def configure_linebot_app(app):
    """Register the LINE-bot webhook route and a per-request DB commit
    hook on the given Flask *app*."""
    @app.after_request
    def commit_database(response):
        # commit once per request so individual handlers need not manage
        # the transaction themselves
        db.commit()
        return response

    @app.route("/api/line_webhook", methods=["POST"])
    def line_webhook():
        # LINE signs every webhook delivery; the SDK verifies it below
        signature = request.headers['X-Line-Signature']
        body = request.get_data(as_text=True)
        logger.debug(f'Incoming message:\n{pformat(body)}')
        try:
            line_webhook_handler.handle(body, signature)
        except InvalidSignatureError:
            # forged or replayed request — reject as a client error
            logger.warning('Message with an invalid signature received')
            abort(400)
        except LineBotApiError as e:
            logger.error(f'{e}\nDetails:\n{pformat(e.error.details)}')
            abort(500)
        except Exception as e:
            # top-level boundary: log and convert to a 500 for LINE's retry
            logger.error(f'Uncaught error: {e}')
            abort(500)
        return "OK"
def do_lookup(icao_identifier, date):
    """Look up sunset/twilight times for an airport on a date.

    Args:
        icao_identifier: airport ICAO code.
        date: a ``datetime.date``.

    Returns:
        dict with airport, name, date and formatted times; in dev mode the
        raw airport/USNO payloads are included for debugging.
    """
    ln = LoggingNight(icao_identifier, date, try_cache=True)
    time_format = '%H%M Zulu' if ln.in_zulu else '%I:%M %p'
    # build the common payload once instead of duplicating the dict literal
    result = dict(
        airport=icao_identifier,
        name=ln.name,
        date=date.isoformat(),
        sunset=ln.sun_set.strftime(time_format),
        end_civil=ln.end_civil_twilight.strftime(time_format),
        one_hour=ln.hour_after_sunset.strftime(time_format),
    )
    if dev_mode == "true":
        # debug payloads only in dev mode to keep production responses lean
        result['airport_debug'] = pprint.pformat(ln.airport, indent=4)
        result['usno_debug'] = pprint.pformat(ln.usno, indent=4)
    return result
def buildFacade(schema):
    """Build a dynamic Facade subclass of ``Type`` for *schema*, plus the
    equivalent source text (used for code generation).

    Returns:
        (cls, source): the runtime class and its rendered source string.
    """
    cls = type(schema.name, (Type,), dict(name=schema.name,
                                          version=schema.version,
                                          schema=schema))
    # NOTE(review): the template layout below was reconstructed — the class
    # body must be indented four spaces (matching the textwrap.indent
    # prefix) for the rendered source to be valid Python; confirm against
    # the upstream file.
    source = """
class {name}Facade(Type):
    name = '{name}'
    version = {version}
    schema = {schema}
""".format(name=schema.name,
           version=schema.version,
           schema=textwrap.indent(pprint.pformat(schema), "    "))
    return cls, source
def _format_unequal_keys(dicts):
    """Pretty-print each dict's sorted key list, for mismatch messages."""
    key_lists = [sorted(d) for d in dicts]
    return pformat(key_lists)
def assertDictEqual(self, d1, d2, msg=None):
    """Fail with a readable line-by-line ndiff when two dicts differ."""
    self.assertIsInstance(d1, dict, 'First argument is not a dictionary')
    self.assertIsInstance(d2, dict, 'Second argument is not a dictionary')
    if d1 == d2:
        return
    standardMsg = '%s != %s' % (safe_repr(d1, True), safe_repr(d2, True))
    diff_lines = difflib.ndiff(pprint.pformat(d1).splitlines(),
                               pprint.pformat(d2).splitlines())
    diff = '\n' + '\n'.join(diff_lines)
    standardMsg = self._truncateMessage(standardMsg, diff)
    self.fail(self._formatMessage(msg, standardMsg))
def dump_option_dicts(self, header=None, commands=None, indent=""):
    """Announce each command's option dict, for debugging.

    Args:
        header: optional line announced first (sub-lines are indented).
        commands: command names to dump; defaults to all known commands.
        indent: prefix prepended to every announced line.
    """
    from pprint import pformat

    if commands is None:
        # dump all command option dicts.  BUG FIX: the old code called
        # ``self.command_options.keys().sort()`` which raises
        # AttributeError on Python 3 (dict views have no .sort()).
        commands = sorted(self.command_options.keys())
    if header is not None:
        self.announce(indent + header)
        indent = indent + "  "
    if not commands:
        self.announce(indent + "no commands known yet")
        return
    for cmd_name in commands:
        opt_dict = self.command_options.get(cmd_name)
        if opt_dict is None:
            self.announce(indent + "no option dict for '%s' command" % cmd_name)
        else:
            self.announce(indent + "option dict for '%s' command:" % cmd_name)
            out = pformat(opt_dict)
            for line in out.split('\n'):
                self.announce(indent + "  " + line)

# -- Config file finding/parsing methods ---------------------------
def print_processed_results(self, counts, failures):
    """Print one 'description: count' line per possible result, then any
    failures pretty-printed."""
    for outcome, description in zip(self.possible_results,
                                    self.result_descriptions):
        print('{}: {}'.format(description, counts[outcome]))
    if failures:
        print('Failures:')
        print(pprint.pformat(failures))
def take_action(self, parsed_args):
    """Fetch one job by id and return (columns, data) for display.

    Raises ApiClientException when the job does not exist.
    """
    job = self.app.client.jobs.get(parsed_args.job_id)
    if not job:
        raise exceptions.ApiClientException('Job not found')
    schedule = job.get('job_schedule', {})
    column = ('Job ID', 'Client ID', 'User ID', 'Session ID',
              'Description', 'Actions', 'Start Date', 'End Date',
              'Interval')
    data = (
        job.get('job_id'),
        job.get('client_id'),
        job.get('user_id'),
        job.get('session_id', ''),
        job.get('description'),
        # actions can be an arbitrary nested structure; pretty-print it
        pprint.pformat(job.get('job_actions')),
        schedule.get('schedule_start_date', ''),
        schedule.get('schedule_end_date', ''),
        schedule.get('schedule_interval', ''),
    )
    return column, data
def take_action(self, parsed_args):
    """Fetch one backup by uuid and return (columns, data) for display.

    Raises ApiClientException when the backup does not exist.
    """
    backup = self.app.client.backups.get(parsed_args.backup_uuid)
    if not backup:
        raise exceptions.ApiClientException('Backup not found')
    column = ('Backup ID', 'Metadata')
    data = (
        backup.get('backup_uuid'),
        # metadata is an arbitrary structure; pretty-print it
        pprint.pformat(backup.get('backup_metadata')),
    )
    return column, data
def create_app(self):
    """Send a POST to spinnaker to create a new application with class
    variables.

    Raises:
        AssertionError: Application creation failed.
    """
    self.appinfo['accounts'] = self.get_accounts()
    self.log.debug('Pipeline Config\n%s', pformat(self.pipeline_config))
    self.log.debug('App info:\n%s', pformat(self.appinfo))
    # render the creation payload and block until the task completes
    wait_for_task(self.retrieve_template())
    self.log.info("Successfully created %s application", self.appname)
    return
def retrieve_template(self):
    """Merge instance links into the pipeline config and render the
    application data template.

    Returns:
        jsondata: the rendered template payload.
    """
    links = self.retrieve_instance_links()
    self.log.debug('Links is \n%s', pformat(links))
    self.pipeline_config['instance_links'].update(links)
    rendered = get_template(
        template_file='infrastructure/app_data.json.j2',
        appinfo=self.appinfo,
        pipeline_config=self.pipeline_config)
    self.log.debug('jsondata is %s', pformat(rendered))
    return rendered
def create_elb(self):
    """Create or Update the ELB after rendering JSON data from configs.

    Asserts that the ELB task was successful, then applies listener and
    backend policies and attributes.
    """
    elb_json = self.make_elb_json()
    LOG.debug('Block ELB JSON Data:\n%s', pformat(elb_json))
    wait_for_task(elb_json)
    self.add_listener_policy(elb_json)
    self.add_backend_policy(elb_json)
    self.configure_attributes(elb_json)
def render_wrapper(self, region='us-east-1'):
    """Generate the base Pipeline wrapper.

    This renders the non-repeatable stages in a pipeline, like jenkins,
    baking, tagging and notifications.

    Args:
        region (str): AWS Region.

    Returns:
        dict: Rendered Pipeline wrapper.
    """
    # Consistency: use the same short-circuit idiom as the sibling
    # render_wrapper implementation (semantics unchanged).
    base = self.base or self.settings['pipeline']['base']
    email = self.settings['pipeline']['notifications']['email']
    slack = self.settings['pipeline']['notifications']['slack']
    deploy_type = self.settings['pipeline']['type']
    pipeline_id = self.compare_with_existing(region=region)
    data = {
        'app': {
            'appname': self.app_name,
            'base': base,
            'deploy_type': deploy_type,
            'environment': 'packaging',
            'region': region,
            'triggerjob': self.trigger_job,
            'email': email,
            'slack': slack,
        },
        'id': pipeline_id
    }
    self.log.debug('Wrapper app data:\n%s', pformat(data))
    wrapper = get_template(template_file='pipeline/pipeline_wrapper.json.j2',
                           data=data)
    return json.loads(wrapper)
def render_wrapper(self, region='us-east-1'):
    """Generate the base Pipeline wrapper.

    This renders the non-repeatable stages in a pipeline, like jenkins,
    baking, tagging and notifications.

    Args:
        region (str): AWS Region.

    Returns:
        dict: Rendered Pipeline wrapper.
    """
    pipeline_cfg = self.settings['pipeline']
    pipeline_id = self.compare_with_existing(region=region)
    app_section = {
        'appname': self.app_name,
        'base': self.base or pipeline_cfg['base'],
        'deploy_type': pipeline_cfg['type'],
        'environment': 'packaging',
        'region': region,
        'triggerjob': self.trigger_job,
        'email': pipeline_cfg['notifications']['email'],
        'slack': pipeline_cfg['notifications']['slack'],
    }
    data = {'app': app_section, 'id': pipeline_id}
    self.log.debug('Wrapper app data:\n%s', pformat(data))
    wrapper = get_template(template_file='pipeline/pipeline_wrapper.json.j2',
                           data=data)
    return json.loads(wrapper)
def debug(*objects):
    """Log each object's pretty-printed form, newline-joined, at DEBUG."""
    formatted = (pprint.pformat(obj) for obj in objects)
    flask.current_app.logger.debug('\n'.join(formatted))