我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用builtins.dict()。
def main(post, name, password, text):
    """Post a comment, then refresh the comment and view counters."""
    payload = _make_comment_data(post, name, password, text)
    SESSION.headers['Referer'] = _get_comment_referrer_url(post)
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    # Submit the comment itself.
    response = SESSION.post(url=URL.COMMENT_SUBMIT, data=payload)
    # Refresh the comment count for the post.
    counter_payload = {'ci_t': post['ci_t'], 'id': post['gall_name'],
                       'no': post['post_num']}
    SESSION.post(url=URL.COMMENT_COUNT, data=counter_payload)
    # Refresh the view count (first comment page).
    counter_payload['comment_page'] = 1
    SESSION.post(url=URL.COMMENT_VIEW, data=counter_payload)
    return response
def openscope(self, customlocals=None):
    '''Opens a new (embedded) scope.

    Args:
        customlocals (dict): By default, the locals of the embedding scope
            are visible in the new one. When this is not the desired
            behaviour a dictionary of customized locals can be passed,
            and those locals will become the only visible ones.
    '''
    # Save the embedding scope's state so closescope() can restore it.
    self._locals_stack.append(self._locals)
    self._globalrefs_stack.append(self._globalrefs)
    if customlocals is not None:
        new_locals = customlocals.copy()
    elif self._locals is not None:
        new_locals = self._locals.copy()
    else:
        new_locals = {}
    self._locals = new_locals
    self._globalrefs = set()
    # The visible scope is the globals overlaid with the new locals.
    self._scope = self._globals.copy()
    self._scope.update(new_locals)
def update_file(self, filepath, df, notes=None):
    """
    Sets a new DataFrame for the DataFrameModel registered to filepath.

    :param filepath (str)
        The filepath to the DataFrameModel to be updated
    :param df (pandas.DataFrame)
        The new DataFrame to register to the model.
    :param notes (str, default None)
        Optional notes to register along with the update.
    """
    assert isinstance(df, pd.DataFrame), "Cannot update file with type '{}'".format(type(df))
    self._models[filepath].setDataFrame(df, copyDataFrame=False)
    # Only record an update entry when the caller supplied notes.
    if notes:
        entry = {'date': pd.Timestamp(datetime.datetime.now()),
                 'notes': notes}
        self._updates[filepath].append(entry)
    self._paths_updated.append(filepath)
def to_dict(self):
    """
    Convert the named hash to a dictionary

    Returns:
        dict: Dictionary representing this enum type
    """
    return {
        'name': self.name,
        'type': self.typ,
        'description': self.description,
        'fields': self.fields,
    }
def set_description(self, name, description, version, custom_fields=None):
    """
    Set the description of this RPC service

    Args:
        name (str): Name of the service
        description (str): Description of the service
        version (str): Version of the service
        custom_fields (dict): A dict with user-defined fields
    """
    desc = self.description
    desc['name'] = name
    desc['description'] = description
    desc['version'] = version
    # An omitted custom_fields defaults to an empty dict.
    desc['custom_fields'] = {} if custom_fields is None else custom_fields
def handle_error(self, e, reply):
    """
    Rewrite a reply dict for an exception caught during RPC function
    execution.

    Args:
        e (Exception): The exception caught when executing an RPC function
        reply (dict): The reply we wanted to send to the client before the
            error occured

    Returns:
        dict: The modified reply dict now containing information about the
            error
    """
    # Re-raise so the except clauses can dispatch on the exception type
    # (and so traceback printing happens inside an active except block).
    try:
        raise e
    except JsonRpcError as rpc_err:
        reply['error'] = rpc_err.to_dict()
    except Exception:
        traceback.print_exc()
        reply['error'] = JsonRpcInternalError("Internal error").to_dict()
    reply['result'] = None
    return reply
def test_metabolite_id_namespace_consistency(read_only_model, store):
    """Expect metabolite IDs to be from the same namespace."""
    overview = annotation.generate_component_id_namespace_overview(
        read_only_model, "metabolites")
    # `iteritems` was removed in pandas 2.0; `items` iterates identically
    # on both old and new pandas versions.
    store['met_ids_in_each_namespace'] = dict(
        (key, int(val)) for key, val in overview.items())
    distribution = overview.sum()
    cols = list(distribution.index)
    largest = distribution[cols].idxmax()
    if largest != 'bigg.metabolite':
        warn(
            'memote currently only supports syntax checks for BiGG identifiers.'
            ' Please consider mapping your IDs from {} to BiGG'
            ''.format(largest)
        )
    # Every metabolite ID must belong to the dominant namespace.
    assert distribution[largest] == len(read_only_model.metabolites), \
        "Metabolite IDs that don't belong to the largest fraction: {}"\
        "".format(", ".join(overview.loc[~overview[largest], largest].index))
def test_reaction_id_namespace_consistency(read_only_model, store):
    """Expect reaction IDs to be from the same namespace."""
    overview = annotation.generate_component_id_namespace_overview(
        read_only_model, "reactions")
    # `iteritems` was removed in pandas 2.0; `items` iterates identically
    # on both old and new pandas versions.
    store['rxn_ids_in_each_namespace'] = dict(
        (key, int(val)) for key, val in overview.items())
    distribution = overview.sum()
    cols = list(distribution.index)
    largest = distribution[cols].idxmax()
    if largest != 'bigg.reaction':
        warn(
            'memote currently only supports syntax checks for BiGG identifiers.'
            ' Please consider mapping your IDs from {} to BiGG'
            ''.format(largest)
        )
    # Bug fix: this test is about reactions, so compare against the number
    # of reactions (the original compared against len(...metabolites),
    # copied from the metabolite test above).
    assert distribution[largest] == len(read_only_model.reactions), \
        "Reaction IDs that don't belong to the largest fraction: {}" \
        "".format(", ".join(overview.loc[~overview[largest], largest].index))
def _collect_consistency_plots(self):
    """Create plots from the consistency info data frame."""
    df = self.bag.get_consistency_dataframe()
    idx = self.index
    return {
        "is_consistent": plt.boolean_chart(
            df[[idx, "is_consistent"]],
            "Is Stoichiometrically Consistent"),
        "unconserved_metabolites": plt.scatter_line_chart(
            df[[idx, "unconserved_metabolites"]],
            "Number of Unconserved Metabolites"),
        "magic_atp_production": plt.boolean_chart(
            df[[idx, "magic_atp_production"]],
            "Has Magic ATP Production"),
        "imbalanced_reactions": plt.scatter_line_chart(
            df[[idx, "imbalanced_reactions"]],
            "Number of Imbalanced Reactions"),
        "blocked_reactions": plt.scatter_line_chart(
            df[[idx, "blocked_reactions"]],
            "Number of Blocked Reactions"),
        # TODO: Fix looped in tests.
        # "looped_reactions": plt.scatter_line_chart(
        #     df[[idx, "looped_reactions"]],
        #     "Number of Looped Reactions"),
    }
def _collect_biomass_plots(self):
    """Create plots from the biomass info data frame."""
    df = self.bag.get_biomass_dataframe()
    idx = self.index
    factor = "biomass_ids"  # components sum
    return {
        "biomass_sum": plt.scatter_line_chart(
            df[[idx, "biomass_sum", factor]],
            r"$ \text{Sum of Biomass Components }"
            r"[ \text{mmol} \text{g}_{\text{DW}}^{-1} \text{h}^{-1} ] $"),
        "biomass_default_flux": plt.scatter_line_chart(
            df[[idx, "biomass_default_flux", factor]],
            r"$ \text{Biomass Flux }"
            r"[ \text{mmol} \text{g}_{\text{DW}}^{-1} \text{h}^{-1} ] $"),
        "num_default_blocked_precursors": plt.scatter_line_chart(
            df[[idx, "num_default_blocked_precursors", factor]],
            "Number of Blocked Biomass Precursors in Default Medium"),
        "num_open_blocked_precursors": plt.scatter_line_chart(
            df[[idx, "num_open_blocked_precursors", factor]],
            "Number of Blocked Biomass Precursors in Complete Medium"),
        "gam_in_biomass": plt.boolean_chart(
            df[[idx, "gam_in_biomass"]],
            "Biomass Contains Growth-associated Maintenance"),
    }
def build_index(self):
    """Build a data index either from timestamps and commit hashes."""
    LOGGER.debug("Building index...")
    # Empty frame that fixes the dtypes of the columns expected in "meta".
    expected = pd.DataFrame({
        "timestamp": pd.Series(dtype="datetime64[ns]"),
        "commit_hash": pd.Series(dtype="str")
    })
    # NOTE(review): self._bag looks like a dask Bag; pluck's default of
    # dict() tolerates records without a "meta" key -- confirm upstream.
    df = self._bag.pluck("meta", dict()).to_dataframe(expected).compute()
    # verify_integrity raises if two records share a commit hash.
    df.set_index(
        "commit_hash", drop=True, inplace=True, verify_integrity=True)
    # Find the shortest hash prefix (at least 5 chars) that still keeps
    # every commit hash unique.
    trunc = 5
    res = df.index.str[:trunc]
    while len(res.unique()) < len(df):
        trunc += 1
        res = df.index.str[:trunc]
    # Store the abbreviated hash as a regular column next to the full index.
    df["commit_hash"] = res.copy()
    # Mergesort is stable: rows with equal timestamps keep their order.
    df.sort_values("timestamp", inplace=True, kind="mergesort")
    self._index = df
    LOGGER.debug("%s", str(df))
def clear(cls):
    '''Clear the global list of Peekers.'''
    cls._peekers = {}
def _find_num_copies(**attribs):
    """
    Return the number of copies to make from the length of attribute values.

    Keyword Args:
        attribs: Dict of Keyword/Value pairs for setting object attributes.
            If the value is a scalar, then the number of copies is one.
            If the value is a list/tuple, the number of copies is the
            length of the list/tuple.

    Returns:
        The length of the longest value in the dict of attributes.

    Raises:
        Exception if there are two or more list/tuple values with different
        lengths that are greater than 1. (All attribute values must be
        scalars or lists/tuples of the same length.)
    """
    # Collect the distinct lengths: 1 for a scalar, len(v) for a sequence.
    num_copies = {len(v) if isinstance(v, (list, tuple)) else 1
                  for v in attribs.values()}
    num_copies = list(num_copies)
    # More than two distinct lengths, or two distinct lengths that are both
    # greater than 1, means the attribute lists cannot be aligned.
    if len(num_copies) > 2 or (len(num_copies) > 1 and min(num_copies) > 1):
        logger.error("Mismatched lengths of attributes: {}!".format(
            num_copies))
        raise Exception
    try:
        return max(num_copies)
    except ValueError:
        return 0  # The list is empty (no attributes were given).

##############################################################################
def _load_sch_lib_skidl(self, filename=None, lib_search_paths=None):
    """
    Load the parts from a SKiDL schematic library file.

    Args:
        filename: The name of the SKiDL schematic library file.
    """
    f = _find_and_open_file(filename, lib_search_paths,
                            lib_suffixes[SKIDL], allow_failure=True)
    if not f:
        logger.warning('Unable to open SKiDL Schematic Library File {}.\n'.format(filename))
        return
    try:
        # The SKiDL library is stored as a Python module that's executed to
        # recreate the library object.
        namespace = {}  # Holds everything defined by the executed module.
        exec(f.read(), namespace)
        # Search the namespace for the library object.
        for obj in namespace.values():
            if isinstance(obj, SchLib):
                # Overwrite self with the new library.
                self.__dict__.update(obj.__dict__)
                return
        # Oops! No library object. Something went wrong.
        raise Exception('No SchLib object found in {}'.format(filename))
    except Exception as e:
        logger.error('Problem with {}'.format(f))
        logger.error(e)
        raise Exception
def main(post, gall_name_en, comment_num, password):
    """Submit a request to delete a comment from a post."""
    payload = {'ci_t': post['ci_t'], 'no': post['post_num'],
               'id': gall_name_en, 'p_no': post['post_num'],
               're_no': comment_num, 'orgin_no': 0, 'password': password,
               'best_orgin': '', 'check_7': post['check_7']}
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    return SESSION.post(url=URL.COMMENT_DELETE_SUBMIT, data=payload)
def _yield_comment_html(post, cmt_all=True):
    """Yield the raw comment-page HTML of a post, one page at a time.

    With ``cmt_all`` False only the first page is fetched.
    """
    max_pages = post['total_pages'] if cmt_all else 1
    for page in range(1, max_pages + 1):
        payload = {'ci_t': post['ci_t'], 'id': post['gall_name'],
                   'no': post['post_num'], 'comment_page': page}
        SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
        yield SESSION.post(url=URL.COMMENT_VIEW, data=payload).text
def upvote(gall_name_en, post_num, ci_t):
    """Cast an upvote on a post."""
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    SESSION.headers['Referer'] = URL.POST_VOTE_UP
    # The site checks this cookie before accepting a vote.
    SESSION.cookies['{0}{1}_Firstcheck'.format(gall_name_en, post_num)] = 'Y'
    payload = {'ci_t': ci_t, 'id': gall_name_en, 'no': post_num,
               'recommend': 0, 'vote': 'vote', 'user_id': ''}
    return SESSION.post(url=URL.POST_VOTE_UP, data=payload)
def downvote(gall_name_en, post_num, ci_t):
    """Cast a downvote on a post."""
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    SESSION.headers['Referer'] = URL.POST_VOTE_DOWN
    # The site checks these cookies before accepting a downvote.
    SESSION.cookies['{0}{1}_Firstcheck'.format(gall_name_en, post_num)] = 'Y'
    SESSION.cookies['{0}{1}_Firstcheck_down'.format(gall_name_en, post_num)] = 'Y'
    payload = {'ci_t': ci_t, 'id': gall_name_en, 'no': post_num,
               'recommend': 0, 'vote': 'vote', 'user_id': ''}
    return SESSION.post(url=URL.POST_VOTE_DOWN, data=payload)
def _get_key(ci_t, gall_name_en, password, post_num):
    """Fetch the deletion key for a post from the server."""
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    payload = {'ci_t': ci_t, 'password': password,
               'id': gall_name_en, 'no': post_num}
    response = SESSION.post(url=URL.DELETE_POST2, data=payload)
    # The response body is '||'-separated; the key is the second field.
    return response.text.split('||')[1]
def _get_block_key(block_key, ci_t, r_key, gall_name_en):
    """Poll the server until it returns a non-empty block key."""
    url = 'http://gall.dcinside.com/block/block/'
    while True:
        SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
        SESSION.cookies['dcgame_top'] = 'Y'
        payload = {'ci_t': ci_t, 'id': gall_name_en, 'block_key': block_key}
        response = SESSION.post(url, data=payload)
        if response.text != '':
            return response.text
        # The server sometimes responds empty; wait briefly and retry.
        time.sleep(0.5)
def _submit(gall_name_en, ci_t, r_key, block_key, subject, password, name, memo):
    """Submit a new post to the gallery."""
    payload = {
        'upload_status': 'N',
        'id': gall_name_en,
        'ci_t': ci_t,
        'subject': subject,
        'password': password,
        'r_key': r_key,
        'name': name,
        'memo': memo,
        'block_key': block_key,
        'vid': '',
        'isMovie': '',
        'campus': 0,
        'ipt_movieCompType': '',
        'wiki_tag': '',
        'sijosae': 'tlwhtorororRl',
    }
    SESSION.headers['Referer'] = URL.POST_CREATE.format(
        gall_name_en=gall_name_en)
    return SESSION.post('http://gall.dcinside.com/forms/article_submit',
                        data=payload)
def post_parser(html, gall_en_name, post_num):
    """Parse a post page's HTML into a dict of its fields."""
    doc = pq(html)
    comments_count = _parse_comments_count(doc)
    return {
        'gall_name': gall_en_name,
        'post_num': post_num,
        'imgs1': _parse_imgs1(doc),
        'imgs2': _parse_imgs2(doc),
        'title': _parse_title(doc),
        'author': _parse_author(doc),
        'views': _parse_views_count(doc),
        'comments_count': comments_count,
        'content': _parse_text_content(doc),
        'html_content': _parse_html_content(doc),
        'datetime': _parse_datetime(doc),
        'IP': _parse_ip(doc),
        'user_id': _parse_author_id(doc),
        'ci_t': _parse_ci_t(doc),
        'total_pages': _parse_total_pages(comments_count),
        'check_6': _parse_check_6(doc),
        'check_7': _parse_check_7(doc),
        'check_8': _parse_check_8(doc),
        'delete_values': _parse_delete_values(doc),
        'upvote': _parse_upvote(doc),
        'downvote': _parse_downvote(doc),
        'mandu': _parse_mandu(doc),
    }
def parse_td(td_list):
    """Parse one table row (list of <td> cells) from a post list into a dict.

    Cell layout (from the indexing below): 0=number, 1=subject,
    2=author, 3=date, 4=views, 5=recommendations.
    """
    subject_cell = pq(td_list[1])
    author_cell = pq(td_list[2])
    return {
        'post_num': get_number_from_lxml_elem(td_list[0]),
        'subject': subject_cell('a')[0].text_content(),
        'reply_num': parse_int(subject_cell('em').text()),
        'post_type': subject_cell('a')[0].get('class'),
        # The first text chunk of the author cell is the display name.
        'author': [text for text in td_list[2].itertext()][0],
        'user_id': author_cell[0].get('user_id'),
        'date': datetime.strptime(td_list[3].text_content(), '%Y.%m.%d'),
        'views': get_number_from_lxml_elem(td_list[4]),
        'recommended_num': get_number_from_lxml_elem(td_list[5]),
    }
def __init__(self):
    # Registries for template filters, tags, and which tags take context.
    self.filters = {}
    self.tags = {}
    self.takes_context = {}
def _request_authentication(self):
    """Ask Open-Spending if the token is valid. """
    response = authenticate_user(params=dict(jwt=self.token))
    authentication = handle(response)
    if not authentication['authenticated']:
        message = 'Token has expired: request a new one'
        log.error(message)
        raise InvalidToken(message)
    log.info('Hello %s! You are logged into Open-Spending',
             authentication['profile']['name'])
    return authentication
def _request_permissions(self):
    """Request permissions for Open-Spending services. """
    permissions = {}
    for service in OS_SERVICES:
        response = authorize_user(
            params=dict(jwt=self.token, service=service))
        permissions[service] = handle(response)
    return permissions
def _get_restricted_builtins(cls):
    """Return a copy of the restricted builtins mapping.

    On Python 2 the names ``True``/``False`` are not keywords, so they
    must be provided explicitly.
    """
    builtins_map = dict(cls._RESTRICTED_BUILTINS)
    if sys.version_info[0] == 2:
        builtins_map['True'] = True
        builtins_map['False'] = False
    return builtins_map
def process_file(self, infile, outfile=None):
    '''Processes input file and writes result to output file.

    Args:
        infile (str): Name of the file to read and process. If its value
            is '-', input is read from stdin.
        outfile (str, optional): Name of the file to write the result to.
            If its value is '-', result is written to stdout. If not
            present, result will be returned as string.

    Returns:
        str: Result of processed input, if no outfile was specified.
    '''
    source = STDIN if infile == '-' else infile
    output = self._preprocessor.process_file(source)
    if outfile is None:
        return output
    if outfile == '-':
        stream = sys.stdout
    else:
        stream = _open_output_file(outfile, self._create_parent_folder)
    stream.write(output)
    # Only close streams we opened ourselves; never close stdout.
    if stream != sys.stdout:
        stream.close()
def process_text(self, txt):
    '''Processes a string.

    Args:
        txt (str): String to process.

    Returns:
        str: Processed content.
    '''
    return self._preprocessor.process_text(txt)
def __init__(self, name, log_git_hash=True, use_visdom=False,
             visdom_opts=None, time_indexing=True, xlabel=None):
    """
    Create an experiment with the following parameters:
    - log_git_hash (bool): retrieve current commit hash to log code status
    - use_visdom (bool): monitor metrics logged on visdom
    - visdom_opts (dict): options for visdom
    - time_indexing (bool): use time to index values (otherwise counter)
    """
    super(Experiment, self).__init__()
    self.name_and_dir = name
    # The experiment name is the last path component of `name`.
    self.name = name.split('/')[-1]
    self.date_and_time = time.strftime('%d-%m-%Y--%H-%M-%S')
    self.logged = defaultdict(OrderedDict)
    self.metrics = defaultdict(dict)
    self.registered = []
    self.config = {}
    self.use_visdom = use_visdom
    self.time_indexing = time_indexing
    if use_visdom:
        self.plotter = Plotter(self, visdom_opts, xlabel)
    if log_git_hash:
        self.log_git_hash()
def log_git_hash(self):
    """Record the current git commit hash and working-tree diff in the
    experiment config.

    Best-effort: if no git repository is found (or git fails), a message
    is printed and the experiment continues without git metadata.
    """
    try:
        repo = git.Repo(search_parent_directories=True)
        git_hash = repo.head.object.hexsha
        head = repo.head.commit.tree
        git_diff = repo.git.diff(head)
        self.log_config(dict(git_hash=git_hash, git_diff=git_diff),
                        to_visdom=False)
    except Exception:
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit;
        # only catch ordinary errors (e.g. InvalidGitRepositoryError).
        print("I tried to find a git repository in current "
              "and parent directories but did not find any.")
def _dict_process(my_dict):
    """Re-group a flat ``{'name_tag': values}`` log dict into
    ``logged[tag][name]`` with values sorted by their (numeric) x-key.

    The input dict is modified in place and also returned.
    """
    logged = defaultdict(OrderedDict)
    # Iterate over a snapshot of the keys: the loop pops entries from
    # my_dict['logged'], and mutating a dict while iterating its live
    # keys() view raises RuntimeError on Python 3.
    for key in list(my_dict['logged'].keys()):
        # The tag is the part after the last underscore; everything
        # before it is the metric name.
        splitted = key.split('_')
        name, tag = '_'.join(splitted[:-1]), splitted[-1]
        values = my_dict['logged'].pop(key)
        # sort values based on x-value
        values = sorted(values.items(), key=lambda x: float(x[0]))
        logged[tag][name] = OrderedDict(values)
    my_dict['logged'] = logged
    # no need for an ordered dictionary for config
    my_dict['config'] = dict(my_dict['config'])
    return my_dict
def __repr__(self):
    """Human-readable summary of this single-contract order."""
    fmt = 'Single {contract}: {side} {qty}@{pricetimelimit} ({cp})'
    return fmt.format(contract=self.contract,
                      side=self.side,
                      qty=self.quantity,
                      pricetimelimit=self.price_time_limit,
                      cp=self.counterparty)

# TODO: Can combine if contract can hold same info as contract_bundle
def __repr__(self):
    """Human-readable summary of this contract-bundle order."""
    fmt = 'Bundle {contract}: {side} {qty} ({cp})'
    return fmt.format(contract=self.contract_bundle,
                      side=self.side,
                      qty=self.quantity,
                      cp=self.counterparty)
def safeurlencode(data):
    """URL-encode `data` after converting every key and value to bytes."""
    pairs = dict(data).items()
    encoded = {convert.to_bytes(key, key): convert.to_bytes(value, value)
               for key, value in pairs}
    return urlencode(encoded)
def json2py(json_type):
    """Map a JSON-RPC type name to the corresponding Python type name.

    Args:
        json_type (str): JSON type name (e.g. 'int', 'hash', 'base64').

    Returns:
        str: Python type name, or None if the type is unknown.
    """
    mapping = {'bool': 'bool', 'int': 'int', 'float': 'float',
               'string': 'str', 'array': 'list', 'hash': 'dict',
               'base64': 'str'}
    # dict.get already returns None for unknown keys -- no need for a
    # separate membership test plus lookup.
    return mapping.get(json_type)
def to_dict(self):
    """
    Convert the error to a dictionary

    Returns:
        dict: Dictionary representing this error
    """
    return {'name': self.name, 'message': self.msg}
def to_dict(self):
    """
    Convert the enum to a dictionary

    Returns:
        dict: Dictionary representing this enum type
    """
    return {
        'name': self.name,
        'type': self.typ,
        'description': self.description,
        'values': self.values,
    }
def to_dict(self):
    """
    Convert the function description to a dictionary

    Returns:
        dict: Dictionary representing this function description
    """
    # Only the documented fields of each parameter are exported.
    params = [{'name': p['name'],
               'description': p['description'],
               'type': p['type']} for p in self.params]
    return {
        'name': self.name,
        'description': self.description,
        'result_type': self.result_type,
        'result_desc': self.result_desc,
        'params': params,
    }
def describe_service(self):
    """
    Return the self-description of this RPC service

    Returns:
        dict: Description of this service
    """
    return self.description
def call_function(self, rpcfunction, rpcinfo, *params):
    """
    Execute the actual function

    Params:
        rpcfunction (RpcFunction): RPC function object representing a
            function
        rpcinfo (dict): Additional information to pass to the function
            (only when the function declares it needs it)
    """
    args = (rpcinfo,) + params if rpcfunction.requires_rpcinfo else params
    return rpcfunction.func(*args)
def __init__(self, segid, functions, other_components_id_list=None):
    """Builds a new SegmentPlotList

    :param segid: the id of the segment this plot list refers to
    :param functions: functions which must return a `Plot` (or an
        Plot-convertible object). **the first item MUST be `_getme` by
        default**
    :param other_components_id_list: ids of the segment's other
        components (defaults to an empty list)
    """
    super(SegmentPlotList, self).__init__([None] * len(functions))
    self.functions = functions
    # A mutable default argument ([]) would be shared by every instance
    # created without the argument; use None and build a fresh list here.
    if other_components_id_list is None:
        other_components_id_list = []
    # use dict instead of {} to make it py2 compatible (see future imports
    # at module's top)
    self.data = dict(stream=None, plot_title_prefix='',
                     **({k: None for k in self._data_to_invalidate}))
    self.segment_id = segid
    self.oc_segment_ids = other_components_id_list
def get_plots(self, session, plot_indices, inv_cache, config):
    '''Returns the list of `Plot`s representing the custom functions of
    this object's segment. The length of the returned list equals
    `len(plot_indices)`. The list can be manipulated without affecting
    the stored internal list, the elements are passed by reference and
    thus each element modification affects the stored element.

    :param session: the database session used to (re)compute plots
    :param plot_indices: iterable of indices into `self.functions` of the
        plots to return
    :param inv_cache: an inventory cache object (will be handled by
        `exec_function`, which is called internally)
    :param config: (dict) the plot config parsed from a user defined
        yaml file
    '''
    with enhancesegmentclass(config):
        index_of_main_plot = 0  # the index of the function returning the
        # trace plot (main plot returning the trace as it is)
        ret = []
        for i in plot_indices:
            # self acts as a cache: each slot holds the computed Plot, or
            # None if it has not been computed (or was invalidated).
            if self[i] is None:
                # trace either trace or exception (will be handled by
                # exec_function: skip calculation and return empty trace
                # with err message)
                # inv: either trace or exception (will be handled by
                # exec_function: append err message to warnings and
                # proceed to calculation normally)
                self[i] = self.get_plot(self.functions[i], session,
                                        inv_cache, config,
                                        func_name='' if i == index_of_main_plot else None)
            ret.append(self[i])
        return ret
def update(self, *args, **kwargs):
    """Update the dict from at most one positional mapping plus keyword
    pairs, then enforce the size limit."""
    # python2 compatibility (python3 calls __setitem__)
    if len(args) > 1:
        raise TypeError("update expected at most 1 arguments, got %d"
                        % len(args))
    setter = super(LimitedSizeDict, self).__setitem__
    if args:
        for key, value in dict(args[0]).items():
            setter(key, value)
    for key, value in kwargs.items():
        setter(key, value)
    self._check_size_limit()
def __init__(self, size_limit=30):
    """Create an inventory cache holding at most `size_limit` entries."""
    super(InventoryCache, self).__init__(size_limit=size_limit)
    # NOTE(review): from the name this presumably maps segment ids to
    # station ids; confirm against the code that populates it.
    self._segid2staid = {}
def thai_num_to_num(text):
    """Convert a Thai numeral string to an Arabic numeral string.

    NOTE(review): assumes the module-level table `p` holds rows where
    column 2 is the Thai numeral and column 1 the Arabic numeral --
    confirm against its definition.
    """
    mapping = {row[2]: row[1] for row in p[1:]}
    return mapping[text]
def thai_num_to_text(text):
    """Convert a Thai numeral string to its spelled-out Thai text.

    NOTE(review): assumes the module-level table `p` holds rows where
    column 2 is the Thai numeral and column 0 the spelled-out text --
    confirm against its definition.
    """
    mapping = {row[2]: row[0] for row in p[1:]}
    return mapping[text]
def num_to_thai_num(text):
    """Convert an Arabic numeral string to a Thai numeral string.

    NOTE(review): assumes the module-level table `p` holds rows where
    column 1 is the Arabic numeral and column 2 the Thai numeral --
    confirm against its definition.
    """
    mapping = {row[1]: row[2] for row in p[1:]}
    return mapping[text]
def num_to_text(text):
    """Convert an Arabic numeral string to its spelled-out Thai text.

    NOTE(review): assumes the module-level table `p` holds rows where
    column 1 is the Arabic numeral and column 0 the spelled-out text --
    confirm against its definition.
    """
    mapping = {row[1]: row[0] for row in p[1:]}
    return mapping[text]
def text_to_num(text):
    """Convert spelled-out Thai number text to an Arabic numeral string.

    NOTE(review): assumes the module-level table `p` holds rows where
    column 0 is the spelled-out text and column 1 the Arabic numeral --
    confirm against its definition.
    """
    mapping = {row[0]: row[1] for row in p[1:]}
    return mapping[text]