The following 50 code examples, extracted from open-source Python projects, illustrate how to use pprint.pprint().
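Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below; the data is made up for illustration) showing what pprint.pprint() does with a nested structure and its most common keyword arguments:

import pprint
import sys

# A small nested structure; pprint wraps and indents it instead of
# dumping it on one long line the way print() would.
record = {
    "id": 42,
    "tags": ["alpha", "beta", "gamma"],
    "nested": {"values": list(range(10)), "flag": True},
}

pprint.pprint(record)                        # defaults: indent=1, width=80
pprint.pprint(record, indent=2, width=40)    # narrower output, deeper indentation
pprint.pprint(record, depth=1)               # collapse nested containers to '...'
pprint.pprint(record, stream=sys.stderr)     # write to any file-like object

Many of the examples below use exactly these parameters (stream=, indent=, width=), either to dump intermediate data during debugging or to print API responses.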
def load_data(self):
    # work in the parent of the pages directory, because we
    # want the filenames to begin "pages/...".
    chdir(dirname(self.setup.pages_dir))
    rel = relpath(self.setup.pages_dir)
    for root, dirs, files in walk(rel):
        for filename in files:
            start, ext = splitext(filename)
            if ext in self.setup.data_extensions:
                #yield root, dirs, filename
                loader = self.setup.data_loaders.get(ext)
                path = join(root, filename)
                if not loader:
                    raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext))
                data_key = join(root, start)
                loaded_dict = loader.loadf(path)
                self.data[data_key] = loaded_dict
                #self.setup.log.debug("data key [%s] ->" % (data_key, ), root, filename, ); pprint.pprint(loaded_dict, sys.stdout)
                #pprint.pprint(self.data, sys.stdout)
                #print("XXXXX data:", self.data)
def loadData(self):
    # Load the toilet collection data to pandas
    collects = pd.read_sql('SELECT * FROM premodeling.toiletcollection',
                           self.conn, coerce_float=True, params=None)
    pprint.pprint(collects.keys())
    collects = collects[['ToiletID', 'ToiletExID', 'Collection_Date', 'Area', 'Feces_kg_day', 'year', 'month']]
    pprint.pprint(collects.keys())

    # Load the density data to pandas
    density = pd.read_sql('SELECT * FROM premodeling.toiletdensity',
                          self.conn, coerce_float=True, params=None)
    pprint.pprint(density.keys())

    # Return the data
    self.collects = collects
    self.density = density
    return(collects, density)
def constructGCSFilePath(fileUuid, tokenFile):
    filters = {
        "op": "=",
        "content": {
            "field": "file_id",
            "value": [fileUuid]
        }
    }
    params = {"filters": json.dumps(filters)}
    query = "?expand=cases.project"
    fileInfo = GDCDataUtils.query(tokenFile, "files", query=query, params=params).json()
    pprint.pprint(fileInfo)

    return "{project}/{strategy}/{platform}/{uuid}/{filename}".format(
        project=fileInfo["data"]["hits"][0]["cases"][0]["project"]["project_id"],
        strategy=str(fileInfo["data"]["hits"][0]["experimental_strategy"]),
        platform=str(fileInfo["data"]["hits"][0]["platform"]),
        uuid=str(fileUuid),
        filename=str(fileInfo["data"]["hits"][0]["file_name"])
    )
def pprint(self, *args, **kwargs):
    """
    Pretty-printer for parsed results as a list, using the C{pprint} module.
    Accepts additional positional or keyword args as defined for the
    C{pprint.pprint} method. (U{http://docs.python.org/3/library/pprint.html#pprint.pprint})

    Example::
        ident = Word(alphas, alphanums)
        num = Word(nums)
        func = Forward()
        term = ident | num | Group('(' + func + ')')
        func <<= ident + Group(Optional(delimitedList(term)))
        result = func.parseString("fna a,b,(fnb c,d,200),100")
        result.pprint(width=40)
    prints::
        ['fna', ['a', 'b', ['(', 'fnb', ['c', 'd', '200'], ')'], '100']]
    """
    pprint.pprint(self.asList(), *args, **kwargs)

# add support for pickle protocol
def main(argv):
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.server_url or not opts.token:
        print "Missing required param; run with --help for usage"
        sys.exit(-1)

    # build a cbapi object
    #
    cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)

    start = 0
    pagesize = 100
    while True:
        results = cb.alert_search(opts.query, rows=int(pagesize), start=start)
        if len(results['results']) == 0:
            break
        for result in results['results']:
            pprint.pprint(result)
        start = start + int(pagesize)
def pprint(to_be_printed):
    """nicely formatted print"""
    try:
        import pprint as pp
        # generate an instance PrettyPrinter
        # pp.PrettyPrinter().pprint(to_be_printed)
        pp.pprint(to_be_printed)
    except ImportError:
        if isinstance(to_be_printed, dict):
            print('{')
            for k, v in to_be_printed.items():
                print("'" + k + "'" if str(k) == k else k,
                      ': ',
                      "'" + v + "'" if str(v) == v else v,
                      sep="")
            print('}')
        else:
            print('could not import pprint module, applying regular print')
            print(to_be_printed)

# todo: this should rather be a class instance
def test(**kwargs):
    """
    Runtime test case
    """
    args = _parse_args(kwargs)
    local_client = salt.client.LocalClient()
    proposals = local_client.cmd(args['target'], 'proposal.test',
                                 expr_form='compound', kwarg=args)
    # determine which proposal to choose
    for node, proposal in proposals.items():
        _proposal = _choose_proposal(node, proposal, args)
        if _proposal:
            pprint.pprint(_proposal)
def peek(**kwargs):
    """
    Display the output to the user
    """
    args = _parse_args(kwargs)
    local_client = salt.client.LocalClient()
    proposals = local_client.cmd(args['target'], 'proposal.generate',
                                 expr_form='compound', kwarg=args)
    # determine which proposal to choose
    for node, proposal in proposals.items():
        _proposal = _choose_proposal(node, proposal, args)
        if _proposal:
            pprint.pprint(_proposal)
def _record_filter(args, base_dir):
    """
    Save the filter provided
    """
    filter_file = '{}/.filter'.format(base_dir)
    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()

    current_filter = {}
    with open(filter_file) as filehandle:
        current_filter = yaml.load(filehandle)
    if current_filter is None:
        current_filter = {}
    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing
    rec_args = {k: v for k, v in args.items()
                if k != 'target' and not k.startswith('__')}

    current_filter[args['target']] = rec_args
    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
def systemInfo():
    print("sys.platform: %s" % sys.platform)
    print("sys.version: %s" % sys.version)
    from .Qt import VERSION_INFO
    print("qt bindings: %s" % VERSION_INFO)

    global __version__
    rev = None
    if __version__ is None:  ## this code was probably checked out from bzr; look up the last-revision file
        lastRevFile = os.path.join(os.path.dirname(__file__), '..', '.bzr', 'branch', 'last-revision')
        if os.path.exists(lastRevFile):
            rev = open(lastRevFile, 'r').read().strip()

    print("pyqtgraph: %s; %s" % (__version__, rev))
    print("config:")
    import pprint
    pprint.pprint(CONFIG_OPTIONS)

## Rename orphaned .pyc files. This is *probably* safe :)
## We only do this if __version__ is None, indicating the code was probably pulled
## from the repository.
def query_api(term, location):
    """Queries the API by the input values from the user.

    Args:
        term (str): The search term to query.
        location (str): The location of the business to query.
    """
    response = search(API_KEY, term, location)

    businesses = response.get('businesses')

    if not businesses:
        print(u'No businesses for {0} in {1} found.'.format(term, location))
        return

    business_id = businesses[0]['id']

    print(u'{0} businesses found, querying business info '
          'for the top result "{1}" ...'.format(len(businesses), business_id))
    response = get_business(API_KEY, business_id)

    print(u'Result for business "{0}" found:'.format(business_id))
    pprint.pprint(response, indent=2)
def store_serp_result(self, serp, config):
    """Store the parsed SERP page.

    When called from SearchEngineScrape, a parser object is passed.
    When called from caching, a list of serp objects is given.
    """
    if self.outfile:
        data = self.row2dict(serp)
        data['results'] = []
        for link in serp.links:
            data['results'].append(self.row2dict(link))

        if self.output_format == 'json':
            self.outfile.write(data)
        elif self.output_format == 'csv':
            serp = self.row2dict(serp)
            self.outfile.write(data, serp)
        elif self.output_format == 'stdout':
            if config.get('print_results') == 'summarize':
                print(serp)
            elif config.get('print_results') == 'all':
                pprint.pprint(data)
def write_statement(vardict):
    """
    A function to write a conditional statement based on the conditions in a variable

    Args
        DICT[dict]  VARDICT     A dictionary of variable names, where the values are conditions
    Returns
        LIST[str]   Conditions  A list of condition statements
    """
    conditions = []
    for feat in vardict:
        if bool(vardict[feat]) == True:
            # Dictionary is not empty, parse the split values into a statement
            for split in vardict[feat]:
                if ((split == 'and') | (split == "or")):
                    statement = split.join(['("%s"%s%s)' % (feat, sp[0], sp[1])
                                            for sp in vardict[feat][split]])
                elif (split == 'not'):
                    statement = split + ' "%s"%s%s' % (feat,
                                                       vardict[feat][split][0],
                                                       vardict[feat][split][1])
                elif (split == 'list'):
                    statement = '"%s"' % (feat) + "=any('{%s}')" % (','.join(vardict[feat][split]))
                conditions.append('(' + statement + ')')
    pprint.pprint(conditions)
    return(conditions)
def run(parsed_args, job_type):
    start = time()
    worker_run(**vars(parsed_args))

    logger.info(u"finished update in {} seconds".format(elapsed(start)))

    # resp = None
    # if job_type in ["normal"]:
    #     my_location = PageNew.query.get(parsed_args.id)
    #     resp = my_location.__dict__
    #     pprint(resp)

    print "done"
    return

# python doi_queue.py --hybrid --filename=data/dois_juan_accuracy.csv --dynos=40 --soup
def default(self, line, *args, **kwargs):
    async def query():
        data = await self.request.query(line)
        if self.print_full_response:
            print('*' * 80)
            print('Full Response: \n')
            pprint.pprint(data)
            print('*' * 80)
            print('\n')
        table = Tabulate(data)
        print(table.draw())

    asyncio.get_event_loop().run_until_complete(query())
def get_replay():
    response = requests.get(
        "https://www.rocketleaguereplays.com/api/replays?owned",
        params={
            # "owned": ""
        }
    )

    print(response)
    print(response.url)
    pprint(response.json())

    # _id = response.json()['id']
    #
    # response = requests.post(
    #     "https://www.rocketleaguereplays.com/api/replays/{}".format(_id)
    # )
    # print(response)
    # pprint(response.json())
def process_ping(filename, ip=None, check_ssh_connectivity_only=False):
    if not os.path.isfile(filename):
        return False
    status_update('Trying to read ' + filename)
    with open(filename) as f:
        lines = f.readlines()
        pprint.pprint(lines)

    info = load_json(filename)
    if not check_ssh_connectivity_only:
        return info.get('pass', False)

    cmd_list = info['command_list']
    for cmd in cmd_list:
        m = re.search('ssh (\S+) with provided username and passwd', cmd['cmd'])
        if m:
            if ip == m.group(1):
                return cmd['pass']
    return False
def submit_requests(queue, logger, key=ARGS.key):
    """Submit queued requests to each Dashboard org contained within."""
    operations = {"add": meraki_admins.DashboardAdmins.add_admin,
                  "modify": meraki_admins.DashboardAdmins.update_admin,
                  "delete": meraki_admins.DashboardAdmins.del_admin}

    for oid, user_list in queue.items():
        submitter = meraki_admins.DashboardAdmins(oid, key)
        for user in user_list:
            operation = user.pop('operation')
            request_id = user.pop('request_id')

            if operation not in operations.keys():
                error = "Unknown operation %s" % operation
                # add new column to fail_tracker tied to original row
                logger.fail_tracker[request_id]['error'] = error
                print request_id
                print logger.fail_tracker[request_id]['error']
                print logger.fail_tracker[request_id]
                continue
            else:
                user['orgAccess'] = user.pop('orgaccess')  # requires camelcase
                # pprint.pprint(user)
                result = operations[operation](submitter, **user)
def start_containers(self, containers):
    """
    Starts all given containers.
    """
    started_containers = []
    for container in containers:
        if self.verbose:
            print "Going to start the following container:"
            pprint.pprint(container)
        try:
            self.log("Starting container '%s'..." % container['MyName'], no_nl=True)
            self.client.start(container=container['Id'])
            started_containers.append(container)
            self.log("successful")
        except docker.errors.APIError as e:
            raise DockerError("Failed to start container: %s" % e)

    return started_containers
#}}}

#{{{ Stop containers
def test(strng):
    global testnum
    print(strng)
    try:
        bnf = CORBA_IDL_BNF()
        tokens = bnf.parseString(strng)
        print("tokens = ")
        pprint.pprint(tokens.asList())
        imgname = "idlParse%02d.bmp" % testnum
        testnum += 1
        #~ tree2image.str2image( str(tokens.asList()), imgname )
    except ParseException as err:
        print(err.line)
        print(" " * (err.column - 1) + "^")
        print(err)
    print()
def runTest(self):
    from examples.jsonParser import jsonObject
    from test.jsonParserTests import test1, test2, test3, test4, test5

    expected = [
        [],
        [],
        [],
        [],
        [],
    ]
    import pprint
    for t, exp in zip((test1, test2, test3, test4, test5), expected):
        result = jsonObject.parseString(t)
        ## print result.dump()
        result.pprint()
        print()
        ## if result.asList() != exp:
        ##     print "Expected %s, parsed results as %s" % (exp, result.asList())
def pass_one(thread, common_words):
    """Works through every post in the thread and returns a tuple of maps:
    (
        {subject : [post_ordinals, ...], ...}
        {post_ordinals : set([subject, ...]), ...}
    )
    """
    subject_post_map = collections.defaultdict(list)
    post_subject_map = {}
    user_subject_map = collections.defaultdict(set)
    for i, post in enumerate(thread.posts):
        subjects = analyse_thread.match_words(post, common_words, 10, concorde_pub_map.WORDS_MAP)
        subjects |= analyse_thread.match_all_caps(post, common_words, concorde_pub_map.CAPS_WORDS)
        subjects |= analyse_thread.match_phrases(post, common_words, 2, concorde_pub_map.PHRASES_2_MAP)
        if post.pprune_sequence_num in concorde_pub_map.SPECIFIC_POSTS_MAP:
            subjects.add(concorde_pub_map.SPECIFIC_POSTS_MAP[post.pprune_sequence_num])
        for subject in subjects:
            subject_post_map[subject].append(i)
        post_subject_map[post.pprune_sequence_num] = subjects
        user_subject_map[post.user.strip()] |= subjects
        # print('Post {:3d} subjects [{:3d}]: {}'.format(i, len(subjects), subjects))
    # pprint.pprint(subject_map, width=200)
    return subject_post_map, post_subject_map, user_subject_map
def _wiki_table_sub(self, match):
    ttext = match.group(0).strip()
    #print 'wiki table: %r' % match.group(0)
    rows = []
    for line in ttext.splitlines(0):
        line = line.strip()[2:-2].strip()
        row = [c.strip() for c in re.split(r'(?<!\\)\|\|', line)]
        rows.append(row)
    #pprint(rows)
    hlines = ['<table>', '<tbody>']
    for row in rows:
        hrow = ['<tr>']
        for cell in row:
            hrow.append('<td>')
            hrow.append(self._run_span_gamut(cell))
            hrow.append('</td>')
        hrow.append('</tr>')
        hlines.append(''.join(hrow))
    hlines += ['</tbody>', '</table>']
    return '\n'.join(hlines) + '\n'
def tune_num_boost_round():
    # global watchlist
    global num_boost_round
    global evals_result
    global eval_metric_xgb_format

    evals_result = {}
    xgb.train(params=params_no_sklearn, dtrain=dtrain, num_boost_round=num_boost_round,
              evals=watchlist, evals_result=evals_result)
    evals_result = evals_result['eval'][eval_metric_xgb_format]
    # pprint.pprint(evals_result)

    max = 0.0
    max_loc = 0
    for i, v in enumerate(evals_result):
        # print '%d ...... %d : %d' % (i, max_loc, max)
        if v > max:
            max = v
            max_loc = i
    # print "max_loc : %s , max : %s" % (max_loc, max)
    num_boost_round = max_loc + 1
    print('**** num_boost_round : ', num_boost_round)
def tune_num_boost_round():
    # global watchlist
    global num_boost_round
    global evals_result

    evals_result = {}
    xgb.train(params=params_no_sklearn, dtrain=dtrain, num_boost_round=num_boost_round,
              evals=watchlist, evals_result=evals_result)
    evals_result = evals_result['eval']['map']
    pprint.pprint(evals_result)

    max = 0.0
    max_loc = 0
    for i, v in enumerate(evals_result):
        # print '%d ...... %d : %d' % (i, max_loc, max)
        if v > max:
            max = v
            max_loc = i
    print "max_loc : %d , max : %d" % (max_loc, max)
    num_boost_round = max_loc + 1
    print '**** num_boost_round : ', num_boost_round
def update_neo4j_parallel_worker(results):
    """
    Just a worker interface for the different Neo4j update executions.
    Input:
        - results: json-style dictionary. Check create_neo4j_ functions output for details
    Output:
        - res : dic
    Output:
        None, creates/merges the nodes to the wanted database
    """
    # Update Neo4j as usual
    from pprint import pprint
    #pprint(results)
    #print('~'*50)
    update_neo4j(results)
    # Return 1 for everything is ok
    return 1
def pprint(to_be_printed):
    """nicely formatted print"""
    try:
        import pprint as pp
        # generate an instance PrettyPrinter
        # pp.PrettyPrinter().pprint(to_be_printed)
        pp.pprint(to_be_printed)
    except ImportError:
        if isinstance(to_be_printed, dict):
            print('{')
            for k, v in list(to_be_printed.items()):
                print("'" + k + "'" if isinstance(k, str) else k,
                      ': ',
                      "'" + v + "'" if isinstance(v, str) else v,
                      sep="")
            print('}')
        else:
            print('could not import pprint module, applying regular print')
            print(to_be_printed)
def _submitSchema(self):
    jobIdMap = {}

    for p in self._schema["pipelines"]:  # Add all jobs to the jobs table
        jobIdMap[p["name"]] = self._pipelineDbUtils.insertJob(
            None, None, p["name"], p["tag"], None, 0,
            p["request"]["pipelineArgs"]["logging"]["gcsPath"],
            None, None, None, None, None, json.dumps(p["request"]))

    for p in self._schema["pipelines"]:  # Add dependency info to the job dependency table
        if "children" in p.keys() and len(p["children"]) > 0:
            for c in p["children"]:
                parentId = jobIdMap[p["name"]]
                childId = jobIdMap[c]
                self._pipelineDbUtils.insertJobDependency(parentId, childId)

    for p in self._schema["pipelines"]:  # schedule pipelines
        parents = self._pipelineDbUtils.getParentJobs(jobIdMap[p["name"]])
        self._pipelineDbUtils.updateJob(jobIdMap[p["name"]],
                                        setValues={"current_status": "WAITING"},
                                        keyName="job_id")

        # if the job is a root job, send the job request to the queue
        msg = {
            "job_id": jobIdMap[p["name"]],
            "request": p["request"]
        }
        #pprint.pprint(msg)

        if len(parents) == 0:
            self._pipelineQueueUtils.publish(json.dumps(msg))
def debug(self, fh=sys.stderr):
    self.cursor.execute('select * from kv')
    pprint.pprint(self.cursor.fetchall(), stream=fh)
    self.cursor.execute('select * from kv_revisions')
    pprint.pprint(self.cursor.fetchall(), stream=fh)
def get_AFSCs(reddit):
    """
    Returns a dict used to lookup AFSCs

    full_afsc_dict
        -> "enlisted":
            -> "1W0X1":
                -> "base_afsc": "1W0X1"
                -> "job_title": "Weather Technician"
                -> "shreds": {}
                -> "link": "https://www.reddit.com/r/AirForce/wiki/jobs/1w0x1"
            ...
        -> "officer":
            -> "12SX":
                -> "base_afsc": "12SX"
                -> "job_title": "Special Operations Combat Systems Officer"
                -> "shreds":
                    -> "C": "AC-130H"
                       "K": "MC-130H EWO"
                       ...
                -> "link": ""
            ...

    :param reddit: PRAW reddit object
    :return: full_afsc_dict used for looking up AFSC information
    """
    enlisted_dict = {"dict_type": "enlisted"}
    officer_dict = {"dict_type": "officer"}
    full_afsc_dict = {"enlisted": enlisted_dict, "officer": officer_dict}

    # add AFSCs and their titles
    add_afsc(enlisted_dict, "EnlistedAFSCs.csv")
    add_afsc(officer_dict, "OfficerAFSCs.csv")

    # add shreds to AFSCs
    add_shreds(enlisted_dict, "EnlistedShreds.csv")
    add_shreds(officer_dict, "OfficerShreds.csv")

    # add links to AFSCs
    add_afsc_links(full_afsc_dict, reddit)

    # uncomment to see full dictionary
    #pprint(full_afsc_dict)

    return full_afsc_dict
def import_summerdata(exampleName, actionDirectory):
    import parsingSummerActionAndFluentOutput
    fluent_parses = parsingSummerActionAndFluentOutput.readFluentResults(exampleName)
    action_parses = parsingSummerActionAndFluentOutput.readActionResults("{}.{}".format(actionDirectory, exampleName))
    #import pprint
    #pp = pprint.PrettyPrinter(depth=6)
    #pp.pprint(action_parses)
    #pp.pprint(fluent_parses)
    return [fluent_parses, action_parses]
def bench_on(runner, sym, Ns, trials, dtype=None):
    global args, kernel, out, mkl_layer
    prepare = globals().get("prepare_" + sym, prepare_default)
    kernel = globals().get("kernel_" + sym, None)
    if not kernel:
        kernel = getattr(np.linalg, sym)
    out_lvl = runner.__doc__.split('.')[0].strip()
    func_s = kernel.__doc__.split('.')[0].strip()
    log.debug('Preparing input data for %s (%s).. ' % (sym, func_s))
    args = [prepare(int(i)) for i in Ns]
    it = range(len(Ns))
    # pprint(Ns)
    out = np.empty(shape=(len(Ns), trials))
    b = body(trials)
    tic, toc = (0, 0)
    log.debug('Warming up %s (%s).. ' % (sym, func_s))
    runner(range(1000), empty_work)
    kernel(*args[0])
    runner(range(1000), empty_work)
    log.debug('Benchmarking %s on %s: ' % (func_s, out_lvl))
    gc_old = gc.isenabled()
    # gc.disable()
    tic = time.time()
    runner(it, b)
    toc = time.time() - tic
    if gc_old:
        gc.enable()
    if 'reused_pool' in globals():
        del globals()['reused_pool']

    # calculate average time and min time and also keep track of outliers (max time in the loop)
    min_time = np.amin(out)
    max_time = np.amax(out)
    mean_time = np.mean(out)
    stdev_time = np.std(out)

    #print("Min = %.5f, Max = %.5f, Mean = %.5f, stdev = %.5f " % (min_time, max_time, mean_time, stdev_time))
    #final_times = [min_time, max_time, mean_time, stdev_time]

    print('## %s: Outter:%s, Inner:%s, Wall seconds:%f\n' % (sym, out_lvl, mkl_layer, float(toc)))
    return out
def main(cb, args):
    # perform a single process search
    #
    processes = cb.process_search(args.get('query'))

    print "%-20s : %s" % ('Displayed Results', len(processes['results']))
    print "%-20s : %s" % ('Total Results', processes['total_results'])
    print "%-20s : %sms" % ('QTime', int(1000 * processes['elapsed']))
    print '\n'

    # for each result
    for process in processes['results']:
        pprint.pprint(process)
        print '\n'
def main(argv):
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.url or not opts.token or not opts.sensor:
        print "Missing required param; run with --help for usage"
        sys.exit(-1)

    cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify,
                     ignore_system_proxy=True)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        sensor = cb.sensor(opts.sensor)

    pprint.pprint(sensor)