Python subprocess 模块,Popen() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.Popen()。
def cmd(command):
result = Result()
p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, stderr) = p.communicate()
result.exit_code = p.returncode
result.stdout = stdout
result.stderr = stderr
result.command = command
if p.returncode != 0:
print 'Error executing command [%s]' % command
print 'stderr: [%s]' % stderr
print 'stdout: [%s]' % stdout
return result
def run_command(args, wait=False):
try:
if (wait):
p = subprocess.Popen(
args,
stdout = subprocess.PIPE)
p.wait()
else:
p = subprocess.Popen(
args,
stdin = None, stdout = None, stderr = None, close_fds = True)
(result, error) = p.communicate()
except subprocess.CalledProcessError as e:
sys.stderr.write(
"common::run_command() : [ERROR]: output = %s, error code = %s\n"
% (e.output, e.returncode))
return result
def store_revision_info(src_path, output_dir, arg_string):
# Get git hash
gitproc = Popen(['git', 'rev-parse', 'HEAD'], stdout = PIPE, cwd=src_path)
(stdout, _) = gitproc.communicate()
git_hash = stdout.strip()
# Get local changes
gitproc = Popen(['git', 'diff', 'HEAD'], stdout = PIPE, cwd=src_path)
(stdout, _) = gitproc.communicate()
git_diff = stdout.strip()
# Store a text file in the log directory
rev_info_filename = os.path.join(output_dir, 'revision_info.txt')
with open(rev_info_filename, "w") as text_file:
text_file.write('arguments: %s\n--------------------\n' % arg_string)
text_file.write('git hash: %s\n--------------------\n' % git_hash)
text_file.write('%s' % git_diff)
def run_command(command, wait=False):
try:
if (wait):
p = subprocess.Popen(
[command],
stdout = subprocess.PIPE,
shell = True)
p.wait()
else:
p = subprocess.Popen(
[command],
shell = True,
stdin = None, stdout = None, stderr = None, close_fds = True)
(result, error) = p.communicate()
except subprocess.CalledProcessError as e:
sys.stderr.write(
"common::run_command() : [ERROR]: output = %s, error code = %s\n"
% (e.output, e.returncode))
return result
def call(*popenargs, **kwargs):
"""Run command with arguments. Wait for command to complete. Sends
output to logging module. The arguments are the same as for the Popen
constructor."""
from subprocess import Popen, PIPE
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
p = Popen(*popenargs, **kwargs)
stdout, stderr = p.communicate()
if stdout:
for line in stdout.strip().split("\n"):
logger.info(line)
if stderr:
for line in stderr.strip().split("\n"):
logger.error(line)
return p.returncode
def mathjax(s):
with open("temp.log", "w") as f:
f.write(s)
p = Popen([app.config['mjpage'],
'--dollars',
'--output', "CommonHTML",
'--fontURL',
("https://cdnjs.cloudflare.com/ajax/libs/"
"mathjax/2.7.0/fonts/HTML-CSS")], stdout=PIPE, stdin=PIPE,
stderr=PIPE)
#filename = hashlib.sha256(s.encode('utf-8')).hexdigest()
#with open(filename, 'w') as f:
# print(s, file=f)
res = p.communicate(input=s.encode('utf-8'))
out = res[0].decode('utf-8')
err = res[1].decode('utf-8')
soup = BeautifulSoup(out, 'html.parser')
style = str(soup.style)
body = "".join(str(s) for s in soup.body.children)
return style, body
def _compile_proto(full_path, dest):
'Helper to compile protobuf files'
proto_path = os.path.dirname(full_path)
protoc_args = [find_protoc(),
'--python_out={}'.format(dest),
'--proto_path={}'.format(proto_path),
full_path]
proc = subprocess.Popen(protoc_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
outs, errs = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
outs, errs = proc.communicate()
return False
if proc.returncode != 0:
msg = 'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
full_path, errs.decode('utf-8'), outs.decode('utf-8'))
raise BadProtobuf(msg)
return True
def run(daemon):
if daemon:
pid_file = './sensor21.pid'
if os.path.isfile(pid_file):
pid = int(open(pid_file).read())
os.remove(pid_file)
try:
p = psutil.Process(pid)
p.terminate()
except:
pass
try:
p = subprocess.Popen(['python3', 'sensor21-server.py'])
open(pid_file, 'w').write(str(p.pid))
except subprocess.CalledProcessError:
raise ValueError("error starting sensor21-server.py daemon")
else:
print("Server running...")
app.run(host='::', port=5002)
def exec_cmd(cmd, **kwds):
"""
Execute arbitrary commands as sub-processes.
"""
stdin = kwds.get('stdin', None)
stdin_flag = None
if not stdin is None:
stdin_flag = subprocess.PIPE
proc = subprocess.Popen(
cmd,
stdin=stdin_flag,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = proc.communicate(stdin)
return (proc.returncode, stdout, stderr)
#=======================================================================
# Extend the Milter Class (where the email is captured)
#=======================================================================
def epubcheck_help():
"""Return epubcheck.jar commandline help text.
:return unicode: helptext from epubcheck.jar
"""
# tc = locale.getdefaultlocale()[1]
with open(os.devnull, "w") as devnull:
p = subprocess.Popen(
[c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h'],
stdout=subprocess.PIPE,
stderr=devnull,
)
result = p.communicate()[0]
return result.decode()
def get_verify_command(self, signature_filename, data_filename,
keystore=None):
"""
Return a suitable command for verifying a file.
:param signature_filename: The pathname to the file containing the
signature.
:param data_filename: The pathname to the file containing the
signed data.
:param keystore: The path to a directory which contains the keys
used in verification. If not specified, the
instance's ``gpg_home`` attribute is used instead.
:return: The verifying command as a list suitable to be
passed to :class:`subprocess.Popen`.
"""
cmd = [self.gpg, '--status-fd', '2', '--no-tty']
if keystore is None:
keystore = self.gpg_home
if keystore:
cmd.extend(['--homedir', keystore])
cmd.extend(['--verify', signature_filename, data_filename])
logger.debug('invoking: %s', ' '.join(cmd))
return cmd
def run_command_with_code(self, cmd, redirect_output=True,
check_exit_code=True):
"""Runs a command in an out-of-process shell.
Returns the output of that command. Working directory is self.root.
"""
if redirect_output:
stdout = subprocess.PIPE
else:
stdout = None
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0:
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
return (output, proc.returncode)
def test_loadSADFile_startorder(self):
maxpid=32768
try:
out=subprocess.Popen(['cat', '/proc/sys/kernel/pid_max'], stdout=subprocess.PIPE)
res=out.communicate()
maxpid=int(res[0].strip())
except:
pass
retval = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
self.assertEquals(retval, True)
comp_ac = sb.getComponent('ticket_462_ac_1')
self.assertNotEquals(comp_ac, None)
comp = sb.getComponent('ticket_462_1')
self.assertNotEquals(comp, None)
if comp_ac._pid <= maxpid-1:
isless= comp_ac._pid < comp._pid
else:
isless=comp._pid < comp_ac._pid
self.assertTrue(isless)
def test_UserOrGroupNoDaemon(self):
"""Test that we read the correct domainname from the DMD file, the test domain
should have been created by the test runner"""
domainName = scatest.getTestDomainName()
# Test that we don't already have a bound domain
try:
domMgr = self._root.resolve(scatest.getDomainMgrURI())
self.assertEqual(domMgr, None)
except CosNaming.NamingContext.NotFound:
pass # This exception is expected
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--user','domuser','--group','somegroup' ]
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'),-1)
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--group','somegroup' ]
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'),-1)
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--user','domuser' ]
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'),-1)
def test_BadUserOrBadGroup(self):
"""Test that we read the correct domainname from the DMD file, the test domain
should have been created by the test runner"""
domainName = scatest.getTestDomainName()
# Test that we don't already have a bound domain
try:
domMgr = self._root.resolve(scatest.getDomainMgrURI())
self.assertEqual(domMgr, None)
except CosNaming.NamingContext.NotFound:
pass # This exception is expected
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--user=domuser']
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('Separator must be a space'),-1)
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--group=somegroup']
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('Separator must be a space'),-1)
def plot(self):
if _domainless._DEBUG == True:
print "Plot:plot()"
# Error checking before launching plot
if self.usesPortIORString == None:
raise AssertionError, "Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component"
if self._usesPortName == None:
raise AssertionError, "Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component"
if self._dataType == None:
raise AssertionError, "Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component"
plotCommand = str(self._eclipsePath) + "/bin/plotter.sh -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString)
if _domainless._DEBUG == True:
print "Plot:plotCommand " + str(plotCommand)
args = _shlex.split(plotCommand)
if _domainless._DEBUG == True:
print "Plot:args " + str(args)
try:
dev_null = open('/dev/null','w')
sub_process = _subprocess.Popen(args,stdout=dev_null,preexec_fn=_os.setpgrp)
pid = sub_process.pid
self._processes[pid] = sub_process
except Exception, e:
raise AssertionError, "Plot:plot() Failed to launch plotting due to %s" % ( e)
def print_card(pdf, printer_name):
"""
Send the PDF to the printer!
Shells out to `lpr` to do the work.
:param pdf: Binary PDF buffer
:param printer_name: Name of the printer on the system (ie. CUPS name)
"""
process = subprocess.Popen(
['lpr', '-P', printer_name],
stdin=subprocess.PIPE
)
process.communicate(pdf)
if process.returncode != 0:
raise PrintingError('Return code {}'.format(process.returncode))
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
""" Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility """
raise ValueError('Unsupported method at the moment')
devnull = open(os.devnull, 'w')
proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
'-i', '/dev/stdin',
'-o', bin_filename,
'-w', weight_filename,
], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
# Stream text triplets to 'convert'
for ijx in itertools.izip(matrix.row, matrix.col, matrix.data):
proc.stdin.write('%d\t%d\t%f\n' % ijx)
proc.stdin.close()
proc.wait()
devnull.close()
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
""" Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility """
devnull = open(os.devnull, 'w')
proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
'-i', '/dev/stdin',
'-o', bin_filename,
], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
# Stream text triplets to 'convert'
for ij in itertools.izip(matrix.row, matrix.col):
proc.stdin.write('%d\t%d\n' % ij)
proc.stdin.close()
proc.wait()
devnull.close()
def read_fastq(self):
if self.file[-2:] == "gz":
proc = subprocess.Popen(["gunzip", "--stdout", self.file], stdout=subprocess.PIPE)
reader = proc.stdout
else:
reader = file(self.file, "r")
while True:
header = reader.next().strip()
seq = reader.next().strip()
reader.next() # incr line
qual = reader.next().strip()
if self.rc:
seq = tk_seq.get_rev_comp(seq)
qual = qual[::-1]
yield FastqRow(header, seq, qual)
reader.close()
def parse_ab3_defines(defines_file): # , pkg_name):
try:
fp = open(defines_file, 'rt')
abd_cont = fp.read()
fp.close()
except:
print('[E] Failed to load autobuild defines file! Do you have read permission?')
return False
script = "ARCH={}\n".format(
get_arch_name()) + abd_cont + gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])
try:
# Better to be replaced by subprocess.Popen
abd_out = subprocess.check_output(script, shell=True)
except:
print('[E] Malformed Autobuild defines file found! Couldn\'t continue!')
return False
abd_fp = io.StringIO('[wrap]\n' + abd_out.decode('utf-8'))
abd_config = RawConfigParser()
abd_config.read_file(abd_fp)
abd_config_dict = {}
for i in abd_config['wrap']:
abd_config_dict[i.upper()] = abd_config['wrap'][i]
return abd_config_dict
def manage_test_process(queue, f, cpopen):
if queue:
if cpopen is not None:
if cpopen.poll() is None:
return cpopen # not finish
print("OK create new process")
my_env = os.environ.copy()
my_env["CUDA_VISIBLE_DEVICES"] = "" # test on CPU
argument = queue.pop()
popen = subprocess.Popen(argument, stdout=f, stderr=f, env=my_env)
return popen
else:
return cpopen
def runitt():
open(pid, 'w').close()
global variable
variable=str(variable)
variablestr=str(variable)
print('Starting Trade of: ' + variablestr)
process0='./zenbot.sh trade poloniex.' + variablestr
subprocess.Popen(process0,shell=True)
time.sleep(3600)
print('Starting node kill process)
process1='sudo killall node'
subprocess.Popen(process1,shell=True)
print('Starting final sell Of:' + variablestr + ' In Case of Error')
process2='./zenbot.sh sell --order_adjust_time=1000000000 --sell_pct=100 --markup_pct=0 poloniex.' + variablestr
proc2 = subprocess.Popen(process2,shell=True)
proc2.communicate()
print('Starting final sell Of:' + variablestr + ' In Case of Error')
process3='./zenbot.sh sell --order_adjust_time=1000000000 --sell_pct=100 --markup_pct=0 poloniex.' + variablestr
proc3 = subprocess.Popen(process3,shell=True)
proc3.communicate()
os.remove(pid)
print('Done running loop, process file deleted. Waiting for another coin...')
# From now on, any update received will be passed to 'update_handler' NOTE... Later, Zenbot will be modified to cancel on order adjust.
def add_to_vcs(self, summary):
if (
self.git_add and
(SyncStatus.DELETED in summary or SyncStatus.ADDED in summary) and
not self.dry_run and
self.confirm(
question=(
'Do you want to add created and removed files to GIT?'
)
)
):
output, errors = subprocess.Popen(
['git', '-C', app_settings.SYNC_DIRECTORY,
'add', '-A', app_settings.SYNC_DIRECTORY],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
).communicate()
if errors:
raise self.error('Adding file changes to GIT failed!')
def get_audio_streams(self):
with open(os.devnull, 'w') as DEV_NULL:
#Get file info and Parse it
try:
proc = subprocess.Popen([
FFPROBE,
'-i', self.input_video,
'-of', 'json',
'-show_streams'
], stdout=subprocess.PIPE, stderr=DEV_NULL)
except OSError as e:
if e.errno == os.errno.ENOENT:
Logger.error("FFPROBE not found, install on your system to use this script")
sys.exit(0)
output = proc.stdout.read()
return get_audio_streams(json.loads(output))
def launch(self, cfg, path, flags):
logging.debug("Determine the OS and Architecture this application is currently running on")
hostOS = platform.system().lower()
logging.debug("hostOS: " + str(hostOS))
is_64bits = sys.maxsize > 2 ** 32
if is_64bits:
hostArchitecture = 'x64'
else:
hostArchitecture = 'ia32'
logging.debug("hostArchitecture: " + str(hostArchitecture))
if(self.validateConfig(cfg, hostOS, hostArchitecture)):
fnull = open(os.devnull, 'w')
if os.environ.get("WPW_HOME") is not None:
cmd = [os.environ["WPW_HOME"] + '/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
else:
cmd = [path + '/wpwithinpy/iot-core-component/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
cmd.extend(flags)
proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
return proc
else:
logging.debug("Invalid OS/Architecture combination detected")
def build_mkdocs(self):
"""
Invokes MkDocs to build the static documentation and moves the folder
into the project root folder.
"""
# Setting the working directory
os.chdir(MKDOCS_DIR)
# Building the MkDocs project
pipe = subprocess.PIPE
mkdocs_process = subprocess.Popen(
["mkdocs", "build", "-q"], stdout=pipe, stderr=pipe)
std_op, std_err_op = mkdocs_process.communicate()
if std_err_op:
raise Error("Could not build MkDocs !\n%s" %
std_err_op)
def goglib_get_games_list():
proc = subprocess.Popen(['lgogdownloader', '--exclude', \
'1,2,4,8,16,32', '--list-details'],stdout=subprocess.PIPE)
games_detailed_list = proc.stdout.readlines()
stdoutdata, stderrdata = proc.communicate()
if proc.returncode == 0:
file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'
games_list_file = open(file_path, 'w')
for line in games_detailed_list:
if 'Getting game info' not in line:
games_list_file.write(line)
return 0
else:
return 1
def win64_available(self):
wine_bin, \
wineserver_bin, \
wine_lib = self.get_wine_bin_path()
dev_null = open(os.devnull, 'w')
try:
proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null, \
stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
dev_null.close()
stdoutdata, stderrdata = proc.communicate()
if proc.returncode == 1:
self.combobox_winearch.set_visible(True)
return True
else:
self.combobox_winearch.set_visible(False)
self.winearch = 'win32'
return False
except:
self.combobox_winearch.set_visible(False)
self.winearch = 'win32'
return False
def writerow(self, row):
"""
:param row:
:return:
"""
self._bytes_written += self._out_writer.writerow(row)
row_txt = self._buffer.getvalue()
self._out_csv.write(row_txt)
self._reset_buffer()
self._out_csv.flush()
if self._bytes_written > self.max_bytes:
self._out_csv.close()
self._make_csv_writer()
out_name = str(Path(self._out_csv.name).absolute())
subprocess.Popen(['7z', 'a', '-t7z', '-m0=lzma', '-mx=9', '-mfb=64', '-md=16m',
out_name + '.7z', out_name])
return row_txt
def index():
if request.args.get('code'):
unlock_code = request.args.get('code')
# unlock, new password
re = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
if re:
jid = re.jid
re.password_code = None
s.merge(re)
s.commit()
# set new password and send email
email_address = re.email
password = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(10))
p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
args = bytes("%s\n%s\n" % (password, password), encoding='utf8')
p.communicate(args)
sendMail(email_address, 'new password', password)
content = render_template('success.html', message='password was sent')
else:
content = render_template('error.html', message='link invalid')
else:
content = render_template('index.html')
return content
def _download_packages(self):
if not os.path.exists(self.args[0]):
os.makedirs(self.args[0])
for package, pkg_opts in self.packages.iteritems():
target = '%s/%s' % (self.mirror, pkg_opts['Filename'])
debfilename = os.path.basename(target)
local_debfile_path = os.path.join(self.args[0], debfilename)
print 'Processing.. %s' % target
if os.path.exists(local_debfile_path):
md5sum = md5_checksum(local_debfile_path)
if md5sum == pkg_opts['MD5sum']:
print ' . skipping... (MD5sum match)'
continue
print " . downloading..."
proc = subprocess.Popen(['wget', '-nv', '-P', self.args[0], target])
proc.communicate()
def output_articles(articles):
if len(articles) == 0:
print('No articles found')
return
try:
pager = subprocess.Popen(['less'],
stdin=subprocess.PIPE,
stdout=sys.stdout)
for article in articles:
if int(article['reading_time']) <= 0:
article['reading_time'] = 'Unknown'
content = format_article(article, line=True)
if six.PY3:
content = bytearray(content, 'utf-8')
pager.stdin.write(content)
pager.stdin.close()
pager.wait()
except (KeyboardInterrupt, ValueError):
pass
def run_coala_with_specific_file(working_dir, file):
"""Run coala in a specified directory."""
command = ["coala", "--json", "--find-config", "--files", file]
stdout_file = tempfile.TemporaryFile()
kwargs = {"stdout": stdout_file,
"cwd": working_dir}
process = subprocess.Popen(command, **kwargs)
retval = process.wait()
output_str = None
if retval == 1:
stdout_file.seek(0)
output_str = stdout_file.read().decode("utf-8", "ignore")
if output_str:
log("Output =", output_str)
else:
log("No results for the file")
elif retval == 0:
log("No issues found")
else:
log("Exited with:", retval)
stdout_file.close()
return output_str
def spawn(self, cmd, stdin_content="", stdin=False, shell=False, timeout=2):
"""
Spawn a new process using subprocess
"""
try:
if type(cmd) != list:
raise PJFInvalidType(type(cmd), list)
if type(stdin_content) != str:
raise PJFInvalidType(type(stdin_content), str)
if type(stdin) != bool:
raise PJFInvalidType(type(stdin), bool)
self._in = stdin_content
try:
self.process = subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, shell=shell)
self.finish_read(timeout, stdin_content, stdin)
if self.process.poll() is not None:
self.close()
except KeyboardInterrupt:
return
except OSError:
raise PJFProcessExecutionError("Binary <%s> does not exist" % cmd[0])
except Exception as e:
raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
def spawn(self, lines, additional_args = [ '-p', ''], width = None):
(mouse_x, mouse_y) = get_mouse_location()
if not width:
width = 100 # some default width
width = max(width, 101) # width has to be 100 at least (rofi restriction)
# first, compute the top left corner of the menu
menu_x = min(max(mouse_x - width/2, self.x), self.x + self.panel_width - width)
menu_y = self.y
# then, specify these coordinates relative to the mouse cursor
menu_x -= mouse_x
menu_y -= mouse_y
# compile rofi arguments
cmd = ['rofi', '-dmenu', '-sep' , '\\0' ]
cmd += ['-monitor', '-3' ] # position relative to mouse cursor
cmd += ['-layout', '1' ] # specify top left corner of the menu
cmd += ['-width', str(width) ]
cmd += ['-xoffset', str(menu_x), '-yoffset', str(menu_y) ]
cmd += self.rofi_args
cmd += additional_args
rofi = subprocess.Popen(cmd,stdout=subprocess.PIPE,stdin=subprocess.PIPE)
for i in lines:
rofi.stdin.write(i.encode('utf-8'))
rofi.stdin.write(struct.pack('B', 0))
rofi.stdin.close()
rofi.wait()
def communicate(input, coro=None):
if platform.system() == 'Windows':
# asyncfile.Popen must be used instead of subprocess.Popen
pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
else:
pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# convert pipe to asynchronous version
async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)
# 'communicate' takes either the data or file descriptor with data
# (if file is too large to read in full) as input
input = open(input)
stdout, stderr = yield async_pipe.communicate(input)
print('communicate sha1sum: %s' % stdout)
def custom_feeder(input, coro=None):
def write_proc(fin, pipe, coro=None):
while True:
data = yield os.read(fin.fileno(), 8*1024)
if not data:
break
n = yield pipe.write(data, full=True)
assert n == len(data)
fin.close()
pipe.stdin.close()
def read_proc(pipe, coro=None):
# output from sha1sum is small, so read until EOF
data = yield pipe.stdout.read()
pipe.stdout.close()
raise StopIteration(data)
if platform.system() == 'Windows':
# asyncfile.Popen must be used instead of subprocess.Popen
pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
else:
pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)
reader = asyncoro.Coro(read_proc, async_pipe)
writer = asyncoro.Coro(write_proc, open(input), async_pipe)
stdout = yield reader.finish()
print(' feeder sha1sum: %s' % stdout)
# asyncoro.logger.setLevel(asyncoro.Logger.DEBUG)
# simpler version using 'communicate'
def communicate(input, coro=None):
if platform.system() == 'Windows':
# asyncfile.Popen must be used instead of subprocess.Popen
pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
else:
pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# convert pipe to asynchronous version
async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)
# 'communicate' takes either the data or file descriptor with data
# (if file is too large to read in full) as input
input = open(input)
stdout, stderr = yield async_pipe.communicate(input)
print('communicate sha1sum: %s' % stdout)
def custom_feeder(input, coro=None):
def write_proc(fin, pipe, coro=None):
while True:
data = yield os.read(fin.fileno(), 8*1024)
if not data:
break
n = yield pipe.write(data, full=True)
assert n == len(data)
fin.close()
pipe.stdin.close()
def read_proc(pipe, coro=None):
# output from sha1sum is small, so read until EOF
data = yield pipe.stdout.read()
pipe.stdout.close()
raise StopIteration(data)
if platform.system() == 'Windows':
# asyncfile.Popen must be used instead of subprocess.Popen
pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
else:
pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)
reader = asyncoro.Coro(read_proc, async_pipe)
writer = asyncoro.Coro(write_proc, open(input), async_pipe)
stdout = yield reader.finish()
print(' feeder sha1sum: %s' % stdout)
# asyncoro.logger.setLevel(asyncoro.Logger.DEBUG)
# simpler version using 'communicate'
def terminate(self):
"""Close pipe and terminate child process.
"""
self.close()
super(Popen, self).terminate()
def __init__(self, first, last=None):
"""'first' is a Popen object. 'last', if given, is another
Popen object that is the end of the joints to 'first'.
'write' operations send data to first's stdin and 'read'
operations get data from last's stdout/stderr.
"""
if not last:
last = first
self.first = first
self.last = last
if platform.system() == 'Windows':
if not isinstance(first, Popen) or not isinstance(last, Popen):
raise ValueError('argument must be asyncfile.Popen object')
if first.stdin:
self.stdin = first.stdin
else:
self.stdin = None
if last.stdout:
self.stdout = last.stdout
else:
self.stdout = None
if last.stderr:
self.stderr = last.stderr
else:
self.stderr = None
else:
if not isinstance(first, subprocess.Popen) or not isinstance(last, subprocess.Popen):
raise ValueError('argument must be subprocess.Popen object')
if first.stdin:
self.stdin = AsyncFile(first.stdin)
else:
self.stdin = None
if last.stdout:
self.stdout = AsyncFile(last.stdout)
else:
self.stdout = None
if last.stderr:
self.stderr = AsyncFile(last.stderr)
else:
self.stderr = None
def poll(self):
"""Similar to 'poll' of Popen.
"""
if self.last:
return self.last.poll()
elif self.first:
return self.first.poll()
def custom_feeder(input, coro=None):
def write_proc(fin, pipe, coro=None):
while True:
data = yield os.read(fin.fileno(), 8*1024)
if not data:
break
n = yield pipe.write(data, full=True)
assert n == len(data)
fin.close()
pipe.stdin.close()
def read_proc(pipe, coro=None):
# output from sha1sum is small, so read until EOF
data = yield pipe.stdout.read()
pipe.stdout.close()
raise StopIteration(data)
if platform.system() == 'Windows':
# asyncfile.Popen must be used instead of subprocess.Popen
pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
else:
pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)
reader = asyncoro.Coro(read_proc, async_pipe)
writer = asyncoro.Coro(write_proc, open(input), async_pipe)
stdout = yield reader.finish()
print(' feeder sha1sum: %s' % stdout)
# asyncoro.logger.setLevel(asyncoro.Logger.DEBUG)
# simpler version using 'communicate'
def terminate(self):
"""Close pipe and terminate child process.
"""
self.close()
super(Popen, self).terminate()
def __init__(self, first, last=None):
"""'first' is a Popen object. 'last', if given, is another
Popen object that is the end of the joints to 'first'.
'write' operations send data to first's stdin and 'read'
operations get data from last's stdout/stderr.
"""
if not last:
last = first
self.first = first
self.last = last
if platform.system() == 'Windows':
if not isinstance(first, Popen) or not isinstance(last, Popen):
raise ValueError('argument must be asyncfile.Popen object')
if first.stdin:
self.stdin = first.stdin
else:
self.stdin = None
if last.stdout:
self.stdout = last.stdout
else:
self.stdout = None
if last.stderr:
self.stderr = last.stderr
else:
self.stderr = None
else:
if not isinstance(first, subprocess.Popen) or not isinstance(last, subprocess.Popen):
raise ValueError('argument must be subprocess.Popen object')
if first.stdin:
self.stdin = AsyncFile(first.stdin)
else:
self.stdin = None
if last.stdout:
self.stdout = AsyncFile(last.stdout)
else:
self.stdout = None
if last.stderr:
self.stderr = AsyncFile(last.stderr)
else:
self.stderr = None
def poll(self):
"""Similar to 'poll' of Popen.
"""
if self.last:
return self.last.poll()
elif self.first:
return self.first.poll()
def execute(cmd, success_msg="", failure_msg="", exitcode=-1):
"""
Generic wrapper to execute the CLI commands. Returns Output if success.
On success it can print message in stdout if specified.
On failure, exits after writing to stderr.
"""
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if p.returncode == 0:
if success_msg:
output_ok(success_msg)
return out
else:
err_msg = err if err else out
output_notok(failure_msg, err=err_msg, exitcode=exitcode)
def get_glusterd_workdir():
"""
Command to get Glusterd working dir. If failed returns the
default directory /var/lib/glusterd
"""
p = subprocess.Popen(["gluster", "system::", "getwd"],
stdout=subprocess.PIPE)
out, _ = p.communicate()
if p.returncode == 0:
return out.strip()
else:
return DEFAULT_GLUSTERD_WORKDIR