我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 builtins.input()。
def update_bibs_in(grouped_bibs, db_abbrev):
    """Ask how to abbreviate the journals and apply the chosen strategy.

    The user answers ``y`` (every journal group goes through ``update_in``),
    ``m`` (every group goes through ``manual_update_in``) or ``n`` (entries
    are left as they are). Any other answer repeats the question.

    :param grouped_bibs: list of bib dicts, each with a ``"journal"`` key;
        sorted in place by journal.
    :param db_abbrev: abbreviation database handed to the update helpers.
    :return: flat list of the bib entries, ordered by journal.
    """
    actions = {
        "y": lambda items: [update_in(bibs, db_abbrev) for bibs in items],
        "m": lambda items: [manual_update_in(bibs, db_abbrev) for bibs in items],
        "n": lambda items: items,
    }

    # Re-prompt in a loop instead of recursing, so a long run of invalid
    # answers cannot exhaust the call stack.
    while True:
        print("\n ")
        action = input("Abbreviate everthing?" +
                       "y(yes, automatic)/m(manual)/n(do nothing)")
        if action in actions:
            break

    # groupby() only merges adjacent items, hence the sort on the same key.
    by_journal = operator.itemgetter('journal')
    grouped_bibs.sort(key=by_journal)
    grouped_by_journal = [list(items) for _, items in groupby(grouped_bibs, by_journal)]

    updated_groups = actions[action](grouped_by_journal)
    # Flatten with a comprehension: reduce() without an initial value raised
    # TypeError when there were no groups at all.
    return [bib for group in updated_groups for bib in group]
def get_user_api_token(logger):
    """
    Generate iAuditor API Token

    Prompts for the username (echoed) and the password (hidden) and posts
    them to the authentication endpoint.

    :param logger: the logger; receives an error entry when the request fails
    :return: API Token if authenticated else None
    """
    username = input("iAuditor username: ")
    password = getpass()
    generate_token_url = "https://api.safetyculture.io/auth"
    # Hand the form fields over as a dict so they get URL-encoded; gluing them
    # together by hand corrupted credentials containing '&', '=', '+' or '%'.
    payload = {
        'username': username,
        'password': password,
        'grant_type': 'password',
    }
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache",
    }
    response = requests.request("POST", generate_token_url, data=payload, headers=headers)
    if response.status_code == requests.codes.ok:
        return response.json()['access_token']

    try:
        details = str(response.json())
    except ValueError:
        # An error response is not guaranteed to carry a JSON body.
        details = response.text
    logger.error('An error occurred calling ' + generate_token_url + ': ' + details)
    return None
def _setup_token(self):
    """Ask for a Personal Access Token and verify it with one API request.

    :return: ``(token, None)`` when the labels endpoint answers with HTTP
        200, otherwise ``(-1, "Invalid Personal Access Token")``.
    """
    warn = self.logger.warn
    warn("\n\n(One Time Setup) Please create a Personal Access Token")
    warn("https://%s/profile/personal_access_tokens" % self.origin_domain)
    warn("Scope: API, Expires: Never\n")

    token = input("Please enter your Personal Access Token: ")

    # Query a resource that is only served to authenticated callers.
    labels_path = 'projects/%s/labels' % self._url_encoded_path()
    endpoint = urljoin(str(self._API()), labels_path)
    reply = requests.get(endpoint, headers={"PRIVATE-TOKEN": token})

    if reply.status_code != 200:
        return (-1, "Invalid Personal Access Token")
    return (token, None)
def first_auth(self, client_secrets):
    """Run the interactive authorization flow against the Google API.

    Opens the authorization URL in a web browser, asks for the resulting
    auth code and stores the exchanged credentials in ``self.credentials``.
    """
    scopes = [
        'https://www.googleapis.com/auth/admin.directory.group.readonly',  # noqa
        'https://www.googleapis.com/auth/admin.directory.group.member.readonly',  # noqa
        'https://www.googleapis.com/auth/apps.groups.settings',
    ]
    flow = client.flow_from_clientsecrets(
        client_secrets,
        scope=scopes,
        redirect_uri='urn:ietf:wg:oauth:2.0:oob',
    )

    logger.debug('Generating authorization URL.')
    authorize_url = flow.step1_get_authorize_url()
    webbrowser.open(authorize_url)

    code = input('Enter the auth code: ')
    logger.debug('Generating credentials.')
    self.credentials = flow.step2_exchange(code)
def __init__(self, verbose):
    """Load ``_ampersand.json`` and remember the project root.

    The file is first looked up in the current directory. On ``OSError`` the
    user is asked for the project root instead; a second ``OSError`` or a
    ``KeyboardInterrupt`` prints the error and exits.
    """
    try:
        # First attempt: the configuration file sits in the current directory.
        config = build.get_json("_ampersand.json")
        root = p.dirname(p.abspath("./_ampersand.json"))
    except OSError:
        # Fall back to asking the user where the project lives.
        try:
            prompt = ("Enter the path (from here) to the root of "
                      "your project: ")
            location = input(prompt)
            config = build.get_json(p.join(location, "_ampersand.json"))
            root = p.abspath(location)
        except (KeyboardInterrupt, OSError) as e:
            print(str(e))
            sys.exit()

    self.verbose = verbose
    self.config = config
    self.root = root
def update_out(bibs, db_abbrev):
    """Register the journal of ``bibs`` in the abbreviation database.

    The user states whether the stored journal name is already an
    abbreviation and supplies the missing counterpart. The pair is inserted
    into ``db_abbrev`` and every entry gets the abbreviation as its journal.
    Any answer other than ``y`` or ``n`` repeats the question.

    :return: ``bibs`` itself, updated in place.
    """
    while True:
        answer = input(
            "'{}' is a abreviation?y(yes)n(no): ".format(bibs[0]["journal"])
        )
        if answer == "y":
            full_name = input("Insert journal name:\n")
            abreviation = bibs[0]["journal"]
            break
        if answer == "n":
            abreviation = input("Insert abreviation:\n")
            full_name = bibs[0]["journal"]
            break

    db_abbrev.insert(full_name, abreviation)
    for entry in bibs:
        entry["journal"] = abreviation
    return bibs
def is_incomplete_source(self, src, filename="<input>", symbol="single"):
    """
    Tell whether a piece of source code is still unfinished.

    This happens in interactive use when a multi-line command is being typed:

        for x in range(10):
            ... more is expected here, but enter was already pressed!

    A SyntaxError raised while transpiling counts as incomplete as well.
    """
    try:
        translated = self.transpile(src)
    except SyntaxError:
        return True

    # compile_command() yields None when the input is a valid prefix that
    # still needs more lines.
    compiled = codeop.compile_command(translated, filename, symbol)
    return compiled is None
def get_data_Weather():
    """Collect weather API and DWD FTP settings interactively.

    Each set of answers is checked (``check_weather`` / ``check_dwdftp``);
    after a failed check the user may retry, otherwise an ``Exception`` is
    raised.

    :return: ``InputWeather`` built from the API key, the city code returned
        by ``check_weather`` and the DWD user name and password.
    """
    while True:
        print("Input the data for the Wunderground weather API.\n")
        api_key = input("API Key:\n")
        country = input("Country:\n")
        city = input("City:\n")
        city_code = check_weather(api_key, country, city)
        if not (city_code == False):
            break
        if y_n("Retry?") == False:
            raise Exception("User abort on Weather Data select.")

    while True:
        print("Input the data for the DWD FTP Server.\n")
        dwd_user = input("Username:\n")
        dwd_password = input("Password:\n")
        if check_dwdftp(dwd_user, dwd_password):
            break
        if y_n("Retry?") == False:
            raise Exception("User abort on DWD FTP Server data input.")

    return InputWeather(api_key, city_code, dwd_user, dwd_password)
def runBatchFpropWithGivenInput(hidden_size, X):
    """
    Forward-propagate the given input data through a freshly initialised
    LSTM.

    The size of the third axis of X is used as the LSTM input size.

    Returns the hidden outputs, the 'IFOGf' and 'Ct' entries of the forward
    cache, and the cache itself.
    """
    n_inputs = X.shape[2]
    weights = LSTM.init(n_inputs, hidden_size)

    # batch forward; the two state values returned in between are not used
    Hout, _, _, batch_cache = LSTM.forward(X, weights)

    return Hout, batch_cache['IFOGf'], batch_cache['Ct'], batch_cache
def runBatchBpropWithGivenDelta(hidden_size, batch_cache, delta):
    """
    Back-propagate the given output errors through the LSTM.

    Returns the input gradient, the recurrent-weight, input-weight and bias
    slices of the weight gradient, and the full weight gradient.
    """
    # get the batched version gradients (the two state gradients are unused)
    dX, dWLSTM, _, _ = LSTM.backward(delta, batch_cache)

    # Row 0 is sliced out as the bias, the next input_size rows as the input
    # weights and the last hidden_size rows as the recurrent weights.
    input_size = dWLSTM.shape[0] - hidden_size - 1
    db = dWLSTM[0, :]
    dWinput = dWLSTM[1:input_size + 1, :]
    dWrecur = dWLSTM[-hidden_size:, :]

    return dX, dWrecur, dWinput, db, dWLSTM

# -------------------
# TEST CASES
# -------------------
def __prompt_for_role(account_name, role_names):
    """Print the roles available in an account and ask which one to assume.

    :param account_name: account label shown in the banner.
    :param role_names: sequence of role names, listed with their index.
    :return: the element of ``role_names`` chosen by the user.
    """
    # The banner grows with the account name so its right edge stays aligned.
    border = "-" * len(account_name)
    spaces = " " * len(account_name)

    print('{}#------------------------------------------------{}#'.format(Colors.lblue,border))
    print('# {}You have access to the following roles in {}{}{} #'.format(Colors.white,Colors.yellow,account_name,Colors.lblue))
    print('# {}Which role would you like to assume?{}{} #'.format(Colors.white,Colors.lblue,spaces))
    print('#------------------------------------------------{}#{}'.format(border,Colors.normal))

    for index, role_name in enumerate(role_names):
        if role_name == "AccountAdministrator":
            print("\t{}{} {}{}".format(Colors.red, str(index).rjust(2), role_name,Colors.normal))
        else:
            print("\t{}{}{} {}{}".format(Colors.white, str(index).rjust(2), Colors.cyan, role_name, Colors.normal))

    maximum = len(role_names) - 1
    while True:
        choice = input('{}Select role: {}'.format(Colors.lblue, Colors.normal))
        try:
            index = int(choice)
        except ValueError:
            index = -1
        # Check the range explicitly: a negative number would otherwise be
        # accepted and counted from the end of the list.
        if 0 <= index <= maximum:
            return role_names[index]
        print('{}Please enter an integer between 0 and {}{}'.format(Colors.lred, maximum, Colors.normal))
def _get_user_ids(profile):
    """Ask for the BYU Net ID and derive the ADFS user name from it.

    The Net ID last stored for ``profile`` is offered as the default and used
    when the answer is empty.

    :return: ``(net_id, username)``; ``username`` carries the ``@byu.local``
        suffix, ``net_id`` is returned as entered.
    """
    # Ask for NetID, or use cached if user doesn't specify another
    cached_netid = load_last_netid(profile)
    if cached_netid:
        net_id_prompt = 'BYU Net ID [{}{}{}]: '.format(Colors.blue,cached_netid,Colors.normal)
    else:
        net_id_prompt = 'BYU Net ID: '

    # Keep asking until there is a value: an empty answer without a cached ID
    # used to pass a falsy net_id on to the suffix check below.
    net_id = input(net_id_prompt) or cached_netid
    while not net_id:
        net_id = input(net_id_prompt) or cached_netid

    # Append the ADFS-required "@byu.local" to the Net ID
    if "@byu.local" in net_id:
        print('{}@byu.local{} is not required'.format(Colors.lblue,Colors.normal))
        username = net_id
    else:
        username = '{}@byu.local'.format(net_id)
    return net_id, username
def repl(args, quantizer):
    """Read titles interactively and pretty-print their predicted classes.

    For every title at most ``args.max_predict`` predictions are shown as
    ``[class name, value]`` pairs. Ctrl-C ends the loop quietly.
    """
    dataset = Dataset(args.model)
    clf = load_clf(dataset, args)
    classes = loadClasses(dataset)
    try:
        while True:
            title = input("> ")
            features = quantizer.quantize(title)
            prediction = clf.predict(features, 'dict')[0]
            leading = islice(iter(prediction.items()), args.max_predict)
            pprint.pprint([[str(classes[label]), value] for label, value in leading])
    except KeyboardInterrupt:
        pass
def compare_sequences(seq_filter=""):
    """Plot stored test sequences next to their newly generated versions.

    Walks the test directories below ``BASE_TEST_DIR``, pairs each with the
    matching directory below ``BASE_AWG_DIR`` and plots the pair when
    ``seq_filter`` occurs in one of the test file paths. After each plot the
    user may press enter to go on or ``q`` to stop.

    :param seq_filter: substring looked for in the test file paths.
    """
    test_subdirs = ['TestAPS1', 'TestAPS2']
    for subdir in test_subdirs:
        testdirs = glob.glob(os.path.join(BASE_TEST_DIR, subdir, '*'))
        for test in testdirs:
            # build up subdirectory name
            _, name = os.path.split(test)
            testfiles = glob.glob(os.path.join(test, '*.h5'))
            # recurse into subdirectories
            while len(testfiles) == 1 and os.path.isdir(testfiles[0]):
                _, subname = os.path.split(testfiles[0])
                name = os.path.join(name, subname)
                testfiles = glob.glob(os.path.join(testfiles[0], '*'))
            newpath = os.path.join(BASE_AWG_DIR, subdir, name)
            newfiles = glob.glob(os.path.join(newpath, '*.h5'))
            # filter py27 look for py27 versions
            testfiles = filter_py27(testfiles)
            newfiles = filter_py27(newfiles)
            # NOTE(review): with the default empty seq_filter this condition
            # is never true, so nothing gets plotted - confirm that is wanted.
            if seq_filter and any(seq_filter in seq_file for seq_file in testfiles):
                print("{0} comparing to {1}".format(test, newpath))
                PulseSequencePlotter.plot_pulse_files_compare(testfiles, newfiles)
                c = input('Enter to continue (q to quit): ')
                if c == 'q':
                    # Leave the function altogether; a break only ended the
                    # inner loop and went on with the next sub-directory.
                    return
def yes_no_input(msg):
    """Ask a yes/no question until the answer is recognised.

    :return: True for ``y``/``ye``/``yes``, False for ``n``/``no`` (case
        ignored). EOF or Ctrl-C also count as False.
    """
    # Use raw_input where __builtin__ offers it, builtins.input otherwise.
    if hasattr(__builtin__, 'raw_input'):
        read_line = __builtin__.raw_input
    else:
        read_line = builtins.input

    try:
        prompt = "{} [y/N]: ".format(msg)
        while True:
            answer = read_line(prompt).lower()
            if answer in ('y', 'ye', 'yes'):
                return True
            if answer in ('n', 'no'):
                return False
            # Every further round uses the generic reminder as its prompt.
            prompt = "Please respond with 'yes' or 'no' [y/N]: "
    except (EOFError, KeyboardInterrupt):
        return False
def yn_choice(message, default='y'):
    """
    Query the user if he or she wants to proceed.

    Parameters
    ----------
    message : Yes/No message presented to the user.
    default : Default action if no input is given; ``'y'`` or ``'yes'`` in
        any case means yes, anything else means no.

    Returns
    -------
    bool : `True` for yes, else `False`
    """
    # Decide the default once, so the hint shown to the user and the handling
    # of an empty answer cannot disagree (they did for 'yes' or 'Y').
    default_yes = default.lower() in ('y', 'yes')
    choices = 'Y/n' if default_yes else 'y/N'
    choice = input("%s (%s) " % (message, choices))
    values = ('y', 'yes', '') if default_yes else ('y', 'yes')
    return choice.strip().lower() in values
def __init__(self, output_type="list", initial_len=1, interval=0, force_single_line=False, no_warning=False, sort_key=lambda x:x[0]):
    """Set up the output object.

    ``output_type`` selects the wrapped container: ``"list"`` gives an
    ``output.SignalList`` holding ``initial_len`` empty strings, ``"dict"``
    an empty ``output.SignalDict``. When the terminal size cannot be
    determined the user is asked whether to continue (unless ``no_warning``
    is set) and the module-level ``is_atty`` flag is cleared.
    """
    global is_atty

    self.sort_key = sort_key
    self.no_warning = no_warning
    if no_warning:
        print("All reprint warning diabled.")

    # reprint does not work in the IDLE terminal, and any other environment
    # that can't get terminal_size
    if is_atty and not all(get_terminal_size()):
        if not no_warning:
            answer = input("Fail to get terminal size, we got {}, continue anyway? (y/N)".format(get_terminal_size()))
            confirmed = (
                bool(answer)
                and isinstance(answer, str)
                and answer.lower()[0] in ('y', 't', '1')
            )
            if not confirmed:
                sys.exit(0)
        is_atty = False

    if output_type == "list":
        self.warped_obj = output.SignalList(self, [''] * initial_len)
    elif output_type == "dict":
        self.warped_obj = output.SignalDict(self, {})

    self.interval = interval
    self.force_single_line = force_single_line
    # time of the last update in milliseconds
    self._last_update = int(time.time() * 1000)
def download_from_scihub(doi, pdf_file):
    """Download the document for ``doi`` through ScrapSci.

    While a captcha is reported together with an iframe, the captcha image is
    shown and the text typed by the user is submitted.

    :return: ``(False, r)`` straight from navigation when the page was not
        found, otherwise ``(has_iframe, r)`` where ``r`` comes from the
        download when an iframe is present.
    """
    found, r = ScrapSci.navigate_to(doi, pdf_file)
    if not found:
        return False, r

    has_captcha, has_iframe = ScrapSci.check_captcha()
    while has_captcha and has_iframe:
        image = ScrapSci.get_captcha_img()
        image.show()
        typed = input("\tPut captcha:\n\t")
        has_captcha, has_iframe = ScrapSci.solve_captcha(typed)

    if has_iframe:
        # Only the second value of the download result is used.
        _, r = ScrapSci.download()
    return has_iframe, r
def initialize_twiolio():
    """Ask for the twilio credentials and store them in the config file.

    An existing ``twilio`` section in the file at CONFIG_PATH is replaced.

    :returns: None
    """
    # The tuple is evaluated left to right, so the prompts keep their order.
    answers = (
        ('number', input('Twilio Number (e.g. +11234567890): ')),
        ('account_sid', input('Twilio Account SID: ')),
        ('auth_token', input('Twilio Authentication Token: ')),
    )

    config = configparser.ConfigParser()
    config.read(CONFIG_PATH)

    section = 'twilio'
    if config.remove_section(section):
        print('\U00002B55 Old twilio credentials removed.')
    config.add_section(section)
    for option, value in answers:
        config.set(section, option, value)

    with open(CONFIG_PATH, 'w') as configfile:
        config.write(configfile)
    print('\U00002705 New twilio account added.')
def initialize_payment_msg():
    """Ask for the payment message and store it in the config file.

    The message is meant to follow the charging details sent to users
    (generally a message telling them how to pay). An existing ``message``
    section in the file at CONFIG_PATH is replaced.

    :returns: None
    """
    prompt_msg = ('You can enter a short message to put after the charge '
                  'details to send to your users. (For example, letting your '
                  'users know how to pay you)\n-> ')
    message = input(prompt_msg)

    config = configparser.ConfigParser()
    config.read(CONFIG_PATH)
    if config.remove_section('message'):
        print('\U00002B55 Old payment message removed.')
    config.add_section('message')
    # '%' starts an interpolation in ConfigParser values and set() rejects a
    # stray one, so a literal percent sign in the free text must be doubled.
    # NOTE(review): assumes the value is read back with interpolation enabled
    # (the ConfigParser default), which turns '%%' back into '%'.
    config.set('message', 'payment', message.replace('%', '%%'))
    with open(CONFIG_PATH, 'w') as configfile:
        config.write(configfile)
    print('\U00002705 New payment message saved.')
def start_crawler(self, index, daemonize=False):
    """
    Run the single-crawler script for one site of the input array and wait
    for it to finish.

    :param int index: The array-index of the site
    :param int daemonize: Bool if the crawler is supposed to be daemonized
                          (to delete the JOBDIR)
    """
    interpreter = self.get_python_command()
    # Everything after the two file paths is passed on as text.
    flags = ["%s" % value for value in (index, self.shall_resume, daemonize)]
    call_process = [
        interpreter,
        self.__single_crawler,
        self.cfg_file_path,
        self.json_file_path,
    ] + flags

    self.log.debug("Calling Process: %s", call_process)

    process = Popen(call_process, stderr=None, stdout=None)
    process.communicate()
    self.crawlers.append(process)
def prompt(self):
    """Show the Mopidy prompt and split the answer into command and args.

    The prompt shows a symbol for the current playback state while
    connected and ``[OFFLINE]`` otherwise.

    :return: ``(command, args)`` - the lower-cased first word and the list of
        the remaining words of the stripped input line.
    """
    symbol = {
        'playing': '|>',
        'paused': '||',
        'stopped': '[]',
        None: '--',
    }

    if self.mopidy.is_connected():
        status = '(%s)' % symbol[self.state]
    else:
        status = '[OFFLINE]'
    prompt_line = 'Mopidy %s>> ' % status

    user_input = input(prompt_line)
    # Only byte strings need decoding; text strings on Python 3 have no
    # decode() method, so the unconditional call failed there.
    if isinstance(user_input, bytes):
        user_input = user_input.decode(sys.stdin.encoding or 'utf-8')

    command_line = user_input.strip(' \t\n\r').split(' ')
    command = command_line[0].lower()
    args = command_line[1:]
    return command, args
def _add_cleanup(self, kp, path, update=False, branch=None, squash=False, message=None):
    """Stage and commit the post at ``path``, rolling back on failure.

    :param kp: the knowledge post; not used in this method.
    :param path: path that is added to the index and committed.
    :param update: not used in this method.
    :param branch: not used in this method.
    :param squash: not used in this method.
    :param message: commit message; asked for interactively when None.
    :raises: whatever interrupted the commit, re-raised after the rollback.
    """
    self.git.index.add([path])

    # Commit the knowledge post and rollback if it fails
    try:
        if message is None:
            message = input("Please enter a commit message for this post: ")
        self.git.index.commit(message)
    except (KeyboardInterrupt, Exception):
        if message is None:
            # The placeholder was never filled in before; name the post path.
            logger.warning("No commit message input for post '{}'. Rolling back post addition...".format(path))
        else:
            logger.error("Something went wrong. Rolling back post addition...")
        self.git.index.reset()
        try:
            self.git.git.clean('-df', path)
            self.git.git.checkout('--', path)
        except Exception:
            # Best-effort cleanup: the original failure is what gets reported.
            pass
        # A bare raise keeps the original traceback intact.
        raise
def checkDataParameters(self):
    """Show the current data parameters and offer to change them.

    An empty answer counts as N. On Y every parameter setter is run in turn;
    any answer other than Y or N (case ignored) repeats the question.
    """
    template = (
        "Default data parameters are:-\n\n"
        " Number of samples = %d\n"
        " Number of replicates = %d\n"
        " Number of QC replicates = %d\n"
        " Number of solvent replicates = %d\n"
        " File polarity mode = %s\n"
        " Column type = %s\n\n"
        "Change these sample parameters (Y or N)? [N]: "
    )
    while True:
        current = (
            self.numberOfSamples,
            self.numberOfTechReps,
            self.numberOfQCReps,
            self.numberOfSolventReps,
            self.filePolarityMode,
            self.columnType,
        )
        answer = (input(template % current) or 'N').upper()
        if answer == 'N':
            return
        if answer == 'Y':
            self.setNumSamples()
            self.__setNumberOfTechReps__()
            self.__setNumberOfQCReps__()
            self.__setNumberOfSolventReps__()
            self.__setFilePolarityMode__()
            self.__setColumnType__()
            return
        print("\nY or N required!")
def setNumSamples(self):
    """Prompts the user for the number of samples

    An empty answer keeps the current value. The question is repeated until
    an integer >= 1 is given; the result is stored in
    ``self.numberOfSamples`` and nothing is returned.
    """
    while True:
        try:
            # Hold the answer in a temporary first, so an invalid number
            # (0 or negative) never overwrites the stored value or shows up
            # as the default of the next prompt.
            candidate = int(input(
                " How many samples? [%s]: " % self.numberOfSamples) or self.numberOfSamples)
        except ValueError:
            print (" Integer >= 1 needed!")
            continue
        if candidate >= 1:
            self.numberOfSamples = candidate
            break
        print (" Integer >= 1 needed!")

# Function to set number of technical replicates in the run and handle
# input errors
def setNumberOfFiles(self):
    """Ask how many source files make up the run.

    An empty answer keeps the current value. The question is repeated until
    an integer >= 1 is given, which is then stored in ``self.numberOfFiles``.
    """
    while True:
        try:
            answer = input(
                "How many files make up the run? [%d]: " % self.numberOfFiles)
            count = int(answer or self.numberOfFiles)
        except ValueError:
            print ("Integer >= 1 needed!")
            continue
        if not count >= 1:
            print ("Integer >= 1 needed!")
            continue
        self.numberOfFiles = count
        return

# create a loop to prompt user to input file names according to number of
# files to append together
def get_xsf_stored(self, logger, charge, xsf_type='MoKa'):
    """Look up the pre-stored scattering factors for the given charges.

    An unknown ``xsf_type`` is reported through
    ``logger.pt_xsf_wrong_source()`` and replaced by ``'MoKa'``.

    :return: tuple ``(xsf1, xsf2)`` taken from the two tables of the chosen
        source at index ``charge - 1``.
    """
    known_sources = ('MoKa', 'CuKa', 'CoKa', 'FeKa', 'CrKa')
    # Set up dictionary of the prestored scattering factors
    tables = {
        'MoKa': (pse_mo_xsf_1, pse_mo_xsf_2),
        'CuKa': (pse_cu_xsf_1, pse_cu_xsf_2),
        'CoKa': (pse_co_xsf_1, pse_co_xsf_2),
        'FeKa': (pse_fe_xsf_1, pse_fe_xsf_2),
        'CrKa': (pse_cr_xsf_1, pse_cr_xsf_2),
    }

    # Check for valid user input
    if xsf_type not in known_sources:
        logger.pt_xsf_wrong_source()
        xsf_type = 'MoKa'

    # Get scattering factors from nuclear charge
    table_1, table_2 = tables[xsf_type]
    position = charge - 1
    xsf1 = np.take(table_1, position)
    xsf2 = np.take(table_2, charge - 1)

    return xsf1, xsf2
def lg_multiple_occupation_found(self, logg_for='molecule1'):
    """Warn about atoms on identical positions and flag the molecule.

    Sets ``disorder_mol1`` when ``logg_for`` is ``'molecule1'`` and
    ``disorder_mol2`` otherwise, then prints the name of that molecule.
    """
    print("\n> WARNING: Atoms on identical positions were found!")
    print(" Please carefully check your input files!")

    first_molecule = logg_for == 'molecule1'
    if first_molecule:
        self.disorder_mol1 = True
    else:
        self.disorder_mol2 = True
    name = self.name_mol1 if first_molecule else self.name_mol2

    print(" Disordered positions in molecule: '" + str(name) + "'\n")
def _ask_number(prompt):
    """Repeat ``prompt`` until the answer is a non-negative whole number."""
    while True:
        answer = input(prompt)
        # isdecimal() only lets through characters int() can convert;
        # isdigit() also accepted e.g. superscript digits and int() then
        # raised ValueError.
        if answer.isdecimal():
            return int(answer)


def get_user_input():
    """Ask for the start and end coordinates.

    Each question is repeated until a non-negative whole number is entered.

    :return: tuple ``(from_x, from_y, to_x, to_y)`` of ints.
    """
    from_x = _ask_number("from X number : ")
    from_y = _ask_number("from Y number : ")
    to_x = _ask_number("to X number : ")
    to_y = _ask_number("to Y number : ")
    return from_x, from_y, to_x, to_y
def compile(source, filename, mode, flags=0, dont_inherit=False, compile_function=None):
    """
    Similar to the built-in function compile() for Pytuguês code.

    The additional compile_function() argument allows to define a function to
    replace Python's builtin compile().

    Args:
        source (str or code):
            Code to be executed.
        filename:
            File name associated with code. Use '<input>' for strings.
        mode:
            One of 'exec' or 'eval'. The second allows only simple statements
            that generate a value and is used by the eval() function.
        flags, dont_inherit:
            Presumably the same meaning as in the built-in compile() - TODO
            confirm.
        compile_function (callable):
            A possible replacement for Python's built-in compile().

    NOTE(review): earlier documentation listed a ``forbidden`` argument; it
    is not part of this signature.
    """
    # NOTE(review): _locals() presumably collects this function's arguments
    # as keyword arguments - confirm against its definition before adding any
    # local variable here.
    return pytuga_transpyler.compile(**_locals())
def start_crawler(self, index, daemonize=False):
    """
    Run the single-crawler script for one site of the input array with the
    current interpreter and wait for it to finish.

    :param int index: The array-index of the site
    :param int daemonize: Bool if the crawler is supposed to be daemonized
                          (to delete the JOBDIR)
    """
    # Everything after the two file paths is passed on as text.
    flags = ["%s" % value for value in (index, self.shall_resume, daemonize)]
    call_process = [
        sys.executable,
        self.__single_crawler,
        self.cfg_file_path,
        self.json_file_path,
    ] + flags

    self.log.debug("Calling Process: %s", call_process)

    process = Popen(call_process, stderr=None, stdout=None)
    process.communicate()
    self.crawlers.append(process)
def demo(base_url):
    """Login through a third-party OAuth handler and print some stats.

    Parameters
    ----------
    base_url : str
        Base URL of the CMS server.

    Raises
    ------
    AssertionError
        If the request made with the returned URL does not answer with
        HTTP 200.
    """
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.02))
    session.mount('{}://'.format(urlparse(base_url).scheme), adapter)

    # Join URL parts with '/' explicitly: os.path.join uses the separator of
    # the operating system, which is a backslash on Windows.
    root = base_url.rstrip('/')
    login_url = root + '/login?complete=no'

    wb = webbrowser.get()
    session.get(login_url)
    wb.open(login_url)

    auth_url = input("Enter the URL returned after authentication:")
    response = session.get(auth_url.replace("complete=no", 'complete=yes'))
    if response.status_code != 200:
        # Raised explicitly so the check also runs under ``python -O``, which
        # strips assert statements.
        raise AssertionError(
            "unexpected status code {}".format(response.status_code))
    print(session.get(root + '/me').content)
def parseArgs(description=None, prog='picopore'):
    """Build the command-line parser and return the parsed arguments.

    ``description``, when given, is appended to the module description. The
    returned namespace has ``input`` replaced by the result of
    ``checkInputs`` and ``group`` set to ``"all"``.
    """
    checkDeprecatedArgs()

    if description is None:
        full_description = __description
    else:
        full_description = __description + "\n\n" + description

    parser = ArgumentParser(
        description=full_description,
        prog=prog,
        formatter_class=RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--mode",
        choices=('lossless', 'deep-lossless', 'raw'),
        help="choose compression mode",
        required=True,
    )
    parser.add_argument(
        "--revert",
        default=False,
        action="store_true",
        help="reverts files to original size (lossless modes only)",
    )
    parser.add_argument(
        "--fastq",
        action=AutoBool,
        default=True,
        help="retain FASTQ data (raw mode only)",
    )
    parser.add_argument(
        "--summary",
        action=AutoBool,
        default=False,
        help="retain summary data (raw mode only)",
    )
    parser.add_argument(
        "--manual",
        default=None,
        help="manually remove only groups whose paths contain STR (raw mode only, regular expressions permitted, overrides defaults)",
        metavar="STR",
    )
    parser = addCommonArgs(parser)

    parsed = parser.parse_args()
    parsed.input = checkInputs(parsed)
    parsed.group = "all"  # TODO: is it worth supporting group?
    return parsed
def login_interactive(app, email=None):
    """Log in to ``app.instance`` with credentials entered on the console.

    The email is asked for (repeatedly, until non-empty) only when none was
    passed in; the password is always read without echo.

    :raises ConsoleError: when ``api.login`` raises ``api.ApiError``.
    :return: result of ``create_user`` for the obtained access token.
    """
    print_out("Log in to <green>{}</green>".format(app.instance))

    if email:
        print_out("Email: <green>{}</green>".format(email))
    else:
        while not email:
            email = input('Email: ')

    password = getpass('Password: ')

    try:
        print_out("Authenticating...")
        response = api.login(app, email, password)
    except api.ApiError:
        raise ConsoleError("Login failed")

    access_token = response['access_token']
    return create_user(app, email, access_token)
def __init__(self, accept_input=False, default_measure=0):
    """
    Initialize a circuit drawing engine.

    The TikZ code generator uses a settings file (settings.json), which
    can be altered by the user. It contains gate widths, heights, offsets,
    etc.

    Args:
        accept_input (bool): If accept_input is true, the printer queries
            the user to input measurement results if the CircuitDrawer is
            the last engine. Otherwise, all measurements yield the result
            default_measure (0 or 1).
        default_measure (bool): Default value to use as measurement
            results if accept_input is False and there is no underlying
            backend to register real measurement results.
    """
    BasicEngine.__init__(self)
    self._accept_input, self._default_measure = accept_input, default_measure
    # Bookkeeping containers, all empty at the start.
    self._qubit_lines = {}
    self._free_lines = []
    self._map = {}
def __init__(self, accept_input=True, default_measure=False, in_place=False):
    """
    Initialize a CommandPrinter.

    Args:
        accept_input (bool): If accept_input is true, the printer queries
            the user to input measurement results if the CommandPrinter is
            the last engine. Otherwise, all measurements yield
            default_measure.
        default_measure (bool): Default measurement result (if
            accept_input is False).
        in_place (bool): If in_place is true, all output is written on the
            same line of the terminal.
    """
    BasicEngine.__init__(self)
    # The three options are stored unchanged.
    self._accept_input, self._default_measure, self._in_place = (
        accept_input,
        default_measure,
        in_place,
    )
def push_branch_to_remote(source):
    """Push HEAD to the branch ``source`` on origin after confirmation.

    Exits the process unless the answer is ``yes`` or ``y`` (case ignored);
    the output of the push command is logged.
    """
    cmd = "git push origin HEAD:%s" % source
    question = "\n\nPushing local branch [%s] to remote [%s], Yes or no? " % (source, source)
    answer = input(question)

    if answer.lower() not in ('yes', 'y'):
        logger.warn("Exiting...\n")
        sys.exit()

    logger.warn(run_cmd(cmd))
def _req_user_pass(self):
    """Ask for the service user name and password.

    The user name is echoed, the password is read without echo.

    :return: tuple ``(user, password)``.
    """
    self.logger.warn("\n\n(One Time Setup) Please enter credentials to request API key")
    service = self.SERVICE_NAME
    username = input("%s username: " % service)
    password = getpass.getpass("%s password: " % self.SERVICE_NAME)
    return (username, password)
def update_bibs_get_doi(bibs):
    """Optionally pass every entry through ``update_bib``.

    The user answers ``y`` (automatic), ``m`` (manual) or ``n`` (do
    nothing); any other answer repeats the question. ``update_bib`` gets
    True as second argument for ``y`` and False for ``m``.

    :return: ``bibs`` itself, with its items replaced in place.
    """
    answer = None
    while answer not in ("y", "m", "n"):
        answer = input("Get DOI absent using the title?y(yes, automatic)/m(manual)/n(do nothing)")

    if answer == "n":
        return bibs

    get_first = answer == "y"
    for position, entry in enumerate(bibs):
        bibs[position] = update_bib(entry, get_first)
    return bibs
def update_bibs_from_doi(bibs):
    """Optionally replace every entry by the result of ``update_bib``.

    The user answers ``y`` or ``n`` (do nothing); any other answer repeats
    the question.

    :return: ``bibs`` itself, with its items replaced in place for ``y``.
    """
    answer = None
    while answer not in ("y", "n"):
        answer = input("Update bibs using DOI field?y(yes)/n(do nothing)")

    if answer == "y":
        for position, entry in enumerate(bibs):
            bibs[position] = update_bib(entry)
    return bibs
def manual_update_in(bibs, db_abbrev):
    """Ask whether the journal of ``bibs`` should be replaced.

    ``y`` applies ``update_in``, ``c`` applies ``update_in`` with a custom
    abbreviation, ``n`` leaves the entries unchanged. Any other answer
    repeats the question.

    :param bibs: non-empty list of bib dicts sharing one journal; the first
        one supplies the ``"journal"`` and ``"_text"`` shown in the question.
    :param db_abbrev: abbreviation database handed to ``update_in``.
    :return: the list returned by the chosen action.
    """
    actions = {
        "y": lambda: update_in(bibs, db_abbrev),
        "n": lambda: bibs,
        "c": lambda: update_in(bibs, db_abbrev, custom=True),
    }
    question = "Replace '{}' for {}? y(yes)/n(no)/c(custom): "
    question = question.format(bibs[0]["journal"], bibs[0]["_text"])

    # Validate the answer itself. Catching TypeError around the call (to
    # detect a missing action) also swallowed genuine TypeErrors raised
    # inside update_in and silently asked again.
    while True:
        action = input(question)
        if action in actions:
            return actions[action]()
def update_in(bibs, db_abbrev, custom=False):
    """Set the journal of every entry in ``bibs`` to its abbreviation.

    Without ``custom`` the abbreviation is the ``"_text"`` of the first
    entry. With ``custom`` it is typed in by the user and stored through
    ``db_abbrev.update`` for the first entry's journal.

    :return: ``bibs`` itself, updated in place.
    """
    if not custom:
        abbrev = bibs[0]["_text"]
    else:
        abbrev = input("Insert abreviation:\n")
        db_abbrev.update(bibs[0]["journal"], abbrev)

    for entry in bibs:
        entry["journal"] = abbrev
    return bibs
def update_bibs_arxiv(bibs):
    """Optionally pass arXiv entries through ``update_bib``.

    The user answers ``y`` (automatic), ``m`` (manual) or ``n`` (do
    nothing); any other answer repeats the question. Only entries whose
    ``"journal"`` contains "arxiv" (case ignored) are passed on, together
    with the automatic flag and their position.

    :return: ``bibs`` itself, with the affected items replaced in place.
    """
    answer = None
    while answer not in ("y", "m", "n"):
        answer = input("Check if arxiv have been published?y(yes, automatic)/m(manual)/n(do nothing)")

    if answer == "n":
        return bibs

    automatic = answer == "y"
    for position, entry in enumerate(bibs):
        if "journal" in entry and "arxiv" in entry["journal"].lower():
            bibs[position] = update_bib(entry, automatic, position)
    return bibs
def update_bibs_out(grouped_bibs, db_abbrev):
    """Offer to add journals missing from the database by hand.

    ``n`` returns the entries untouched, ``y`` runs ``update_out`` once per
    journal group; any other answer repeats the question.

    :param grouped_bibs: list of bib dicts, each with a ``"journal"`` key;
        sorted in place by journal when the answer is ``y``.
    :param db_abbrev: abbreviation database handed to ``update_out``.
    :return: ``grouped_bibs`` for ``n``, otherwise a flat list of the updated
        entries ordered by journal.
    """
    # Re-prompt in a loop instead of recursing, so repeated invalid answers
    # cannot exhaust the call stack.
    while True:
        print("\nThe next items does not exist in database.")
        action = input("Manually update your database?y(yes)/n(do nothing)")
        if action == "n":
            return grouped_bibs
        if action == "y":
            break

    # groupby() only merges adjacent items, hence the sort on the same key.
    by_journal = operator.itemgetter('journal')
    grouped_bibs.sort(key=by_journal)

    updated_bibs = []
    for _, items in groupby(grouped_bibs, by_journal):
        # extend() flattens as it goes; the former reduce() raised TypeError
        # when there were no groups at all.
        updated_bibs.extend(update_out(list(items), db_abbrev))
    return updated_bibs
def get_input(self, text):
    """Show ``text`` as the prompt and return the line that was entered."""
    answer = input(text)
    return answer
def ratio_to_delta(self, isos_ss, ratio, oneover=False):
    '''
    Transforms an isotope ratio into a delta value

    Parameters
    ----------
    isos_ss: list or float
        list w/ isotopes, e.g., ['N-14','N-15'] OR the solar system
        ratio (an int is accepted and converted to float).
    ratio : float
        ratio of the isotopes to transform.
    oneover : boolean
        take the inverse of the ratio before transforming (never inverse
        of delta value!).  The default is False.

    Returns
    -------
    float
        delta value, or None (after printing a message) when isos_ss is
        neither a number nor a list.

    '''
    # define if isos_ss is the ratio or the isotopes
    # isinstance instead of an exact type comparison, so ints and float
    # subclasses are recognised as a ratio as well.
    if isinstance(isos_ss, (int, float)):
        # float() keeps the division below from turning into an integer
        # division when an int was passed.
        ss_ratio = float(isos_ss)
    elif isinstance(isos_ss, list):
        ss_ratio = self.inut.isoratio_init(isos_ss)
    else:
        print('Check input of isos_ss into ratio_to_delta routine')
        return None

    # check if one over is necessary or not
    if oneover:
        ratio = old_div(1,ratio)

    # calculate delta value
    delta = (old_div(ratio, ss_ratio) - 1.) * 1000.
    return delta
def iso_name_converter(iso):
    '''
    Turns an isotope name such as 'N-14' into the lower-case form with the
    second part first ('14n'), as used later to compare w/ grain database.

    Only the first two dash-separated parts are used.

    '''
    parts = iso.split('-')
    element, mass = parts[0], parts[1]
    return (mass + element).lower()
def after_command(self, cmd):
    """Run the parent hook, then keep the buffer on one line if required.

    When ``more_lines`` is None the buffer is cut at its first newline and
    the cursor position is pulled back inside the shortened buffer.
    """
    super(ReadlineAlikeReader, self).after_command(cmd)
    if self.more_lines is not None:
        return

    # Force single-line input if we are in raw_input() mode.
    # Although there is no direct way to add a \n in this mode,
    # multiline buffers can still show up using various
    # commands, e.g. navigating the history.
    try:
        newline_at = self.buffer.index("\n")
    except ValueError:
        return

    self.buffer = self.buffer[:newline_at]
    if self.pos > len(self.buffer):
        self.pos = len(self.buffer)
def multiline_input(self, more_lines, ps1, ps2, returns_unicode=False):
    """Read an input that may span several lines.

    More lines are requested as long as 'more_lines(unicodetext)' returns an
    object whose boolean value is true. The reader's previous ``more_lines``
    is put back afterwards, also when reading fails.
    """
    reader = self.get_reader()
    previous = reader.more_lines
    try:
        reader.more_lines = more_lines
        # ps1 is used for the first two prompts, ps2 for the last two.
        reader.ps1 = ps1
        reader.ps2 = ps1
        reader.ps3 = ps2
        reader.ps4 = ps2
        line = reader.readline(returns_unicode=returns_unicode)
        return line
    finally:
        reader.more_lines = previous
def confirmation_required(func):
    """Ask for the user's confirmation before calling the decorated function.

    The question is only asked when ``opts.cli_mode`` is set and
    ``opts.assume_yes`` is not. An answer that does not start with "y" (case
    ignored) skips the call and returns None; a confirmation sets
    ``opts.assume_yes``."""
    @wraps(func)
    def wrapper(self, opts, *args, **kwargs):
        must_ask = getattr(opts, 'cli_mode', False) and not opts.assume_yes
        if must_ask:
            question = ("The {} operation cannot be undone. "
                        "Are you sure? [y/N] ".format(func.__name__))
            answer = input(question)
            if not answer.lower().startswith("y"):
                return None
            opts.assume_yes = True
        return func(self, opts, *args, **kwargs)

    return wrapper