我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用prompt_toolkit.prompt()。
def post_message(self, channel_name):
    """Prompt for a message and post it to a channel via the Slack Web API.

    The request values are handed to ``requests`` as ``params`` so that the
    typed text is URL-encoded; interpolating it into the URL by hand corrupted
    any message containing ``&``, ``#``, ``+`` or ``%``.

    :param channel_name: name of the channel, resolved with ``self.find_channel_id``
    """
    os.system("echo '\u001b[1m\u001b[31m To mention a user write @ while chatting \u001b[0m'")
    text = prompt("your message > ", completer=WordCompleter(users))
    channel_id = self.find_channel_id(channel_name)
    params = {
        "token": settings.token,
        "channel": channel_id,
        "text": text,
        "as_user": "true",
        "link_names": 1,
    }
    response = requests.get("https://slack.com/api/chat.postMessage", params=params).json()
    # TODO : retrieve message history and print to screen while chatting
    if response["ok"]:
        os.system("figlet 'Sent' | lolcat")
        time.sleep(2)
        os.system("clear")
    else:
        # Single parenthesised argument: prints the same on Python 2 and 3.
        print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
def channels_invite(self, channel_name):
    """Prompt for user names and invite each of them to a channel.

    Names are separated by whitespace and may carry a leading ``@``.

    :param channel_name: name of the channel, resolved with ``self.find_channel_id``
    """
    import subprocess

    channel_id = self.find_channel_id(channel_name)
    invites = prompt("send invites -> ", completer=WordCompleter(users), style=DocumentStyle)
    # split() without an argument skips the empty names that doubled or
    # trailing spaces produced with split(" ").
    for name in invites.split():
        user_id = self.find_user_id(name.strip("@"))
        # Let requests URL-encode the values instead of formatting them in.
        params = {"token": settings.token, "channel": channel_id, "user": user_id}
        response = requests.get("https://slack.com/api/channels.invite", params=params).json()
        if response["ok"]:
            # The typed name is passed as an argv entry, not through a shell,
            # so quotes or ';' in it cannot run arbitrary commands.
            figlet = subprocess.Popen(["figlet", "Invited " + name], stdout=subprocess.PIPE)
            subprocess.call(["lolcat"], stdin=figlet.stdout)
            figlet.stdout.close()
            figlet.wait()
            time.sleep(2)
            os.system("clear")
        else:
            print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
def channels_create(self):
    """Ask for a channel name and purpose, then create the channel via the Slack Web API.

    The values are sent as ``params`` so they are URL-encoded, and a failed
    request is reported the same way the sibling commands report it.
    """
    os.system("clear")
    name = raw_input("\u001b[1m\u001b[31m channel name -> \u001b[0m")
    purpose = raw_input("\u001b[1m\u001b[31m purpose of the channel(OPTINAL) : \u001b[0m")
    # The invite list is still asked for to keep the dialogue unchanged, but it
    # is not used until the TODO below is implemented.
    prompt("send invites -> ", completer=WordCompleter(users), style=DocumentStyle)
    params = {"token": settings.token, "name": name, "purpose": purpose}
    response = requests.get("https://slack.com/api/channels.create", params=params).json()
    # TODO : send channel and user_id to channels_invite
    if response["ok"]:
        os.system("figlet 'Created' | lolcat")
        time.sleep(2)
        os.system("clear")
    else:
        print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
def main():
    """Start the Slack client: show the banner, then read and run commands forever."""
    os.system("clear; figlet 'Slack Gitsin' | lolcat")
    history_path = os.path.expanduser("~/.slackHistory")
    history = FileHistory(history_path)
    while True:
        # The helper objects are rebuilt for every prompt.
        prompt_options = dict(
            history=history,
            auto_suggest=AutoSuggestFromHistory(),
            on_abort=AbortAction.RETRY,
            style=DocumentStyle,
            completer=Completer(fuzzy_match=False, text_utils=TextUtils()),
            complete_while_typing=Always(),
            get_bottom_toolbar_tokens=get_bottom_toolbar_tokens,
            key_bindings_registry=manager.registry,
            accept_action=AcceptAction.RETURN_DOCUMENT,
        )
        entered = prompt("slack> ", **prompt_options)
        Slack(entered).run_command()
def config_settings(event):
    """Open the configuration dialogue and store the y/n answers in the 'Layout' section.

    ``PROMPTING`` is set while the questions are asked and is reset in a
    ``finally`` block, so an aborted prompt (Ctrl-C / Ctrl-D) cannot leave the
    flag stuck at True.

    :param event: key-press event; ``event.cli`` receives the return value
    """
    global PROMPTING
    telemetry.track_key('F1')
    PROMPTING = True
    try:
        config = azclishell.configuration.CONFIGURATION
        questions = {
            "Do you want command descriptions": "command_description",
            "Do you want parameter descriptions": "param_description",
            "Do you want examples": "examples"
        }
        for question, option in questions.items():
            answer = ""
            # Repeat the question until a plain y or n is given.
            while answer.lower() not in ('y', 'n'):
                answer = prompt(u'\n%s (y/n): ' % question)
            config.set_val('Layout', option, format_response(answer))
    finally:
        PROMPTING = False
    print("\nPlease restart shell for changes to take effect.\n\n")
    event.cli.set_return_value(event.cli.current_buffer)
def _process_input(self):
    """Read one line, parse it and dispatch to the matching ``cmd_<name>`` method.

    An empty first word is ignored. A ``SystemExit`` raised while parsing or
    running the command is swallowed so the caller keeps running.
    """
    line = prompt('> ',
                  get_bottom_toolbar_tokens=self._get_toolbar,
                  style=self.style,
                  history=self.history,
                  completer=self.completer,
                  complete_while_typing=False,
                  key_bindings_registry=self.registry)
    words = line.split(' ')
    cmd = words[0]
    if cmd == '':
        return
    handler_name = 'cmd_{}'.format(cmd)
    try:
        parsed = self.parser.parse_args(words)
        result = getattr(self, handler_name)(parsed)
        if result:
            print(result)
    except SystemExit:
        pass
def interactive(self, context=None):
    """Run an interactive command line chat between user and bot.

    Loops until EOF (Ctrl-D) or a keyboard interrupt (Ctrl-C) reaches the
    prompt, then returns.

    :param context: optional initial context; a fresh ``{}`` is used if omitted
    """
    if context is None:
        context = {}
    # The old raw_input/input probe was dead code: input is always read
    # through prompt(), so it has been removed.
    history = InMemoryHistory()
    while True:
        try:
            message = prompt(INTERACTIVE_PROMPT, history=history, mouse_support=True).rstrip()
        except (KeyboardInterrupt, EOFError):
            return
        print(self.message(message, context))
def BootstrapBlockchain():
    """Ask for confirmation, then run ``do_bootstrap()``.

    Exits the process (status 0) when no bootstrap file is configured or when
    the user types anything other than ``confirm``.

    :return: whatever ``do_bootstrap()`` returns
    """
    chain_dir = settings.LEVELDB_PATH
    if settings.BOOTSTRAP_FILE is None:
        print("no bootstrap file specified. Please update your configuration file.")
        sys.exit(0)
    print("This will overwrite any data currently in %s.\nType 'confirm' to continue" % chain_dir)
    typed = prompt("[confirm]> ", is_password=False)
    if typed != 'confirm':
        print("bootstrap cancelled")
        sys.exit(0)
    return do_bootstrap()
def _migrate_legacy_app_data_if_just_upgraded_and_user_agrees():
    """Offer to migrate legacy application data and run the migration on consent.

    Nothing happens unless the CLI base dir is untouched and a legacy base dir
    exists. An empty answer or one starting with y/Y counts as consent.
    """
    if not (is_cli_base_dir_untouched() and legacy_base_dir_exists()):
        return
    print('Application data from previous Indy version has been found')
    answer = prompt('Do you want to migrate it? [Y/n] ')
    declined = bool(answer) and not answer.upper().startswith('Y')
    if declined:
        print('Application data was not migrated')
        return
    try:
        combined_migration.migrate()
        # Invalidate config caches to pick up overridden config
        # parameters from migrated application data
        invalidate_config_caches()
        print('Application data has been migrated')
    except Exception as e:
        print('Error occurred when trying to migrate application data: {}'.format(e))
        traceback.print_exc()
        print('Application data has not been migrated')
def prompt_yesno(msg: str, default: bool = None, meta_dict: t.Dict[str, str] = None) -> bool:
    """
    Ask a yes/no question and return the decision.

    :param msg: asked question
    :param default: value returned for an empty answer; empty answers are only accepted if given
    :param meta_dict: mapping 'yes' or 'no' to further explanations
    :return: user input converted to bool
    """
    accepted = ["yes", "no", "y", "n"]
    if default is not None:
        hint = "y" if default else "n"
        msg += "[" + hint + "] "
        accepted.append("")
    answer = prompt(msg,
                    completer=WordCompleter(["yes", "no"], ignore_case=True, meta_dict=meta_dict),
                    display_completions_in_columns=True,
                    validator=WordValidator(accepted, ignore_case=True))
    if answer == "":
        return default
    return answer.lower().startswith("y")
def default_prompt(msg: str, default: t.Optional = None, **kwargs):
    """
    Prompt with a non-editable default shown in the message; an empty answer selects the default.

    :param msg: message
    :param default: default value
    :param kwargs: arguments passed directly to the prompt function
    :return: user input, or the default if the input is empty and a default exists
    """
    msg = message(msg, default)
    has_default = default is not None
    if has_default and "validator" in kwargs:
        validator = kwargs["validator"]
        # These validators must let the empty string through so that it can
        # stand for "use the default".
        if isinstance(validator, (TypeValidator, WordValidator)):
            validator.allow_empty = True
    answer = prompt(msg, **kwargs)
    if has_default and answer == "":
        return default
    return answer
def prompt_attributes_dict(default_description: str = None) -> t.Dict[str, str]:
    """
    Interactively collect the attributes dict, starting with the description.

    Further attributes are asked for until the user declines or presses Ctrl-C.

    :param default_description: default value for the description attribute
    :return: attributes dict
    """
    attributes = {}
    descr_msg = "Give a description for the current block: "
    if default_description is None:
        attributes["description"] = prompt(descr_msg, validator=NonEmptyValidator())
    else:
        attributes["description"] = default_prompt(descr_msg, default_description,
                                                   completer=WordCompleter([default_description]))
    try:
        while prompt_yesno("Do you want to set or add another attribute? ", default=False):
            known_names = sorted(attributes.keys())
            name = prompt("Attribute name: ", validator=NonEmptyValidator(),
                          completer=WordCompleter(known_names, meta_dict=attributes))
            # Offer the current value as the editable default when overwriting.
            attributes[name] = prompt("Attribute value: ", default=attributes.get(name, ""),
                                      validator=NonEmptyValidator())
    except KeyboardInterrupt:
        pass
    return attributes
def prompt_rusage_exec_dict(run_dict: dict) -> dict:
    """
    Ask which getrusage properties the rusage exec runner should record.

    :param run_dict: run config dict (without the runner part)
    :return: runner config
    """
    default_props = ", ".join(RusageExecRunner.misc_options["properties"].get_default())

    class RusagePropertiesValidator(Validator):
        """Accepts a comma separated list that forms a valid rusage property list."""

        def validate(self, document: Document):
            names = [part.strip() for part in document.text.split(",")]
            verdict = verbose_isinstance(names, ValidRusagePropertyList())
            if not verdict:
                raise ValidationError(message=str(verdict), cursor_position=len(document.text))

    property_names = sorted(set(get_av_rusage_properties().keys()))
    completer = WordCompleter(property_names, meta_dict=get_av_rusage_properties(),
                              ignore_case=False, WORD=True)
    answer = prompt("Which properties should be obtained from getrusage(1)? ",
                    validator=RusagePropertiesValidator(), default=default_props,
                    completer=completer)
    return {"properties": [part.strip() for part in answer.split(",")]}
def prompt_time_exec_dict(run_dict: dict) -> dict:
    """
    Prompt for the config of the time exec runner.

    :param run_dict: run config dict (without the runner part)
    :return: runner config
    """
    runner_dict = {}
    default_props = ", ".join(TimeExecRunner.misc_options["properties"].get_default())

    class TimePropertiesValidator(Validator):
        """Accepts a comma separated list that forms a valid time property list."""

        def validate(self, document: Document):
            vals = [elem.strip() for elem in document.text.split(",")]
            ret = verbose_isinstance(vals, ValidTimePropertyList())
            if not ret:
                raise ValidationError(message=str(ret), cursor_position=len(document.text))

    # The completion descriptions must come from the *time* properties; the
    # previous code passed the rusage descriptions here (copy-paste slip).
    completer = WordCompleter(sorted(set(get_av_time_properties().keys())),
                              meta_dict=get_av_time_properties(),
                              ignore_case=False, WORD=True)
    props = prompt("Which properties should be obtained from gnu time? ",
                   validator=TimePropertiesValidator(), default=default_props,
                   completer=completer)
    runner_dict["properties"] = [prop.strip() for prop in props.split(",")]
    return runner_dict
def read(self):
    """Read the next input from the prompt.

    When ``self.multi_line`` is set, the flag is cleared and one multi-line
    input with an empty prompt is read. Otherwise a prompt is shown that
    includes ``self.tx_counter`` when ``self.tx`` is not None.
    """
    if self.multi_line:
        self.multi_line = False
        return prompt(u"", multiline=True, **self.prompt_args)

    def get_prompt_tokens(_):
        if self.tx is None:
            return [(Token.Prompt, "\n-> ")]
        return [
            (Token.Prompt, "\n-("),
            (Token.TxCounter, "{}".format(self.tx_counter)),
            (Token.Prompt, ")-> "),
        ]

    return prompt(get_prompt_tokens=get_prompt_tokens, **self.prompt_args)
def config_settings(event):
    """Open the configuration dialogue and store the y/n answers in the 'Layout' section.

    ``PROMPTING`` is set while the questions are asked and is reset in a
    ``finally`` block, so an aborted prompt (Ctrl-C / Ctrl-D) cannot leave the
    flag stuck at True.

    :param event: key-press event; ``event.cli`` receives the return value
    """
    global PROMPTING
    shell_telemetry.track_key('F1')
    PROMPTING = True
    try:
        config = azclishell.configuration.CONFIGURATION
        questions = {
            "Do you want command descriptions": "command_description",
            "Do you want parameter descriptions": "param_description",
            "Do you want examples": "examples"
        }
        for question, option in questions.items():
            answer = ""
            # Repeat the question until a plain y or n is given.
            while answer.lower() not in ('y', 'n'):
                answer = prompt(u'\n%s (y/n): ' % question)
            config.set_val('Layout', option, format_response(answer))
    finally:
        PROMPTING = False
    print("\nPlease restart the interactive mode for changes to take effect.\n\n")
    event.cli.set_return_value(event.cli.current_buffer)
def auto_complete_prompt(allowed_extensions, file_type):
    """Prompt for a file name, completing from matching files in the current directory.

    :param allowed_extensions: extensions (with leading dot) offered as completions
    :param file_type: word inserted into the question, e.g. "video"
    :return: the text the user entered
    """
    candidates = []
    # Only the first entry produced by the walk (the directory itself) is used.
    for cur_dir, _child_dirs, names in os.walk("."):
        if cur_dir == ".":
            candidates.extend(
                name for name in names
                if os.path.splitext(name)[1] in allowed_extensions
            )
        break
    question = "Which {0} file you like to trim: ".format(file_type)
    return prompt(question,
                  completer=WordCompleter(candidates, ignore_case=True),
                  complete_while_typing=True)
def trim_the_video():
    """Ask for a video file and a time range, then cut that range out with ffmpeg.

    The ffmpeg argument list is built directly instead of formatting a command
    string and re-splitting it, so file names containing spaces or quotes are
    passed through intact.

    :return: name of the generated file (``trimmed-<input name>.mp4``)
    """
    allowed_extensions = [".mp4", ".mov", ".webm"]
    video_file_name = auto_complete_prompt(allowed_extensions, "video")
    start_time = prompt("Start time for the video: ")
    end_time = prompt("End time for the video: ")
    extension = os.path.splitext(video_file_name)[1]
    if extension not in allowed_extensions:
        print("The file you specified isn't supported at the moment.")
        print("Exiting now.")
        exit()
    output_name = "trimmed-{0}.mp4".format(video_file_name)
    args = [
        "ffmpeg", "-i", video_file_name,
        "-r", "24",
        "-ss", start_time,
        "-to", end_time,
        output_name,
    ]
    with Halo(text="Generating preface video...", spinner='earth'):
        subprocess.run(args)
    return output_name
def confirm(prompt, yes=True):
    """Confirm with user input

    With ``yes=True`` everything except ``n``/``N`` confirms. With
    ``yes=False`` only an explicit ``y``/``Y`` confirms; previously that case
    was inverted, so an empty answer confirmed and ``y`` declined.

    :param prompt: Question or text that the user gets.
    :type prompt: str
    :param yes: If yes should be the default.
    :type yes: bool
    :returns: True if go ahead, False if stop
    :rtype: bool
    """
    import prompt_toolkit
    result = prompt_toolkit.prompt(
        prompt + ' (%s): ' % ('Y/n' if yes else 'y/N')
    )
    if yes:
        return result not in ['N', 'n']
    return result in ['Y', 'y']
def input(prompt, default=""):
    """Ask the user for a value, showing the default in parentheses.

    :param prompt: Question or text that the user gets.
    :type prompt: str
    :param default: Value returned when the user enters nothing
    :type default: str
    :returns: User input or default
    :rtype: str
    """
    import prompt_toolkit
    suffix = ' (%s): ' % (default)
    entered = prompt_toolkit.prompt(prompt + suffix)
    return entered or default
def tagger_repl(tagger, **kwargs):
    """Read lines in a loop and print ``tagger.predict_text`` for each, until 'quit' is entered.

    :param tagger: object providing ``predict_text(tokens, mxlen=..., maxw=...)``
    :param kwargs: optional ``mxlen``, ``maxw`` (both default 100), ``prompt``
        (default 'class> ') and ``history_file`` (default '.history')
    """
    mxlen = int(kwargs.get('mxlen', 100))
    maxw = int(kwargs.get('maxw', 100))
    prompt_name = kwargs.get('prompt', 'class> ')
    history = FileHistory(kwargs.get('history_file', '.history'))
    while True:
        text = prompt(prompt_name, history=history).strip()
        if text == 'quit':
            break
        try:
            print(tagger.predict_text(text.split(' '), mxlen=mxlen, maxw=maxw))
        except Exception:
            # Log the failure with its traceback and keep the loop alive.
            logging.exception('Error')
def test(self):
    """Feed typed lines to the loaded bot until EOF or Ctrl-C.

    Empty lines are skipped. Any other exception raised while handling a line
    is printed with its traceback and the loop continues.
    """
    self._load_auth()
    history = FileHistory('.history')
    # The result is not used, but the call is kept as in the original flow.
    self._get_current_project_id()
    bot = self._load_bot()
    while True:
        try:
            line = prompt('BotHub> ', history=history)
            if line:
                bot.handle_message(make_event(line), {})
        except (EOFError, KeyboardInterrupt):
            break
        except Exception:
            traceback.print_exc()
def askYesNo(msg, default=None):
    """Ask a yes/no question on stdin and return the answer as a bool.

    The question repeats until one of y/n/yes/no (any case) is entered. An
    empty answer selects the default when one was given.

    :param msg: question text
    :param default: 'y' or 'n' (any case), or None for no default
    :return: True for yes, False for no
    """
    default = default and default.lower()
    yes_label = 'Y' if default == 'y' else 'y'
    no_label = 'N' if default == 'n' else 'n'
    question = msg + ' (%s/%s)? ' % (yes_label, no_label)
    while True:
        value = input(question).lower()
        if value == '' and default:
            return default == 'y'
        if value in ('y', 'n', 'yes', 'no'):
            return value in ('y', 'yes')
def prompt_path_if_none(arguments, key, key_desc, force_prompt=False, extension=""):
    """Return the path stored under ``key``, prompting for it when missing or forced.

    The value is taken from ``arguments[key]`` or, failing that, from ``jkv``.
    A prompted path is made absolute and its extension replaced by
    ``extension``. If the resulting path does not exist, the user is asked
    whether to create it, and the prompt repeats until that is confirmed.

    :param arguments: mapping of parsed arguments
    :param key: key looked up in ``arguments`` and ``jkv``
    :param key_desc: description used in the prompt and the validation error
    :param force_prompt: prompt even when a value is already known
    :param extension: extension forced onto the entered path
    :return: the resolved path
    """
    # NOTE(review): the nesting of the final `jkv[key] = value` could not be
    # recovered from the flattened source - confirm whether it belongs inside
    # the `if` below.
    value = arguments[key] or jkv.get(key, None)
    if not value or force_prompt:
        confirm = False
        while not confirm:
            # Assume the path is fine unless it turns out not to exist.
            confirm = True
            value = prompt(
                message=key_desc+": ",
                validator=NotEmptyValidator(error=key_desc+' required'),
                completer=PathCompleter(
                    expanduser=True,
                    file_filter=has_extension(extension)
                ))
            value = abspath(expanduser(value))
            # Swap whatever extension was typed for the required one.
            root, ext = splitext(value)
            value = root + extension
            print(value)
            if not exists(value):
                confirm = prompt_yn('Create {}? (Y/n): '.format(value))
    jkv[key] = value
    return value
def set_prompt_tokens(self, device_info: tuple) -> None:
    """
        Build and store the prompt tokens from a 4-tuple of
        (device name, system name, model, system version).

        :param device_info: tuple as returned by a device_info() call
        :return:
    """
    device_name, system_name, model, system_version = device_info
    device_type = '(' + model + ': '
    version = system_version + ') '
    connection = '[' + state_connection.get_comms_type_string() + '] # '
    self.prompt_tokens = [
        (Token.Applicationname, device_name),
        (Token.On, ' on '),
        (Token.Devicetype, device_type),
        (Token.Version, version),
        (Token.Connection, connection),
    ]
def get_prompt_tokens(self, _) -> list:
    """
        Return the stored prompt tokens, or a placeholder set naming an
        'unknown application' when none are stored.

        :param _: ignored
        :return: list of (token, text) pairs
    """
    if not self.prompt_tokens:
        connection = '[' + state_connection.get_comms_type_string() + '] # '
        return [
            (Token.Applicationname, 'unknown application'),
            (Token.On, ''),
            (Token.Devicetype, ''),
            (Token.Version, ' '),
            (Token.Connection, connection),
        ]
    return self.prompt_tokens
def _get_object_option_value_message(option, set_=None):
    """Returns a message for object_option_value prompt.

    The message is the title-cased verbose name, followed by the default in
    brackets when one exists (``Y/n`` or ``y/N`` for booleans), an optional
    installation-set note and a trailing ``': '``.
    """
    if option.default is None:
        msg = '{}'.format(option.verbose_name.title())
    else:
        hint = option.default
        if option.type_name == 'boolean':
            hint = 'Y/n' if hint else 'y/N'
        msg = '{} [{}]'.format(option.verbose_name.title(), hint)
    suffix = '' if set_ is None else ' (installation set {})'.format(set_)
    return '{}{}: '.format(msg, suffix)
# pylint: disable=too-many-arguments
def file_upload(self, channel_name):
    """Collect upload details interactively and call Slack's ``files.upload``.

    The selected path is now printed directly instead of being spliced into an
    ``echo`` shell command (which had unbalanced quotes and allowed command
    injection), and the request values are sent as ``params`` so they are
    URL-encoded.

    NOTE(review): the path chosen in the file dialog is only displayed; the
    request sends the typed ``content`` - confirm whether that is intended.

    :param channel_name: name of the channel, resolved with ``self.find_channel_id``
    """
    os.system("clear")
    channel_id = self.find_channel_id(channel_name)
    os.system("echo 'opening the file dialog. wait...' ")
    selected = subprocess.check_output(['zenity', '--file-selection'])
    if not isinstance(selected, str):
        # check_output returns bytes on Python 3.
        selected = selected.decode("utf-8", "replace")
    print("\u001b[1m\u001b[31m file : \u001b[0m" + selected.strip())
    content = raw_input("\u001b[1m\u001b[31m content : \u001b[0m")
    filename = raw_input("\u001b[1m\u001b[31m filename : \u001b[0m")
    title = raw_input("\u001b[1m\u001b[31m title : \u001b[0m")
    initial_comment = prompt("add comment : ", completer=WordCompleter(users), style=DocumentStyle)
    params = {
        "token": settings.token,
        "content": content,
        "filename": filename,
        "channels": channel_id,
        "title": title,
        "initial_comment": initial_comment,
    }
    response = requests.get("https://slack.com/api/files.upload", params=params).json()
    if response["ok"]:
        os.system("figlet 'Uploaded!' | lolcat")
        time.sleep(2)
        os.system("clear")
    else:
        print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
def ask_user_for_telemetry():
    """Ask whether telemetry may be sent, repeating until the answer is yes or no.

    An empty answer counts as 'yes'.

    :return: the answer as typed (any case), or 'yes' for an empty answer
    """
    answer = " "
    while answer.lower() not in ('yes', 'no'):
        answer = prompt(u'\nDo you agree to sending telemetry (yes/no)? Default answer is yes: ')
        if answer == '':
            answer = 'yes'
    return answer
def login(url, username, password, update_apidoc, display_meta):
    """Log in to the REST API and run the interactive command loop until Ctrl+D.

    :param url: base URL of the server
    :param username: login name
    :param password: login password
    :param update_apidoc: refresh the completions after logging in when true
    :param display_meta: passed on to ``TextUtils`` for the completer
    """
    session = requests.Session()
    credentials = {'username': username, 'password': password}
    reply = session.post('{}/api/rest/auth/login'.format(url), json=credentials)
    rest = reply.json()
    if rest['status'] == 'login.failed':
        click.secho('Fail to login, wrong username or password!', fg='red')
        return
    # The tokens from the login reply replace the session's headers.
    session.headers = {item: rest[item] for item in ('token', 'apiToken', 'apiLicenseToken')}
    if update_apidoc:
        update_completions(session, url)
    click.echo('Syntax: <command> [params] [options]')
    click.echo('Press `Ctrl+D` to exit')
    history = InMemoryHistory()
    for seed in ('scm apiname', 'help apiname'):
        history.append(seed)
    while True:
        try:
            text = prompt(get_prompt_tokens=get_prompt_tokens,
                          completer=SCMCompleter(TextUtils(display_meta)),
                          auto_suggest=AutoSuggestFromHistory(),
                          style=DocumentStyle,
                          history=history,
                          on_abort=AbortAction.RETRY)
        except EOFError:
            break
        process_command(session, url, text)
def _create_keybindings_registry(self):
    """Create a key-binding registry with one handler per entry of ``self.keybindings``.

    Each handler is built by a factory so that it captures its own callback.
    Defining the handler directly in the loop made every binding call the last
    callback, because the closure looked ``callback`` up when the key was
    pressed rather than when the handler was defined.

    :return: the populated registry
    """
    registry = load_key_bindings_for_prompt()

    def make_handler(callback):
        def handler(event):
            """
            We use ``run_in_terminal``, because that ensures that the prompt is
            hidden right before something gets printed and it's drawn again
            after it. (Otherwise this would destroy the output.)
            """
            event.cli.run_in_terminal(lambda: callback(None))
        return handler

    for keystroke, callback in self.keybindings:
        registry.add_binding(keystroke)(make_handler(callback))
    return registry
def interactive_loop(self):
    """Read commands forever and dispatch help / show / attach / exit.

    Input is stripped, lower-cased and split on whitespace. EOF, Ctrl-C and
    the ``exit`` command terminate the process with status 0. Unknown
    commands are ignored.
    """
    while True:
        try:
            words = prompt(
                get_prompt_tokens=self._get_prompt_tokens,
                style=self._get_prompt_style(),
                history=self._history,
                auto_suggest=AutoSuggestFromHistory(),
                completer=self._completer,
                complete_while_typing=True,
                validator=ReplValidator(),
            ).strip().lower().split()
        except (EOFError, KeyboardInterrupt):
            sys.stdout.write('\n')
            sys.exit(0)
        if not words:
            continue
        option, parameters = words[0], words[1:]
        if option == 'help':
            self._show_help_info()
        elif option == 'show':
            self._show_inventory()
        elif option == 'attach':
            self._invoke_shell(int(parameters[0]))
        elif option == 'exit':
            sys.stdout.write('\n')
            sys.exit(0)
def do_open(self, arguments):
    """Handle ``open wallet <path>``: ask for the password and open the wallet.

    An already open wallet is closed first. On success a looping call to the
    wallet's ``ProcessBlocks`` is started with an interval of 1.

    :param arguments: command arguments; first must be 'wallet', second the path
    """
    if self.Wallet:
        self.do_close_wallet()
    item = get_arg(arguments)
    if not (item and item == 'wallet'):
        print("please specify something to open")
        return
    path = get_arg(arguments, 1)
    if not path:
        print("Please specify a path")
        return
    if not os.path.exists(path):
        print("wallet file not found")
        return
    passwd = prompt("[Password]> ", is_password=True)
    try:
        self.Wallet = UserWallet.Open(path, passwd)
        self._walletdb_loop = task.LoopingCall(self.Wallet.ProcessBlocks)
        self._walletdb_loop.start(1)
        print("Opened wallet at %s" % path)
    except Exception as e:
        print("could not open wallet: %s " % e)
def do_create(self, arguments):
    """Handle ``create wallet <path>``: ask twice for a password and create the wallet.

    The passwords must match and be at least 10 characters long. If creation
    fails, a file left at ``path`` is removed. On success a looping call to
    the wallet's ``ProcessBlocks`` is started with an interval of 1.

    :param arguments: command arguments; first must be 'wallet', second the path
    """
    item = get_arg(arguments)
    if not (item and item == 'wallet'):
        return
    path = get_arg(arguments, 1)
    if not path:
        print("Please specify a path")
        return
    if os.path.exists(path):
        print("File already exists")
        return
    passwd1 = prompt("[Password 1]> ", is_password=True)
    passwd2 = prompt("[Password 2]> ", is_password=True)
    if passwd1 != passwd2 or len(passwd1) < 10:
        print("please provide matching passwords that are at least 10 characters long")
        return
    try:
        self.Wallet = UserWallet.Create(path=path, password=passwd1)
        contract = self.Wallet.GetDefaultContract()
        key = self.Wallet.GetKey(contract.PublicKeyHash)
        print("Wallet %s " % json.dumps(self.Wallet.ToJson(), indent=4))
        print("pubkey %s " % key.PublicKey.encode_point(True))
    except Exception as e:
        print("Exception creating wallet: %s " % e)
        self.Wallet = None
        # Do not leave a half-written wallet file behind.
        if os.path.isfile(path):
            try:
                os.remove(path)
            except Exception as e:
                print("Could not remove {}: {}".format(path, e))
        return
    self._walletdb_loop = task.LoopingCall(self.Wallet.ProcessBlocks)
    self._walletdb_loop.start(1)
def test_invoke_contract(self, args):
    """Test-invoke a contract, show the result and invoke it for real after a password check.

    :param args: contract invocation arguments, passed to ``TestInvokeContract``
    """
    if not self.Wallet:
        print("please open a wallet")
        return
    if not (args and len(args) > 0):
        print("please specify a contract to invoke")
        return
    tx, fee, results, num_ops = TestInvokeContract(self.Wallet, args)
    if tx is None or results is None:
        print("Error testing contract invoke")
        return
    print("\n-------------------------------------------------------------------------------------------------------------------------------------")
    print("Test invoke successful")
    print("Total operations: %s " % num_ops)
    print("Results %s " % [str(item) for item in results])
    print("Invoke TX gas cost: %s " % (tx.Gas.value / Fixed8.D))
    print("Invoke TX Fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------------------------------------------------------------------------------\n")
    print("Enter your password to continue and invoke on the network\n")
    passwd = prompt("[password]> ", is_password=True)
    if not self.Wallet.ValidatePassword(passwd):
        print("Incorrect password")
        return
    InvokeContract(self.Wallet, tx, fee)
def load_smart_contract(self, args):
    """Load a contract, test its deployment and deploy it after a password check.

    Returns silently when the contract cannot be loaded or its details are
    not gathered.

    :param args: command arguments; everything after the first is passed to ``LoadContract``
    """
    if not self.Wallet:
        print("please open wallet")
        return
    function_code = LoadContract(args[1:])
    if not function_code:
        return
    contract_script = GatherContractDetails(function_code, self)
    if contract_script is None:
        return
    tx, fee, results, num_ops = test_invoke(contract_script, self.Wallet, [])
    if tx is None or results is None:
        print("test ivoke failed")
        print("tx is, results are %s %s " % (tx, results))
        return
    print("\n-------------------------------------------------------------------------------------------------------------------------------------")
    print("Test deploy invoke successful")
    print("Total operations executed: %s " % num_ops)
    print("Results %s " % [str(item) for item in results])
    print("Deploy Invoke TX gas cost: %s " % (tx.Gas.value / Fixed8.D))
    print("Deploy Invoke TX Fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------------------------------------------------------------------------------\n")
    print("Enter your password to continue and deploy this contract")
    passwd = prompt("[password]> ", is_password=True)
    if not self.Wallet.ValidatePassword(passwd):
        print("Incorrect password")
        return
    # The deploy invocation is sent with a zero fee.
    InvokeContract(self.Wallet, tx, Fixed8.Zero())
def token_send_from(wallet, args, prompt_passwd=True):
    """Transfer tokens on behalf of another address if the allowance covers the amount.

    :param wallet: wallet used for the lookup and the invocation
    :param args: [token symbol, from address, to address, amount]
    :param prompt_passwd: show a summary and ask for the wallet password first
    :return: result of ``InvokeContract`` on success, otherwise False
    """
    if len(args) != 4:
        print("please provide a token symbol, from address, to address, and amount")
        return False
    token = get_asset_id(wallet, args[0])
    send_from, send_to = args[1], args[2]
    amount = amount_from_string(token, args[3])
    allowance = token_get_allowance(wallet, args[:-1], verbose=False)
    if allowance and allowance >= amount:
        tx, fee, results = token.TransferFrom(wallet, send_from, send_to, amount)
        accepted = (
            tx is not None
            and results is not None
            and len(results) > 0
            and results[0].GetBigInteger() == 1
        )
        if accepted:
            if prompt_passwd:
                print("\n-----------------------------------------------------------")
                print("Transfer of %s %s from %s to %s" % (
                    string_from_amount(token, amount), token.symbol, send_from, send_to))
                print("Transfer fee: %s " % (fee.value / Fixed8.D))
                print("-------------------------------------------------------------\n")
                passwd = prompt("[Password]> ", is_password=True)
                if not wallet.ValidatePassword(passwd):
                    print("incorrect password")
                    return False
            return InvokeContract(wallet, tx, fee)
    # Every failure other than a wrong password ends up here.
    print("Requestest transfer from is greater than allowance")
    return False
def token_approve_allowance(wallet, args, prompt_passwd=True):
    """Approve an allowance so that one address may spend another address' tokens.

    A wrong password now returns False like every other failure path (it used
    to return None).

    :param wallet: wallet used for the invocation
    :param args: [token symbol, from address, to address, amount]
    :param prompt_passwd: ask for the wallet password before invoking
    :return: result of ``InvokeContract`` on success, otherwise False
    """
    if len(args) != 4:
        print("please provide a token symbol, from address, to address, and amount")
        return False
    token = get_asset_id(wallet, args[0])
    approve_from = args[1]
    approve_to = args[2]
    amount = amount_from_string(token, args[3])
    tx, fee, results = token.Approve(wallet, approve_from, approve_to, amount)
    if tx is not None and results is not None and len(results) > 0:
        if results[0].GetBigInteger() == 1:
            print("\n-----------------------------------------------------------")
            print("Approve allowance of %s %s from %s to %s" % (
                string_from_amount(token, amount), token.symbol, approve_from, approve_to))
            print("Transfer fee: %s " % (fee.value / Fixed8.D))
            print("-------------------------------------------------------------\n")
            if prompt_passwd:
                passwd = prompt("[Password]> ", is_password=True)
                if not wallet.ValidatePassword(passwd):
                    print("incorrect password")
                    return False
            return InvokeContract(wallet, tx, fee)
    print("could not transfer tokens")
    return False
def token_mint(wallet, args, prompt_passwd=True):
    """Mint tokens to an address, attaching the given assets.

    The argument count is validated before ``args`` is indexed, so too few
    arguments raise the descriptive exception rather than an ``IndexError``.
    A wrong password now returns False instead of None.

    :param wallet: wallet used for the invocation
    :param args: [token symbol, mint-to address, asset attachment, ...]
    :param prompt_passwd: ask for the wallet password before invoking
    :return: result of ``InvokeWithTokenVerificationScript`` on success, otherwise False
    :raises Exception: if fewer than three arguments are given
    """
    if len(args) < 3:
        raise Exception("please specify assets to attach")
    token = get_asset_id(wallet, args[0])
    mint_to_addr = args[1]
    asset_attachments = args[2:]
    tx, fee, results = token.Mint(wallet, mint_to_addr, asset_attachments)
    if results[0].GetBigInteger() > 0:
        print("\n-----------------------------------------------------------")
        print("[%s] Will mint tokens to address: %s " % (token.symbol, mint_to_addr))
        print("Fee: %s " % (fee.value / Fixed8.D))
        print("-------------------------------------------------------------\n")
        if prompt_passwd:
            passwd = prompt("[Password]> ", is_password=True)
            if not wallet.ValidatePassword(passwd):
                print("incorrect password")
                return False
        return InvokeWithTokenVerificationScript(wallet, tx, token, fee)
    print("Could not register addresses: %s " % str(results[0]))
    return False
def do_token_transfer(token, wallet, from_address, to_address, amount, prompt_passwd=True):
    """Transfer NEP5 tokens between two addresses after an optional password check.

    :param token: token object providing ``Transfer`` and ``symbol``
    :param wallet: wallet used for the invocation
    :param from_address: sending address; required
    :param to_address: receiving address
    :param amount: amount to transfer
    :param prompt_passwd: ask for the wallet password before invoking
    :return: result of ``InvokeContract`` on success, otherwise False
    """
    if from_address is None:
        print("Please specify --from-addr={addr} to send NEP5 tokens")
        return False
    tx, fee, results = token.Transfer(wallet, from_address, to_address, amount)
    accepted = (
        tx is not None
        and results is not None
        and len(results) > 0
        and results[0].GetBigInteger() == 1
    )
    if not accepted:
        print("could not transfer tokens")
        return False
    print("\n-----------------------------------------------------------")
    print("Will transfer %s %s from %s to %s" % (
        string_from_amount(token, amount), token.symbol, from_address, to_address))
    print("Transfer fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------\n")
    if prompt_passwd:
        passwd = prompt("[Password]> ", is_password=True)
        if not wallet.ValidatePassword(passwd):
            print("incorrect password")
            return False
    return InvokeContract(wallet, tx, fee)
def main(host, port, username, password, askpass, tls, version, database):
    """Entry point: print the version, or connect to the server and run the REPL.

    :param askpass: when true, the password is read interactively and replaces ``password``
    :param version: when true, only the version is printed
    :return: 0 after printing the version, otherwise the REPL's return value
    """
    if version:
        print(get_version())
        return 0
    if askpass:
        password = prompt('Enter password: ', is_password=True)
    config = Config(host, port, username, password, tls, database)
    server = couchdb.Server(config.url)
    return repl.Repl(server, config).run()
def _getPromptUsage():
    """Return the usage lines for the ``prompt`` command."""
    usage_lines = ["prompt <principal name>"]
    return usage_lines
def getActiveEnv(self):
    """Return the environment part of the pair produced by ``PlenumCli.getPromptAndEnv``."""
    _prompt_text, active_env = PlenumCli.getPromptAndEnv(self.name, self.currPromptText)
    return active_env
def prompt_bash(msg: str, allow_empty: bool) -> str:
    """
    Prompts for bash shell code.

    The non-empty validator is now actually passed to the prompt; it used to
    be created and then dropped, so ``allow_empty=False`` had no effect.

    :param msg: shown message
    :param allow_empty: allow an empty string?
    :return: user input
    """
    from pygments.lexers.shell import BashLexer
    validator = None if allow_empty else NonEmptyValidator()
    return prompt(msg, lexer=PygmentsLexer(BashLexer), completer=SystemCompleter(),
                  validator=validator)
def prompt_python(msg: str, get_globals: t.Callable[[str], t.Any], get_locals: t.Callable[[str], t.Any]) -> str:
    """
    Read python code through a multi-line prompt with mouse support,
    python highlighting and completion.

    :param msg: shown message
    :param get_globals: function that returns the global variables
    :param get_locals: function that returns the local variables
    :return: user input
    """
    from ptpython.completer import PythonCompleter
    from pygments.lexers.python import Python3Lexer
    completer = PythonCompleter(get_globals, get_locals)
    lexer = PygmentsLexer(Python3Lexer)
    return prompt(msg, multiline=True, mouse_support=True, lexer=lexer, completer=completer)
def prompt_perf_stat_exec_dict(run_dict: dict) -> dict:
    """
    Prompt for the config of the perf stat exec runner.

    :param run_dict: run config dict (without the runner part)
    :return: runner config
    """
    runner_dict = {}
    default_repeat = PerfStatExecRunner.misc_options["repeat"].get_default()
    runner_dict["repeat"] = int(default_prompt("How many times should perf stat itself repeat the measurement? ",
                                               default=default_repeat, validator=TypeValidator(PositiveInt())))
    default_props = ", ".join(PerfStatExecRunner.misc_options["properties"].get_default())

    class PerfStatPropertiesValidator(Validator):
        """Accepts the input if ``perf stat`` itself accepts the listed events."""

        def validate(self, document: Document):
            vals = [elem.strip() for elem in document.text.split(",")]
            # perf is run with an argument list rather than through
            # "/bin/sh -c": the text being validated is whatever the user has
            # typed so far and must not be interpreted by a shell.
            cmd = ["perf", "stat", "-x", ";", "-e", ",".join(vals), "--", "/bin/echo"]
            try:
                proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                                        universal_newlines=True)
            except OSError as err:
                # e.g. perf is not installed; the shell used to report this
                # through a non-zero exit status.
                raise ValidationError(message=str(err), cursor_position=len(document.text))
            out, err = proc.communicate()
            if proc.returncode > 0:
                msg = str(err).split("\n")[0].strip()
                raise ValidationError(message=msg, cursor_position=len(document.text))

    props = prompt("Which properties should perf stat measure? ",
                   validator=PerfStatPropertiesValidator(), default=default_props,
                   completer=WordCompleter(sorted(set(get_av_perf_stat_properties())),
                                           ignore_case=False, WORD=True))
    runner_dict["properties"] = [prop.strip() for prop in props.split(",")]
    return runner_dict
def prompt_spec_exec_dict(run_dict: dict) -> dict:
    """
    Ask for the result file, base path, path regexp and python code of the spec exec runner.

    :param run_dict: run config dict (without the runner part)
    :return: runner config
    """
    runner_dict = {
        "file": default_prompt("SPEC like result file to use: ",
                               validator=TypeValidator(FileName()),
                               completer=PathCompleter()),
        "base_path": prompt("Property base path: "),
        "path_regexp": prompt("Regexp matching the property path for each measured property: ",
                              validator=NonEmptyValidator()),
    }

    def get(sub_path: str = ""):  # just a mock
        """
        Get the value of the property with the given path.

        :param sub_path: given path relative to the base path
        :return: value of the property
        """

    for hint in (
        "The python code is entered via a multiline input. ",
        "Press [Meta+Enter] or [Esc] followed by [Enter] to accept input.",
        "You can click with the mouse in order to select text.",
        "Use the get(sub_path: str) -> str function to obtain a properties value.",
    ):
        print(hint)
    # The mock is exposed to the completer as the only local name.
    runner_dict["code"] = prompt_python("The python is executed for each measured property: \n",
                                        lambda: {}, lambda: {"get": get})
    return runner_dict
def __init__(self):
    """Start with no prompt, arguments or items, and with link display off."""
    self.prompt = self.pargs = self.items = None
    self.show_links = False
def shell(self, items, pargs):
    """Show the items, then read and run commands until one asks to stop.

    :param items: mapping of items; its keys feed the completer
    :param pargs: parsed arguments, stored on the instance
    """
    self.pargs = pargs
    self.items = items
    stdout.write('\n')  # When the fetching messege ends need to add \n after \r.
    self.prompt_show_items()
    history = InMemoryHistory()
    while True:
        line = prompt(
            u'we-get > ',
            history=history,
            auto_suggest=AutoSuggestFromHistory(),
            completer=WGCompleter(list(self.items.keys())),
            style=we_get_prompt_style
        )
        if self.prompt_no_command(line):
            continue
        if self.prompt_is_single_command(line):
            command, args = line, None
        else:
            # First word is the command, the rest is re-joined as its argument.
            words = line.split()
            command = words[0]
            args = ' '.join(words[1:])
        if not self.prompt_verify_command(command, args):
            continue
        if not self.prompt_parse_command(command, args):
            break