The following 50 code examples, extracted from open-source Python projects, illustrate how to use pexpect.run().
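Before the examples, here is a minimal sketch of the pexpect.run() interface they all build on. The command strings and the password are placeholders, not taken from any example below.

import pexpect

# run() starts a command, waits for it to exit, and returns its output
# (bytes on Python 3, so callers often .decode() it).
output = pexpect.run('ls -la')

# events maps regex patterns to responses that are written to the child
# when the pattern matches, e.g. answering a password prompt.
output = pexpect.run('scp foo user@example.com:.',
                     events={'(?i)password': 'secret\n'})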
def kernel():
    """Create an ipykernel conda environment separate from this test
    environment where jupyter console is installed.

    The environments must be separate, otherwise we cannot easily check if
    kernel start is activating the environment or if it was already active
    when the test suite started.
    """
    # unique name for the kernel and environment
    name = str(uuid4())
    env_path = '{}/kernel-env-{name}'.format(gettempdir(), name=name)
    pexpect.run('/bin/bash -c "conda create -y -p {env_path} ipykernel && \
                source activate {env_path} && \
                python -m ipykernel install --user \
                --name {name}"'.format(env_path=env_path, name=name))
    # query jupyter for the user data directory in a separate command to
    # make parsing easier
    stdout = pexpect.run('jupyter --data-dir')
    user_path = stdout.decode('utf-8').strip()
    # the kernel spec resides in the jupyter user data path
    spec_path = os.path.join(user_path, 'kernels', name)
    yield Kernel(name, os.path.join(spec_path, 'kernel.json'), env_path)
    shutil.rmtree(env_path)
def GetGcloud(args, project=None, service=None):
    """Get gcloud command with arguments.

    Functionality might be expanded later to run gcloud commands.
    Args:
      args: command with arguments as an array
      project: the project on which the gcloud compute will work
      service: the service on gcloud that you want to use. default: compute
    Returns:
      returns the formatted command for gcloud compute
    """
    command = ["gcloud"]
    if service:
        command.append(service)
    if project:
        command.extend(["--project", project])
    command.extend(args)
    return command


# TODO(sohamcodes): pexpect.spawn can be changed to a subprocess call.
# However, timeout for subprocess is available only on Python 3,
# so we can implement it later.
def RunCommand(args, timeout=None, logfile=None):
    """Runs a given command through pexpect.run.

    This function acts as a wrapper over pexpect.run. You can have
    exceptions or return values based on the exitstatus of the command
    execution. If exitstatus is not zero, then it will return -1, unless
    you want RuntimeError. If there is TIMEOUT, then an exception is
    raised. If events do not match, the command's output is printed, and
    -1 is returned.
    Args:
      args: command with arguments as an array
      timeout: timeout for pexpect.run
      logfile: an opened filestream to write the output
    Raises:
      RuntimeError: Command's exit status is not zero
    Returns:
      Returns -1 when the exitstatus is not zero or when events do not
      match; otherwise returns 0, if everything is fine
    """
    child = pexpect.spawn(args[0], args=args[1:], timeout=timeout,
                          logfile=logfile)
    child.expect(pexpect.EOF)
    child.close()
    if child.exitstatus:
        print args
        raise RuntimeError(("Error: {}\nProblem running command. "
                            "Exit status: {}").format(child.before,
                                                      child.exitstatus))
    return 0
def get_docker_volume_list(*expected_volumes):
    '''Get the output from "docker volume ls" for specified volumes.'''
    volume_listing_pattern = (
        r'(?P<driver>\S+)\s+'
        r'(?P<name>\S+)'
        # r'\s*$'
        )
    volume_listing_re = re.compile(volume_listing_pattern)

    docker_volumes_response = pexpect.run('docker volume ls')

    volume_list = []
    for line in docker_volumes_response.split('\n'):
        match = volume_listing_re.match(line)
        if match:
            volume_list.append(match.groupdict())

    return volume_list

# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getoutput(self, cmd):
    """Run a command and return its stdout/stderr as a string.

    Parameters
    ----------
    cmd : str
        A command to be executed in the system shell.

    Returns
    -------
    output : str
        A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is
        the correct order as would be seen if running the command in a
        terminal).
    """
    try:
        return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
    except KeyboardInterrupt:
        print('^C', file=sys.stderr, end='')
def getoutput_pexpect(self, cmd):
    """Run a command and return its stdout/stderr as a string.

    Parameters
    ----------
    cmd : str
        A command to be executed in the system shell.

    Returns
    -------
    output : str
        A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is
        the correct order as would be seen if running the command in a
        terminal).
    """
    try:
        return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
    except KeyboardInterrupt:
        print('^C', file=sys.stderr, end='')
def get_architecture(firmware_id):
    """Gets the architecture of the given firmware image."""
    print(bcolors.OKBLUE + "[-] Getting the firmware architecture..." + bcolors.ENDC)
    command = GETARCH_COMMAND.format(FIRMADYNE_PATH, OUTPUT_DIR, firmware_id)
    print(bcolors.ITALIC + command + bcolors.ENDC)
    output = pexpect.run(command,
                         events={'Password for user {}:'.format(USER): PASSWORD + '\n'})
    # extract the architecture info from the output
    arch = ""
    try:
        arch = output.split('\n')[0].split(':')[1]
    except:
        print(bcolors.FAIL + "[!] The firmware architecture couldn't be determined..." + bcolors.ENDC)
        print(bcolors.ITALIC + "[!] Please try manually with the file command and provide the correct architecture type with the --arch parameter..." + bcolors.ENDC)
    else:
        print(bcolors.OKGREEN + "[+] The architecture of your firmware image is:" + arch + bcolors.ENDC)
    return arch
def exec2():
    child.expect('#')
    if subprocess.getstatusoutput('id root >> /dev/null 2>&1 && echo $?')[0] != 0:
        __newpasswd = 'edong&1310'
        subprocess.getstatusoutput('useradd zach')
        run('passwd zach', events={'(?i)password:': __newpasswd + '\n'})
        # TODO: the run() call above is equivalent to the following
        # commented-out block:
        '''
        child.expect('password:')
        child.sendline(__newpasswd)
        child.expect('password:')
        child.sendline(__newpasswd)
        '''
    child.expect('#')
    child.sendline('su - zach')
    child.expect('$')
    child.sendline('whoami')
def _start(self, startup_args=None):
    if not startup_args:
        startup_args = self._create_startup_arg_list(self._current_color,
                                                     **self.init_kwargs)
    try:
        previous_instances = pexpect.run(
            'pgrep -d, -u %s xflux' % pexpect.run('whoami')).strip()
        if previous_instances != "":
            for process in previous_instances.split(","):
                pexpect.run('kill -9 %s' % process)

        self._xflux = pexpect.spawn("xflux", startup_args)
        # logfile=file("tmp/xfluxout.txt", 'w'))
    except pexpect.ExceptionPexpect:
        raise FileNotFoundError(
            "\nError: Please install xflux in the PATH \n")
def run(step, cmd, expect, bail, timeout=10):
    print_local_step(step)

    res = pexpect.run(cmd, timeout=timeout).decode('utf-8')
    log(res)
    res = ansi.sub('', res)

    if expect not in res:
        bail_out(bail, res)

    return res
def test_original_spec(kernel):
    """The kernel should output a conda path in the test suite environment.

    More of a test of the complicated logic in the test fixture than
    anything, but it ensures a good test baseline.
    """
    stdout = pexpect.run('which conda')
    conda_path = stdout.decode('utf-8').strip()
    assert conda_path.startswith(sys.prefix)
def _stop_tunnel(cmd):
    pexpect.run(cmd)
def RunGCloudService(args, project, service, logfile=None):
    """This function runs a gcloud `service` command.

    Args:
      args: command with arguments as an array
      project: the project to which the remote host belongs
      service: the service the user wants to run on gcloud
      logfile: an opened filestream to write the log
    Returns:
      Returns the return value of RunCommand
    """
    return RunCommand(GetGcloud(args, project=project, service=service),
                      logfile=logfile)
def TryFunctionWithTimeout(func, error_handler, num_tries,
                           sleep_between_attempt_secs, *args, **kwargs):
    """Tries to run a function without any exception.

    The function tries for a certain number of tries. If it cannot
    succeed, it raises BenchmarkError.
    Args:
      func: the function to try running without exception
      error_handler: the exception that the function should catch and
        keep trying
      num_tries: number of tries it should make before raising the final
        exception
      sleep_between_attempt_secs: number of seconds to sleep between each
        retry
      *args: arguments to the function
      **kwargs: named arguments to the function
    Raises:
      BenchmarkError: when all tries have failed
    """
    count = num_tries
    while count > 0:
        try:
            count -= 1
            ret_val = func(*args, **kwargs)
            if not ret_val:
                return
            else:
                print ret_val
        except error_handler as e:
            print e
            print ("Problem running function, {}. Trying again after"
                   " {}s. Total tries left: {}.").format(
                       func, sleep_between_attempt_secs, count)
            time.sleep(sleep_between_attempt_secs)
    raise BenchmarkError("All tries failed.")
def tearDown(self):
    '''Test case common fixture teardown.'''
    logger.info("======= tearDown: %s", self.delete_artifacts)
    self.log_docker_constructs()

    new_containers = self.new_constructs('container')
    new_images = self.new_constructs('image')
    new_volumes = self.new_constructs('volume')

    if self.delete_artifacts:
        # Every case should clean up its docker images and containers.
        for container in new_containers:
            # These should have been launched with the --rm flag,
            # so they should be removed once stopped.
            logger.info("REMOVING %s", container['id'])
            pexpect.run('docker stop {}'.format(container['id']))

        for image in new_images:
            logger.info("REMOVING %s", image['id'])
            pexpect.run('docker rmi {}'.format(image['id']))

        for volume in new_volumes:
            logger.info("REMOVING %s", volume['name'])
            pexpect.run('docker volume rm {}'.format(volume['name']))
    else:
        # We'll leave behind any new docker constructs, so we need
        # to update the "original docker volumes".
        self.constructs['current']['container'].extend(new_containers)
        self.constructs['current']['image'].extend(new_images)
        self.constructs['current']['volume'].extend(new_volumes)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_docker_image_list(*expected_images):
    '''Get the output from "docker image ls" for specified image names.'''
    image_listing_pattern = (
        r'(?P<name>[^\s]+)\s+'
        r'(?P<tag>[^\s]+)\s+'
        r'(?P<id>[0-9a-f]+)\s+'
        r'(?P<created>.+ago)\s+'
        r'(?P<size>[^\s]+)'
        r'\s*$'
        )
    image_listing_re = re.compile(image_listing_pattern)

    docker_images_response = pexpect.run('docker image ls')

    image_list = []
    expected_image_nametag_pairs = [
        (x.split(':') + ['latest'])[0:2] for x in expected_images
        ] if expected_images else None

    for line in docker_images_response.split('\n'):
        match = image_listing_re.match(line)
        if (
                match and (
                    not expected_images or
                    [match.groupdict()['name'], match.groupdict()['tag']]
                    in expected_image_nametag_pairs
                )
        ):
            image_list.append(match.groupdict())

    return image_list

# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_docker_container_list(*expected_containers):
    '''Get the output from "docker ps -a" for specified container names.'''
    container_listing_pattern = (
        r'(?P<id>[0-9a-f]+)\s+'
        r'(?P<image>[^\s]+)\s+'
        r'(?P<command>"[^"]+")\s+'
        r'(?P<created>.+ago)\s+'
        r'(?P<status>(Created|Exited.*ago|Up \d+ \S+))\s+'
        r'(?P<ports>[^\s]+)?\s+'
        r'(?P<name>[a-z]+_[a-z]+)'
        # r'\s*$'
        )
    container_listing_re = re.compile(container_listing_pattern)

    docker_containers_response = pexpect.run('docker ps -a')

    container_list = []
    # expected_container_nametag_pairs = [
    #     (x.split(':') + ['latest'])[0:2] for x in expected_containers
    #     ] if expected_containers else []

    for line in docker_containers_response.split('\n'):
        match = container_listing_re.match(line)
        if match:
            container_list.append(match.groupdict())

    return container_list

# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_container_details(self, image):
    '''Run a docker container shell and retrieve several details.'''
    # (detail name, command, result filter) for extracting details
    # from container command lines.
    shell_commands = (
        ('pwd', 'pwd', None),
        ('phantomjs_path', 'which phantomjs', None),
        ('phantomjs_perms', 'ls -l $(which phantomjs)', lambda x: x[1:10]),
        ('config_file', 'ls {}'.format(CONTAINER_CONFIG_PATH), None),
        ('config_contents', 'cat {}'.format(CONTAINER_CONFIG_PATH), None),
        )

    command = "docker run --rm -it {} bash".format(image)
    logger.info('IMAGE: %s', image)
    logger.info('CONTAINER LAUNCH COMMAND: %s', command)
    spawn = pexpect.spawn(command)

    container_details = {}
    for field, shell_command, response_filter in shell_commands:
        container_details[field] = interact(
            spawn, shell_command, response_filter
            )

    # Exit the container.
    spawn.sendcontrol('d')

    # "Expand" the config records if we found a config file.
    if container_details['config_file'] == CONTAINER_CONFIG_PATH:
        try:
            exec(container_details['config_contents'], container_details)
        except SyntaxError:
            pass
        # The '__builtins__' are noise:
        if '__builtins__' in container_details:
            del container_details['__builtins__']

    return container_details

# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def test_make_aardvark_sqlite_no_images_tag(self):
    '''Test "make aardvark-sqlite" without specifying the images tag.'''
    self.case_worker(
        target='aardvark-sqlite',
        expected_docker_images=[
            'aardvark-base',
            'aardvark-data-init',
            'aardvark-collector',
            'aardvark-apiserver',
            ],
        expected_details={
            '_common': {
                'pwd': '/usr/share/aardvark-data',
                'NUM_THREADS': 5,
                'PHANTOMJS': EXPECTED_PHANTOMJS_PATH,
                'ROLENAME': 'Aardvark',
                'SQLALCHEMY_DATABASE_URI': EXPECTED_SQLITE_DB_URI,
                'SQLALCHEMY_TRACK_MODIFICATIONS': EXPECTED_SQL_TRACK_MODS,
                },
            'aardvark-base': {
                'pwd': '/etc/aardvark',
                },
            },
        expected_artifacts=[
            'aardvark-base-docker-build',
            'aardvark-data-docker-build',
            'aardvark-data-docker-run',
            'aardvark-apiserver-docker-build',
            'aardvark-collector-docker-build',
            ],
        set_images_tag=False
        )

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Define test suites.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def __init__(self):
    """
    Constructor.
    """
    output = pexpect.run('electrum listaddresses')
    print(output)
    # the specific output for electrum if 1 address exists
    pattern = re.compile(r'\[\W*"[A-z0-9]+"\W*\]')
    print(pattern.search(output))
    if pattern.search(output):
        # if a wallet exists, initialize that one
        print('using already existing wallet')
    else:
        self._create_wallet()
    subprocess.call(['electrum', 'daemon', 'start'])
def _create_wallet(self):
    print('did not find an existing wallet, creating a new one')
    # ensure the daemon is stopped, as this causes path errors
    # (mostly useful for development)
    pexpect.run('electrum daemon stop')

    # build a new wallet if no wallet yet exists
    walletpair = str(subprocess.check_output('python addrgen/addrgen.py', shell=True))
    walletpair = re.split('\W+', walletpair)
    self.address = walletpair[1]
    self.privkey = walletpair[2]
    print('created a wallet with address \'' + self.address +
          '\' and privatekey \'' + self.privkey + '\'')

    child = pexpect.spawn('electrum', ['restore', self.privkey])
    # respectively: use default password, use default fee (0.002),
    # use default gap limit and give seed
    self._answer_prompt(child, '')

    # check if the wallet was created successfully
    command = """electrum listaddresses"""
    output = pexpect.run(command)
    walletFinder = re.compile(r'\[\W*"([A-z0-9]+)"\W*\]')
    result = walletFinder.search(output)

    # This horrible feedback loop is here due to a quirk of electrum.
    # Needs refactoring, but do not refactor without extensive testing
    # (i.e. multiple vps all from clean install), because electrum
    # behaviour right after startup tends to differ from server to server
    # (I suspect something to do with specs).
    try:
        print result.group(1)
        return result.group(1)
    except:
        return self._create_wallet()

# def __del__(self):
#     '''
#     clear up the electrum service
#     '''
#     subprocess.call(['electrum', 'daemon', 'stop'])
def get_process_info():
    ps = pexpect.run('ps ax -O ppid')
    pass
def runu(command, timeout=-1, withexitstatus=False, events=None,
         extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """This offers the same interface as :func:`run`, but using unicode.

    Like :class:`spawnu`, you can pass ``encoding`` and ``errors``
    parameters, which will be used for both input and output.
    """
    return _run(command, timeout=timeout, withexitstatus=withexitstatus,
                events=events, extra_args=extra_args, logfile=logfile,
                cwd=cwd, env=env, _spawn=spawnu, **kwargs)
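A minimal usage sketch, assuming a pexpect version that still ships runu() (it is kept for backwards compatibility): the result is already-decoded text rather than bytes.

import pexpect

# runu() decodes the child's output, so listing is str, not bytes.
listing = pexpect.runu('ls -l', encoding='utf-8')
print(listing)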
def readlines(self, sizehint=-1):
    '''This reads until EOF using readline() and returns a list containing
    the lines thus read. The optional 'sizehint' argument is ignored.
    Remember, because this reads until EOF that means the child process
    should have closed its stdout. If you run this method on a child that
    is still running with its stdout open then this method will block
    until it times out.'''

    lines = []
    while True:
        line = self.readline()
        if not line:
            break
        lines.append(line)
    return lines
def get_process_info():
    # This seems to work on both Linux and BSD, but should otherwise be
    # considered highly UNportable.
    ps = pexpect.run('ps ax -O ppid')
    pass
def test_expect_eof(self):
    the_old_way = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                                   stdout=subprocess.PIPE).communicate()[0].rstrip()
    p = pexpect.spawn('/bin/ls -l /bin')
    # This basically tells it to read everything. Same as the
    # pexpect.run() function.
    p.expect(pexpect.EOF)
    the_new_way = p.before
    the_new_way = the_new_way.replace(b'\r\n', b'\n'
        ).replace(b'\r', b'\n').replace(b'\n\n', b'\n').rstrip()
    the_old_way = the_old_way.replace(b'\r\n', b'\n'
        ).replace(b'\r', b'\n').replace(b'\n\n', b'\n').rstrip()
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
def test_env(self):
    default = pexpect.run('env')
    userenv = pexpect.run('env', env={'foo': 'pexpect'})
    assert default != userenv, "'default' and 'userenv' should be different"
    assert b'foo' in userenv and b'pexpect' in userenv, \
        "'foo' and 'pexpect' should be in 'userenv'"
def test_cwd(self):
    # This assumes 'pwd' and '/tmp' exist on this platform.
    default = pexpect.run('pwd')
    tmpdir = pexpect.run('pwd', cwd='/tmp')
    assert default != tmpdir, "'default' and 'tmpdir' should be different"
    assert b'tmp' in tmpdir, "'tmp' should be returned by 'pwd' command"
def execute_command():
    logger.info('Got incoming request')
    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422
    command = request.form['command']

    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        logger.warn("Couldn't find file %s", file_name)

    if not command.startswith('idaw ') and not command.startswith('idaw64 '):
        return jsonify(error="'idaw' and 'idaw64' are the only valid commands"), 422

    try:
        logger.info('Executing %s', command)
        timeout = None if 'timeout' not in request.form else int(request.form['timeout'])
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code == 0:
        logger.info('Command %s finished executing successfully', command)
    else:
        logger.warn("Command %s didn't finish correctly, IDA returned exit code %s",
                    command, exit_code)

    if exit_code != 0:
        return jsonify(error='ida finished with status code %s' % exit_code), 500
    else:
        return jsonify(message='OK'), 200
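For reference, a minimal sketch of the withexitstatus contract used above (the command is a placeholder): with withexitstatus=True, pexpect.run() returns an (output, exitstatus) tuple instead of the output alone.

import pexpect

# The second element of the tuple is the child's exit status.
output, exit_code = pexpect.run('ls /nonexistent', withexitstatus=True)
print(exit_code)  # non-zero, because ls fails on a missing path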
def transfer(self, method, source_path, remote_filename):
    """create Par2 files and transfer the given file and the Par2 files
    with the wrapped backend.

    Par2 must run on the real filename or it would restore the
    temp-filename later on. So first of all create a tempdir and symlink
    the source_path with remote_filename into this.
    """
    import pexpect
    par2temp = source_path.get_temp_in_same_dir()
    par2temp.mkdir()
    source_symlink = par2temp.append(remote_filename)
    source_target = source_path.get_canonical()
    if not os.path.isabs(source_target):
        source_target = os.path.join(os.getcwd(), source_target)
    os.symlink(source_target, source_symlink.get_canonical())
    source_symlink.setdata()

    log.Info("Create Par2 recovery files")
    par2create = 'par2 c -r%d -n1 %s %s' % (self.redundancy,
                                            self.common_options,
                                            source_symlink.get_canonical())
    out, returncode = pexpect.run(par2create, None, True)  # timeout=None, withexitstatus=True

    source_symlink.delete()
    files_to_transfer = []
    if not returncode:
        for file in par2temp.listdir():
            files_to_transfer.append(par2temp.append(file))

    method(source_path, remote_filename)
    for file in files_to_transfer:
        method(file, file.get_filename())

    par2temp.deltree()
def script(self, logininfo, filepath):
    '''run script on managed cloud server using /usr/bin/env expect'''
    if not logininfo.admin_password:
        raise Exception('Unmanaged Cloud Server: no rack password')
    if '/' in filepath:
        logininfo.script = filepath.split('/')[-1]
    else:
        logininfo.script = filepath
    if filepath.startswith('https://'):
        newpath = os.path.expanduser(
            '~/.cache/hammercloud/{login.script}'.format(login=logininfo)
        )
        if not os.path.exists(newpath):
            with open(newpath, 'w') as newfile:
                resp = requests.get(filepath)
                print(resp.content, file=newfile)
        filepath = newpath
    sftp(
        logininfo, 'put', filepath, logininfo.script, quiet=True,
        executable=True
    )
    command = '/home/{login.ssh_user}/{login.script} {login.extraargs}; '
    if not logininfo.no_clean:
        command += 'rm /home/{login.ssh_user}/{login.script}'
    logininfo.command = command
    cmd(logininfo)
def execute_command():
    logger.info('Got incoming request')
    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422
    command = request.form['command']

    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        logger.warn("Couldn't find file %s", file_name)

    if not command.startswith('idal ') and not command.startswith('idal64 '):
        return jsonify(error="'idal' and 'idal64' are the only valid commands"), 422

    try:
        logger.info('Executing %s', command)
        timeout = None if 'timeout' not in request.form else int(request.form['timeout'])
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code == 0:
        logger.info('Command %s finished executing successfully', command)
    else:
        logger.warn("Command %s didn't finish correctly, IDA returned exit code %s",
                    command, exit_code)

    if exit_code != 0:
        return jsonify(error='ida finished with status code %s' % exit_code), 500
    else:
        return jsonify(message='OK'), 200