Python subprocess 模块,getoutput() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.getoutput()。
def get_proc_etime(self, pid):
    """Return how long process *pid* has been running, in seconds.

    The value comes from the ``etime`` column of ``ps``, which is formatted
    as ``[[dd-]hh:]mm:ss``.

    :param pid: integer process id to look up.
    :return: elapsed seconds as an int, or -1 when the process is not listed
        or the ``ps`` output cannot be parsed.
    """
    import re

    fmt = subprocess.getoutput("ps -A -opid,etime | grep '^ *%d ' | awk '{print $NF}'" % pid).strip()
    if fmt == '':
        return -1
    # Anchored match: anything that is not a well-formed etime (for example
    # an error message from the shell) maps to the "unknown" sentinel instead
    # of blowing up inside int().
    match = re.fullmatch(r'(?:(?:(\d+)-)?(\d+):)?(\d+):(\d+)', fmt)
    if match is None:
        return -1
    days, hours, minutes, seconds = (int(part) if part else 0 for part in match.groups())
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
# compute the billing val this running hour
# if isreal is True, it will also make users' beans decrease to pay for the bill.
# return the billing value in this running hour
def extend_swap(size):
    """Create a sparse file, attach it to a new loop device and enable it as swap.

    :param size: value passed to ``dd`` as ``seek`` with ``bs=1G``. A negative
        value derives the size from ``system_manager.get_memory_sample()``
        as ``(mem_total + mem_total // 8) // 1024``.
    :return: dict with ``"setup"`` (True when ``swapon`` exited with status 0)
        and ``"time"`` (seconds spent on the setup commands).
    """
    if size < 0:
        (mem_free, mem_total) = system_manager.get_memory_sample()
        size = (mem_total + mem_total // 8) // 1024
    # Find the first loop id, starting at 128, that is not already listed in
    # /proc/swaps as a cg-loop device.  The backslash is doubled so that the
    # shell still receives "-F\-" without relying on an invalid Python escape.
    nid = 128
    while subprocess.getoutput("cat /proc/swaps | grep cg-loop | awk '{print $1}' | awk -F\\- '{print $NF}' | grep %d$" % nid) != "":
        nid = nid + 1
    start_time = time.time()
    # setup
    os.system('dd if=/dev/zero of=/tmp/cg-swap-%d bs=1G count=0 seek=%d >/dev/null 2>&1' % (nid, size))
    os.system('mknod -m 0660 /dev/cg-loop-%d b 7 %d >/dev/null 2>&1' % (nid, nid))
    os.system('losetup /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
    os.system('mkswap /dev/cg-loop-%d >/dev/null 2>&1' % nid)
    success = os.system('swapon /dev/cg-loop-%d >/dev/null 2>&1' % nid) == 0
    # detach (intentionally not performed here):
    # os.system('swapoff /dev/cg-loop-%d >/dev/null 2>&1' % nid)
    # os.system('losetup -d /dev/cg-loop-%d >/dev/null 2>&1' % nid)
    # os.system('rm -f /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
    end_time = time.time()
    return {"setup": success, "time": end_time - start_time }
def scan(network):
    """Ping-scan the local subnet(s) and list the hosts that were found.

    The subnet is taken from the globally scoped IPv4 addresses reported by
    ``ip``; the ``network`` argument is not consulted by this function.

    :return: list of ``{'ip_addr': ..., 'host': ...}`` dicts, localhost first.
    """
    local_subnet = subprocess.getoutput("/sbin/ip -o -f inet addr show | awk '/scope global/ {print $4}'")
    nm.scan(hosts=local_subnet, arguments='nmap -sn')
    # localhost always heads the list
    found = [{'ip_addr': '127.0.0.1', 'host': 'localhost'}]
    for host in nm.all_hosts():
        try:
            address = nm[host]['addresses']['ipv4']
        except KeyError:
            address = "[Unknown IP]"
        try:
            label = nm[host].hostname()
        except KeyError:
            label = "[Unknown hostname]"
        found.append({'ip_addr': address, 'host': label})
    return found
def test_getoutput(self):
self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
(0, 'xyzzy'))
# we use mkdtemp in the next line to create an empty directory
# under our exclusive control; from that, we can invent a pathname
# that we _know_ won't exist. This is guaranteed to fail.
dir = None
try:
dir = tempfile.mkdtemp()
name = os.path.join(dir, "foo")
status, output = subprocess.getstatusoutput('cat ' + name)
self.assertNotEqual(status, 0)
finally:
if dir is not None:
os.rmdir(dir)
def hex_dump(hex_cmd):
    """
    Run *hex_cmd* and return its output as HTML-formatted hexdump rows.

    Each output line longer than 9 characters is split by fixed columns into
    an offset (0-8), hex bytes (9-58) and ASCII text (58-78).  The ASCII
    column is HTML-escaped because it can contain markup characters.

    :param hex_cmd: shell command whose output is a hexdump listing
    :return: str
    """
    import html

    rows = []
    for row in getoutput(hex_cmd).split('\n'):
        if len(row) <= 9:
            continue
        off_str = row[0:8]
        hex_str = row[9:58]
        # Escape &, <, > and quotes so dumped bytes cannot inject markup.
        asc_str = html.escape(row[58:78], quote=True)
        rows.append('<div class="row"><span class="text-info mono">{0}</span> '
                    '<span class="text-primary mono">{1}</span> <span class="text-success mono">'
                    '{2}</span></div>'.format(off_str, hex_str, asc_str))
    return ''.join(rows)
def test_getoutput(self):
self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
(0, 'xyzzy'))
# we use mkdtemp in the next line to create an empty directory
# under our exclusive control; from that, we can invent a pathname
# that we _know_ won't exist. This is guaranteed to fail.
dir = None
try:
dir = tempfile.mkdtemp()
name = os.path.join(dir, "foo")
status, output = subprocess.getstatusoutput('cat ' + name)
self.assertNotEqual(status, 0)
finally:
if dir is not None:
os.rmdir(dir)
def api_synced():
    """Run ``electrum is_synchronized`` and return its output in a JSON envelope.

    The response carries three sections: ``data`` (the raw command output
    under ``electrum_synced``), ``meta`` (version, name, state and the
    command that was run) and an empty ``links`` mapping.
    """
    command = "electrum is_synchronized"
    meta = {
        "version": 1,
        "name": "Synced",
        "state": "In Progress",
        "Underlying Command": command,
    }
    data = {"electrum_synced": subprocess.getoutput(command)}
    links = {}
    return jsonify(data=data, meta=meta, links=links)
def git_info():
    """Collect the current commit, working-tree diff and remotes of the repository.

    ``subprocess.getoutput`` never raises ``CalledProcessError``; a failing
    git command would simply return its error text.  The exit status is
    therefore checked explicitly, and a failed command is reported as
    ``'Unknown'`` instead of leaking git's error message into the result.

    :return: dict with the keys ``'head'``, ``'diff'`` and ``'remote'``; each
        value is the command output, or ``'Unknown'`` when the command failed
        or printed nothing.
    """
    def _git(command):
        # Output of *command*, or None when it exits with a non-zero status.
        status, output = subprocess.getstatusoutput(command)
        return output if status == 0 else None

    head = _git('git rev-parse HEAD')
    diff = _git('git diff --no-color')
    remote = _git('git remote -v')
    return {'head': (head or '').strip() or 'Unknown',
            'diff': diff or 'Unknown',
            'remote': (remote or '').strip() or 'Unknown'}
def add_to_desktop(self, widget, desktopEntry):
    """Copy a launcher file to the user's Desktop folder and make launchers executable.

    The Desktop folder is read from ``~/.config/user-dirs.dirs``
    (``XDG_DESKTOP_DIR``, which may be localized) and falls back to
    ``~/Desktop`` when the configured directory does not exist.  Any error
    is printed rather than raised.

    :param widget: not used by this method.
    :param desktopEntry: object whose ``desktopFile`` attribute is the path
        of the launcher file to copy.
    """
    try:
        import glob
        import shutil
        import stat
        from configobj import ConfigObj

        home = GLib.get_home_dir()
        config = ConfigObj(home + "/.config/user-dirs.dirs")
        desktopDir = home + "/Desktop"
        # The entry usually looks like "$HOME/Desktop"; expand it in-process
        # instead of round-tripping the value through a shell "echo".
        configured = os.path.expandvars(config['XDG_DESKTOP_DIR'])
        if os.path.exists(configured):
            desktopDir = configured
        # Copy the desktop file to the desktop.  The full target path is
        # spelled out so that a missing Desktop directory is an error rather
        # than silently creating a file with that name.
        target = os.path.join(desktopDir, os.path.basename(desktopEntry.desktopFile))
        shutil.copy(desktopEntry.desktopFile, target)
        # Equivalent of "chmod a+rx <desktop>/*.desktop".
        for launcher in glob.glob(os.path.join(desktopDir, "*.desktop")):
            mode = stat.S_IMODE(os.stat(launcher).st_mode)
            os.chmod(launcher, mode | 0o555)
    except Exception as detail:
        print (detail)
def get_workspaces(self):
    """Read the Unity/Compiz workspace grid size from dconf.

    When a value is not set, or ``dconf`` returns something that is not a
    plain number (for example an error message because the tool is missing),
    that dimension defaults to 1.

    :return: ``Workspaces(horz, vert, total)`` namedtuple of ints, where
        ``total`` is ``horz * vert``.
    """
    Workspaces = namedtuple('Workspaces', 'horz vert total')
    dconf_horz = 'dconf read /org/compiz/profiles/unity/plugins/core/hsize'
    dconf_vert = 'dconf read /org/compiz/profiles/unity/plugins/core/vsize'
    workspaces_horz = subprocess.getoutput(dconf_horz).strip()
    workspaces_vert = subprocess.getoutput(dconf_vert).strip()
    # If workspace changer hasn't been enabled h- and v-size doesn't seem to be set.
    if not workspaces_horz.isdigit():
        self.logger.debug("unity/plugins/core/hsize not set - setting horisontal workspaces to '1'")
        workspaces_horz = '1'
    if not workspaces_vert.isdigit():
        self.logger.debug("unity/plugins/core/vsize not set - setting vertical workspaces to '1'")
        workspaces_vert = '1'
    horz = int(workspaces_horz)
    vert = int(workspaces_vert)
    workspaces_total = vert * horz
    self.logger.debug("Horisontal number of workspaces is: " + str(workspaces_horz))
    self.logger.debug("Vertical number of workspaces is: " + str(workspaces_vert))
    self.logger.debug("Total number of workspaces is: " + str(workspaces_total))
    return Workspaces(horz, vert, workspaces_total)
def test_getoutput(self):
    """Check getoutput()/getstatusoutput() with a platform-appropriate reader."""
    self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), (0, 'xyzzy'))

    # mkdtemp hands us an empty directory under our exclusive control, so a
    # pathname invented inside it is known not to exist: reading it must fail.
    scratch = None
    try:
        scratch = tempfile.mkdtemp()
        missing = os.path.join(scratch, "foo")
        reader = "type " if mswindows else "cat "
        status, _ = subprocess.getstatusoutput(reader + missing)
        self.assertNotEqual(status, 0)
    finally:
        if scratch is not None:
            os.rmdir(scratch)
def run(self, edit, query):
    """BLAST *query* against the text of the current view and append the hits.

    The whole buffer is written to a temporary FASTA file and turned into a
    nucleotide BLAST database; *query* is written to a second file and
    searched with ``blastn -outfmt 6``.  A header line and the tabular result
    are inserted at the end of the view.  All temporary files, including the
    index files ``makeblastdb`` writes next to the database, are removed
    afterwards.

    :param edit: edit token; groups the insertions so they undo as one step.
    :param query: nucleotide sequence typed into the console.
    """
    import glob
    import os

    ## get the contents of the tab that is the focus in sublime, as a string
    allcontent = self.view.substr(sublime.Region(0, self.view.size()))
    ## get a unique string for use in the tmp files
    unique_filename = str(uuid.uuid4())
    db_handle = '/tmp/%s.sublime_blast.tmp_db.fa' % unique_filename
    query_handle = '/tmp/%s.sublime_blast.tmp_query.fa' % unique_filename
    try:
        with open(db_handle, 'w') as fo:
            fo.write(allcontent)
        with open(query_handle, 'w') as fo:
            fo.write('>query\n%s\n' % query)
        ## make a blastdb of the page open in sublime
        subprocess.call(['/usr/local/bin/makeblastdb', '-dbtype', 'nucl', '-in', '%s' % db_handle])
        ## run blast, taking the query as what is input in the console
        blast_output = subprocess.getoutput('/usr/local/bin/blastn -db {0} -query {1} -outfmt 6'.format(db_handle, query_handle))
    finally:
        # Every file this run created shares the unique prefix.
        for leftover in glob.glob('/tmp/%s.sublime_blast.*' % unique_filename):
            try:
                os.remove(leftover)
            except OSError:
                pass
    ## self.view.size() is the end of the buffer, i.e. where the text is appended
    self.view.insert(edit, self.view.size(), '\nqseqid sseqid pident length mismatch gapopen qstart qend sstart send evalue bitscore\n')
    self.view.insert(edit, self.view.size(), blast_output)
def __get_remote_index():
    """
    Gets the index of news crawl files from commoncrawl.org and returns an array of names
    :return: list of str, the fourth column of every ``aws s3 ls`` output line
    """
    # cleanup: remove a listing left behind by an earlier, interrupted run.
    # The name must match the file written below; -f keeps rm quiet when
    # there is nothing to remove.
    subprocess.getoutput("rm -f .tmpaws.txt")
    # get the remote info
    cmd = "aws s3 ls --recursive s3://commoncrawl/crawl-data/CC-NEWS/ --no-sign-request > .tmpaws.txt && " \
          "awk '{ print $4 }' .tmpaws.txt && " \
          "rm .tmpaws.txt"
    __logger.info('executing: %s', cmd)
    stdout_data = subprocess.getoutput(cmd)
    return stdout_data.splitlines()
def __get_remote_index(self):
    """
    Fetch the CC-NEWS file index from commoncrawl.org.

    The listing is produced by ``aws s3 ls`` and reduced to its fourth
    column with ``awk``; the raw result is also printed to stdout.
    :return: list of str, one entry per output line
    """
    # cleanup of a listing a previous run may have left behind
    subprocess.getoutput("rm tmpaws.txt")
    # get the remote info
    cmd = "".join((
        "aws s3 ls --recursive s3://commoncrawl/crawl-data/CC-NEWS/ --no-sign-request > tmpaws.txt && ",
        "awk '{ print $4 }' tmpaws.txt && ",
        "rm tmpaws.txt",
    ))
    self.__logger.info('executing: %s', cmd)
    listing = subprocess.getoutput(cmd)
    print(listing)
    return listing.splitlines()
def test_getoutput(self):
    """Exercise getoutput()/getstatusoutput(), reading a file that cannot exist."""
    self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), (0, 'xyzzy'))

    # The directory from mkdtemp is empty and ours alone, so any pathname
    # made up inside it does not exist and the read below must fail.
    tmp_dir = None
    try:
        tmp_dir = tempfile.mkdtemp()
        absent_path = os.path.join(tmp_dir, "foo")
        read_cmd = ("type " if mswindows else "cat ") + absent_path
        status, _ = subprocess.getstatusoutput(read_cmd)
        self.assertNotEqual(status, 0)
    finally:
        if tmp_dir is not None:
            os.rmdir(tmp_dir)
def get_our_pids(this_base):
    ''' Runs ps ax in the Vagrant box named this_base and returns a list of
    (pid, command) tuples for commands that start with "/res_binaries/".

    Lines of the ps output that do not have both a pid and a command
    (blank lines, for instance) are skipped. '''
    # Get the id of the running box; Vagrant should accept the name but doesn't; https://github.com/mitchellh/vagrant/issues/8691
    id_of_base = ""
    for this_line in (subprocess.getoutput("vagrant global-status")).splitlines():
        if this_base in this_line:
            # The id is the first whitespace-delimited column.
            columns = this_line.split(None, 1)
            if columns:
                id_of_base = columns[0]
            break
    if not id_of_base:
        die("No id could be found for {}.".format(this_base))
    this_command = "vagrant ssh --command \"ps ax o pid,args --no-headers\" {}".format(id_of_base)
    this_ps = subprocess.getoutput(this_command)
    pids_to_return = []
    for this_line in this_ps.splitlines():
        columns = this_line.split(None, 1)
        if len(columns) != 2:
            continue
        (this_pid, this_cmd) = columns
        if this_cmd.startswith("/res_binaries/"):
            pids_to_return.append((this_pid, this_cmd))
    return pids_to_return
def detect(hostname):
    """
    Performs CDN detection through the DNS, using the nslookup command.

    The host part of the URL is resolved with ``host``; every IPv4 address
    in the answer is looked up with ``nslookup`` and the result is handed to
    ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        Hostname to assess
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] DNS detection\n')
    hostname = urlparse.urlparse(hostname).netloc
    regexp = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
    # The host comes from a caller-supplied URL, so it is quoted before being
    # placed on a shell command line.
    out = commands.getoutput("host " + quote(hostname))
    for addr in regexp.finditer(out):
        # addr is digits and dots only, so it is safe to pass as is.
        CDNEngine.find(commands.getoutput('nslookup ' + addr.group()))
def detect(hostname):
    """
    Performs CDN detection through the whois command.

    The lower-cased ``whois`` output for the host part of the URL is handed
    to ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        Hostname to assess
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] Whois detection\n')
    hostname = urlparse.urlparse(hostname).netloc
    # The host comes from a caller-supplied URL, so it is quoted before being
    # placed on a shell command line.
    out = commands.getoutput("whois " + quote(hostname))
    CDNEngine.find(out.lower())
def detect(hostname):
    """
    Performs CDN detection by trying to access the cdn subdomain of the
    specified hostname.

    ``host -a`` is run on ``cdn.<host>`` and its lower-cased output is
    handed to ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        Hostname to assess
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] CDN subdomain detection\n')
    hostname = "cdn." + urlparse.urlparse(hostname).netloc
    # The host comes from a caller-supplied URL, so it is quoted before being
    # placed on a shell command line.
    out = commands.getoutput("host -a " + quote(hostname))
    CDNEngine.find(out.lower())
def detect(hostname):
    """
    Performs CDN detection thanks to information disclosure from server error.

    Every IPv4 address that ``host`` reports for the URL's host is requested
    over plain HTTP; the lower-cased body of any response with status 500 is
    handed to ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        Hostname to assess
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] Error server detection\n')
    hostname = urlparse.urlparse(hostname).netloc
    regexp = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
    # The host comes from a caller-supplied URL, so it is quoted before being
    # placed on a shell command line.
    out = commands.getoutput("host " + quote(hostname))
    for addr in regexp.finditer(out):
        res = request.do('http://' + addr.group())
        if res is not None and res.status_code == 500:
            CDNEngine.find(res.text.lower())
def getcommands(helpcmd):
    """Print *helpcmd* and recurse into every subgroup its help output lists.

    The command is expected to end with -h, e.g. 'az vm -h'.  Each command is
    printed indented by four spaces per word beyond the first three.  The
    subgroup section starts after a 'Subgroups:' line and ends at the first
    blank or non-indented line.
    """
    depth = len(helpcmd.split(' ')) - 3
    print((4 * depth) * ' ' + helpcmd)
    help_lines = subprocess.getoutput(helpcmd).split('\n')  # captured help text
    in_subgroups = False
    for line in help_lines:
        stripped = line.strip()
        if stripped == 'Subgroups:':
            # section header: start collecting from the next line on
            in_subgroups = True
            continue
        if not in_subgroups:
            continue
        if not stripped or line == line.lstrip():
            # blank or non-indented line closes the section
            break
        getcommands(subcommand(helpcmd, line))
def _deSubstPath(path):
    """Translate a path on a ``subst`` drive back to its real location.

    Only applies on Windows; elsewhere *path* is returned as is.  The
    drive mapping is read from the ``subst`` command on first use and kept
    in ``CraftStandardDirs._SUBST``.
    """
    if not OsDetection.isWin():
        return path
    drive, tail = os.path.splitdrive(path)
    drive = drive.upper()
    if CraftStandardDirs._SUBST is None:
        listing = subprocess.getoutput("subst").split("\n")
        CraftStandardDirs._SUBST = {}
        for entry in listing:
            if not entry:
                continue
            # each line looks like "X:\: => C:\real\path"
            key, val = entry.split("\\: => ")
            CraftStandardDirs._SUBST[key] = val
    mapping = CraftStandardDirs._SUBST
    if drive not in mapping:
        return path
    return mapping[drive] + tail
def __init__(self,test=False):
    """Set up the collector thread and restore per-container state.

    For every container returned by ``self.list_container()`` the stored
    ``VNode`` record is used to refill the module-level ``laststopcpuval``,
    ``laststopruntime`` and ``workercinfo`` tables.  If anything fails for a
    container, its two "last stop" counters are reset to 0.

    :param test: stored on the instance as ``self.test``.
    """
    global laststopcpuval
    global workercinfo
    threading.Thread.__init__(self)
    self.thread_stop = False
    self.interval = 2
    self.billingtime = 3600  # billing interval
    self.test = test
    self.cpu_last = {}
    self.cpu_quota = {}
    self.mem_quota = {}
    self.net_stats = {}
    self.cores_num = int(subprocess.getoutput("grep processor /proc/cpuinfo | wc -l"))
    for container in self.list_container():  # recovery
        if container == '':
            continue
        try:
            vnode = VNode.query.get(container)
            laststopcpuval[container] = vnode.laststopcpuval
            laststopruntime[container] = vnode.laststopruntime
            # Register the (still empty) entry first, then fill it field by
            # field, so a failure part-way leaves what was already stored.
            workercinfo[container] = {}
            workercinfo[container]['basic_info'] = {}
            basic_info = workercinfo[container]['basic_info']
            basic_info['billing'] = vnode.billing
            basic_info['billing_history'] = get_billing_history(container)
            basic_info['RunningTime'] = vnode.laststopruntime
            basic_info['a_cpu'] = a_cpu
            basic_info['b_mem'] = b_mem
            basic_info['c_disk'] = c_disk
            basic_info['d_port'] = d_port
        except:
            laststopcpuval[container] = 0
            laststopruntime[container] = 0
    return
# list containers on this worker
def user_live_add(form, args):
    """Mark a user as live by writing ``live`` to the user's status file.

    The file is written directly rather than through a shell, so the
    user name from the request can never be interpreted as shell syntax.

    :param form: mapping with the key ``'user'``.
    :param args: not used.
    :return: False when the user's directory does not exist or the status
        file cannot be written, True otherwise.
    """
    user_dir = '/var/lib/docklet/global/users/%s' % form['user']
    if not os.path.exists(user_dir):
        return False
    try:
        with open(os.path.join(user_dir, 'status'), 'w') as status_file:
            status_file.write('live\n')
    except OSError:
        return False
    return True
# curl -L -X POST -F user=docklet http://0.0.0.0:1728/v1/user/live/remove
def user_live_remove(form, args):
    """Remove a user's status file, clearing the "live" mark.

    The file is removed directly rather than through ``rm -f`` in a shell,
    so the user name from the request is never subject to shell or glob
    expansion.

    :param form: mapping with the key ``'user'``.
    :param args: not used.
    :return: always True; a missing or unremovable file is ignored, as with
        ``rm -f``.
    """
    status_path = '/var/lib/docklet/global/users/%s/status' % form['user']
    try:
        os.remove(status_path)
    except OSError:
        pass
    return True
# curl -L -X POST http://0.0.0.0:1728/v1/user/live/list
def user_live_list(form, args):
    """List the users that currently have a status file.

    :param form: not used.
    :param args: not used.
    :return: list of user names, i.e. the directory names directly above
        each ``status`` file under ``/var/lib/docklet/global/users``.
    """
    import glob

    status_files = sorted(glob.glob('/var/lib/docklet/global/users/*/status'))
    return [os.path.basename(os.path.dirname(path)) for path in status_files]
def get_cpu_sample():
    """Take a snapshot of accumulated CPU time from ``/proc/stat``.

    The 2nd, 3rd, 4th and 6th columns of the aggregate ``cpu`` line are
    summed.

    :return: tuple ``(cpu_time, timestamp)`` with the summed counter as an
        int and ``time.time()`` at the moment of sampling.
    :raises ValueError: when no aggregate ``cpu`` line is present.
    """
    with open('/proc/stat') as stat_file:
        for line in stat_file:
            # "cpu " with the trailing space selects the aggregate line and
            # not the per-core "cpu0", "cpu1", ... lines.
            if line.startswith('cpu '):
                fields = line.split()
                break
        else:
            raise ValueError('no aggregate "cpu" line found in /proc/stat')
    cpu_time = int(fields[1]) + int(fields[2]) + int(fields[3]) + int(fields[5])
    return (cpu_time, time.time())
def get_memory_sample():
    """Read available and total memory from ``/proc/meminfo``.

    :return: tuple ``(mem_free, mem_total)``: the ``MemAvailable`` and
        ``MemTotal`` values, each integer-divided by 1024.
    :raises ValueError: when the file cannot be read or either field is
        missing.
    """
    wanted = ('MemAvailable:', 'MemTotal:')
    values = {}
    try:
        with open('/proc/meminfo') as meminfo:
            for line in meminfo:
                fields = line.split()
                if len(fields) >= 2 and fields[0] in wanted:
                    values[fields[0]] = int(fields[1])
    except OSError as err:
        raise ValueError('cannot read /proc/meminfo: %s' % err)
    missing = [name for name in wanted if name not in values]
    if missing:
        raise ValueError('missing in /proc/meminfo: %s' % ', '.join(missing))
    return (values['MemAvailable:'] // 1024, values['MemTotal:'] // 1024)
def get_swap_sample():
    """Read free and total swap from ``/proc/meminfo``.

    :return: tuple ``(swap_free, swap_total)``: the ``SwapFree`` and
        ``SwapTotal`` values, each integer-divided by 1024.
    :raises ValueError: when the file cannot be read or either field is
        missing.
    """
    wanted = ('SwapFree:', 'SwapTotal:')
    values = {}
    try:
        with open('/proc/meminfo') as meminfo:
            for line in meminfo:
                fields = line.split()
                if len(fields) >= 2 and fields[0] in wanted:
                    values[fields[0]] = int(fields[1])
    except OSError as err:
        raise ValueError('cannot read /proc/meminfo: %s' % err)
    missing = [name for name in wanted if name not in values]
    if missing:
        raise ValueError('missing in /proc/meminfo: %s' % ', '.join(missing))
    return (values['SwapFree:'] // 1024, values['SwapTotal:'] // 1024)
def get_system_loads():
    """Report free/total memory, swap and CPU capacity of this machine.

    CPU usage is derived from the difference between the current CPU sample
    and the one remembered on ``system_manager``; on the very first call a
    baseline sample is taken and the function waits one second before
    sampling again.

    :return: dict with the keys ``mem_free``, ``mem_total``, ``swap_free``,
        ``swap_total``, ``cpu_free`` and ``cpu_total``.
    """
    if 'last_cpu_sample' not in system_manager.__dict__:
        system_manager.last_cpu_sample = system_manager.get_cpu_sample()
        time.sleep(1)
    current = system_manager.get_cpu_sample()
    (mem_free, mem_total) = system_manager.get_memory_sample()
    (swap_free, swap_total) = system_manager.get_swap_sample()
    ncpus = int(subprocess.getoutput("grep processor /proc/cpuinfo | wc -l"))
    previous = system_manager.last_cpu_sample
    ticks_used = current[0] - previous[0]
    seconds_elapsed = current[1] - previous[1]
    cpu_free = ncpus - ticks_used * 0.01 / seconds_elapsed
    if cpu_free <= 0.0:
        cpu_free = 0.0
    system_manager.last_cpu_sample = current
    return {"mem_free": mem_free, "mem_total": mem_total, "swap_free": swap_free, "swap_total": swap_total, "cpu_free": cpu_free, "cpu_total": ncpus }
def get_cgroup_containers():
    """List the container identifiers found under the cpu cgroup hierarchy.

    Directory names of the form ``docker-<id>.scope`` that are longer than
    64 characters are reduced to ``<id>``; all other names are returned
    unchanged.

    :return: list of str.
    """
    search_root = cgroup_manager.__default_prefix__ % ('cpu', '*', '.')
    # The backslash is doubled so the shell still receives "-F\/" without
    # relying on an invalid Python escape sequence.
    containers = subprocess.getoutput("find %s -type d 2>/dev/null | awk -F\\/ '{print $(NF-1)}'" % search_root).split()
    uuids = []
    for item in containers:
        if item.startswith('docker-') and item.endswith('.scope') and len(item) > 64:
            # strip the "docker-" prefix and the ".scope" suffix
            uuids.append(item[7:-6])
        else:
            uuids.append(item)
    return uuids
def signal_handler(signal, frame):
    """Delete this node's Open vSwitch bridge and exit with status 0.

    The bridge is ``ovs-master`` when the first command-line argument is
    ``master`` and ``ovs-minion`` otherwise.
    """
    bridge = 'ovs-master' if sys.argv[1] == 'master' else 'ovs-minion'
    subprocess.getoutput('ovs-vsctl del-br %s >/dev/null 2>&1' % bridge)
    sys.exit(0)
def get_score_by_uuid(uuid):
    """Score a container by whether its owner is currently marked live.

    The owner is the part of *uuid* before the first ``-``.  The owner's
    status file is read directly rather than through ``cat`` in a shell, so
    the name is never interpreted as shell syntax.

    :param uuid: container identifier of the form ``<user>-...``.
    :return: 10.0 when the status file contains ``live``, otherwise 1.0
        (including when the file is missing or unreadable).
    """
    user = uuid.split('-')[0]
    try:
        with open('/var/lib/docklet/global/users/%s/status' % user) as status_file:
            online = status_file.read().rstrip('\n') == 'live'
    except (OSError, ValueError):
        online = False
    return 10.0 if online else 1.0
def mpv_pause_status():
    """Ask mpv for its ``pause`` property over the IPC socket.

    The query is repeated until a reply can be decoded.  Retrying is done in
    a loop rather than by recursion, so a socket that stays unavailable
    cannot exhaust the call stack, and ``Exception`` is caught instead of
    everything so that Ctrl-C still interrupts the wait.

    :return: the ``data`` field of mpv's JSON reply.
    """
    while True:
        stdoutdata = subprocess.getoutput('echo \'{ "command": ["get_property", "pause"] }\' | socat - "' + mpv_socket + '"')
        try:
            return loads(stdoutdata)['data']
        except Exception:
            continue
def mpv_fullscreen_status():
    """Ask mpv for its ``fullscreen`` property over the IPC socket.

    The query is repeated until a reply can be decoded.  Retrying is done in
    a loop rather than by recursion, so a socket that stays unavailable
    cannot exhaust the call stack, and ``Exception`` is caught instead of
    everything so that Ctrl-C still interrupts the wait.

    :return: the ``data`` field of mpv's JSON reply.
    """
    while True:
        stdoutdata = subprocess.getoutput('echo \'{ "command": ["get_property", "fullscreen"] }\' | socat - "' + mpv_socket + '"')
        try:
            return loads(stdoutdata)['data']
        except Exception:
            continue
def create_jp2(img_name, img_id):
    """Convert ``<img_name>.tiff`` to JPEG 2000 with Kakadu and upload the result.

    If the first attempt reports a Kakadu error, the TIFF is moved aside,
    re-created with the configured convert command and compressed again;
    a second failure raises.

    :param img_name: path of the image without the ``.tiff`` extension.
    :param img_id: identifier used to build the uploaded key name.
    :raises ValueError: with *img_name* when the second attempt fails too.
    """
    def _escape_braces(command):
        # Curly braces are backslash-escaped before the command reaches the shell.
        return command.replace('{', '\\{').replace('}', '\\}')

    image = Image.open('{}.tiff'.format(img_name))
    if image.mode == 'RGB':
        kdu_com = kdu_command_rgb.format(img_name, img_name)
    else:
        kdu_com = kdu_command.format(img_name, img_name)
    res = subprocess.getoutput(_escape_braces(kdu_com))
    if res.startswith('Kakadu Error'):
        # Probably not uncompressed tiff
        print('probably not uncompressed tiff')
        print(res)
        subprocess.getoutput('mv {}.tiff {}-2.tiff'.format(img_name, img_name))
        subprocess.getoutput(convert_command.format(img_name, img_name))
        kdu_com2 = kdu_command.format(img_name, img_name)
        res = subprocess.getoutput(_escape_braces(kdu_com2))
        print('new response')
        print(res)
        if res.startswith('Kakadu Error') or res.startswith('Kakadu Core Error'):
            print('Still broken :(')
            raise ValueError(img_name)
    k = Key(b, '{}{}.jp2'.format(image_base, img_id))
    k.set_contents_from_filename('{}.jp2'.format(img_name))
def get_gitversion() -> dict:
    """
    Infer the project's current version with GitVersion
    (https://github.com/GitTools/GitVersion).

    On AppVeyor the executable is searched for in the Chocolatey bin
    directory, otherwise on the default search path.  When it cannot be
    found, installation instructions are printed to stderr and the process
    exits with -1.

    Returns the GitVersion JSON output as a dict
    """
    on_appveyor = os.environ.get('APPVEYOR')
    exe = (
        find_executable('gitversion', r'C:\ProgramData\chocolatey\bin')
        if on_appveyor
        else find_executable('gitversion')
    )
    if not exe:
        click.secho(
            '"gitversion.exe" not been found in your PATH.\n'
            'GitVersion is used to infer the current version from the Git repository.\n'
            'setuptools_scm plans on switching to using the Semver scheme in the future; when that happens, '
            'I\'ll remove the dependency to GitVersion.\n'
            'In the meantime, GitVersion can be obtained via Chocolatey (recommended): '
            'https://chocolatey.org/packages/GitVersion.Portable\n'
            'If you already have chocolatey installed, you can simply run the following command (as admin):\n\n'
            '\t\t"choco install gitversion.portable -pre -y"\n\n'
            'If you\'re not comfortable using the command line, there is a GUI tool for Chocolatey available at:\n\n'
            '\t\thttps://github.com/chocolatey/ChocolateyGUI/releases\n\n'
            'Or you can install directly from :\n\n'
            '\t\thttps://github.com/GitTools/GitVersion/releases',
            err=True,
        )
        exit(-1)
    raw_output = subprocess.getoutput([exe])
    return loads(raw_output.rstrip())
def get_gitversion() -> dict:
    """
    Run the gitversion executable and return its output as a dictionary.

    On AppVeyor the Chocolatey-installed executable is called by its full
    path; elsewhere ``gitversion`` is resolved by the shell.

    Example "gitversion" output::
        "Major":0,
        "Minor":4,
        "Patch":4,
        "PreReleaseTag":"dev.11",
        "PreReleaseTagWithDash":"-dev.11",
        "PreReleaseLabel":"dev",
        "PreReleaseNumber":11,
        "BuildMetaData":"",
        "BuildMetaDataPadded":"",
        "FullBuildMetaData":"Branch.develop.Sha.b22387288a19ac67641fac2711b940c4cab6d021",
        "MajorMinorPatch":"0.4.4",
        "SemVer":"0.4.4-dev.11",
        "LegacySemVer":"0.4.4-dev11",
        "LegacySemVerPadded":"0.4.4-dev0011",
        "AssemblySemVer":"0.4.4.0",
        "FullSemVer":"0.4.4-dev.11",
        "InformationalVersion":"0.4.4-dev.11+Branch.develop.Sha.b22387288a19ac67641fac2711b940c4cab6d021",
        "BranchName":"develop",
        "Sha":"b22387288a19ac67641fac2711b940c4cab6d021",
        "NuGetVersionV2":"0.4.4-dev0011",
        "NuGetVersion":"0.4.4-dev0011",
        "NuGetPreReleaseTagV2":"dev0011",
        "NuGetPreReleaseTag":"dev0011",
        "CommitsSinceVersionSource":11,
        "CommitsSinceVersionSourcePadded":"0011",
        "CommitDate":"2017-07-18"
    """
    # This is a potential security breach, but it is left as is because it
    # should only ever run on a dev machine or on Appveyor.
    if os.environ.get('APPVEYOR'):
        cmd = r'C:\ProgramData\chocolatey\bin\gitversion.exe'
    else:
        cmd = 'gitversion'
    raw_output = subprocess.getoutput([cmd])
    return loads(raw_output.rstrip())
def _minify(self, extension):
    """Minify, in place, every file under STATIC_ROOT whose name ends with *extension*.

    Each file is passed to ``yui-compressor`` and overwritten with the
    compressor's standard output.  The file is only rewritten when the
    compressor ran and exited with status 0, so a missing tool or a
    compression error can no longer replace an asset with an error message.
    With a truthy ``self.dry_run`` nothing is executed or written.

    :param extension: file name suffix to select, e.g. ``'.js'``.
    """
    import subprocess

    for root, dirs, files in os.walk(settings.STATIC_ROOT):
        for f in files:
            path = os.path.join(root, f)
            if not path.endswith(extension):
                continue
            if getattr(self, 'dry_run', False):
                print('[dry run] skipping minifying ' + path + '...')
                continue
            print('minifying "' + path + '" ... ', end='')
            try:
                # Argument list, no shell: paths with spaces work as is.
                proc = subprocess.run(['yui-compressor', path],
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE,
                                      universal_newlines=True)
            except OSError as err:
                print('failed ({})'.format(err))
                continue
            if proc.returncode != 0:
                print('failed (exit status {})'.format(proc.returncode))
                continue
            mini = proc.stdout
            if mini.endswith('\n'):
                # Same trailing-newline handling as getoutput().
                mini = mini[:-1]
            with open(path, 'w') as p:
                p.write(mini)
            print('done')
def time(self):
    """Kill the running browser when ``u.uid`` is empty.

    The process list is inspected once: if it mentions "google", chrome is
    killed; otherwise, if it mentions "firefox", firefox is killed.  In
    either case ``Ui_Dialog`` is then imported from ``gir``.
    """
    output = subprocess.getoutput("ps aux")
    if u.uid != "":
        return
    # (text looked for in the process list, process name to kill)
    for marker, victim in (("google", "chrome"), ("firefox", "firefox")):
        if marker in output:
            subprocess.getoutput("killall -9 " + victim)
            from gir import Ui_Dialog
            break
def resolve(self,request,handler):
    """Answer a DNS query with the output of the command routed to its name.

    The queried name is looked up in ``self.routes``.  When a command is
    configured, it is run and the first 254 bytes of its output are
    returned as a TXT record; otherwise the reply is NXDOMAIN.
    """
    reply = request.reply()
    qname = request.q.qname
    cmd = self.routes.get(qname)
    if not cmd:
        reply.header.rcode = RCODE.NXDOMAIN
        return reply
    output = getoutput(cmd).encode()
    record = RR(qname, QTYPE.TXT, ttl=self.ttl, rdata=TXT(output[:254]))
    reply.add_answer(record)
    return reply
def resolve(self,request,handler):
    """Build the reply for *request* from the command mapped to the queried name.

    A name present in ``self.routes`` yields a TXT answer holding the first
    254 bytes of that command's output; an unknown name yields NXDOMAIN.
    """
    reply = request.reply()
    qname = request.q.qname
    cmd = self.routes.get(qname)
    if not cmd:
        reply.header.rcode = RCODE.NXDOMAIN
        return reply
    payload = getoutput(cmd).encode()[:254]
    reply.add_answer(RR(qname, QTYPE.TXT, ttl=self.ttl, rdata=TXT(payload)))
    return reply
def _update_info(self):
    """Scan the network for devices.

    Runs ``arp-scan`` with the configured options and stores one ``Device``
    per reported host in ``self.last_results``.  Hosts whose address is
    listed in ``self.exclude`` are left out.

    Returns boolean if scanning successful.
    """
    _LOGGER.debug("Update_info called")
    options = self._options
    last_results = []
    exclude_hosts = self.exclude
    scandata = subprocess.getoutput("arp-scan "+options)
    now = dt_util.now()
    for line in scandata.splitlines():
        # Only lines that contain an IPv4 address describe a host.
        if not re.findall(r'[0-9]+(?:\.[0-9]+){3}', line):
            continue
        parts = line.split()
        if len(parts) < 2:
            # No MAC column: nothing that can be reported as a device.
            continue
        ipv4 = parts[0]
        mac = parts[1]
        # The exclusion has to skip the host itself, not merely the current
        # entry of the exclude list.
        if any(exclude == ipv4 for exclude in exclude_hosts):
            _LOGGER.debug("Excluded %s", ipv4)
            continue
        name = ipv4
        last_results.append(Device(mac, name, ipv4, now))
    self.last_results = last_results
    _LOGGER.debug("Update_info successful")
    return True
def resolve(self,request,handler):
    """Resolve a query by running the shell command registered for its name.

    ``self.routes`` maps query names to commands.  A hit is answered with a
    TXT record carrying at most 254 bytes of the command's output; a miss
    sets the reply code to NXDOMAIN.
    """
    reply = request.reply()
    qname = request.q.qname
    cmd = self.routes.get(qname)
    if not cmd:
        reply.header.rcode = RCODE.NXDOMAIN
        return reply
    encoded = getoutput(cmd).encode()
    txt_record = RR(qname, QTYPE.TXT, ttl=self.ttl, rdata=TXT(encoded[:254]))
    reply.add_answer(txt_record)
    return reply
def test_terminate(self):
self._kill_process('terminate')
# The module says:
# "NB This only works (and is only relevant) for UNIX."
#
# Actually, getoutput should work on any platform with an os.popen, but
# I'll take the comment as given, and skip this suite.
def get_token(numero):
    """Request an access token for *numero* from the API's OAuth endpoint.

    The number is first passed through the bundled jar, whose output is sent
    as the ``username`` of a ``mobile`` grant.
    NOTE(review): what the jar does to the number is not visible here.

    :param numero: number handed to the jar on its command line.
    :return: the ``access_token`` from the JSON response, or None when the
        response status is not OK.
    """
    encoded_number = subprocess.getoutput(
        "java -jar Movistar_Exploit_V2-1.0-SNAPSHOT-jar-with-dependencies.jar %s" % numero)
    print("Numero codificado: %s" % encoded_number)
    response = requests.post(
        url=MOVISTAR_API_ROOT + "oauth/token",
        headers={
            "x-requested-with": "com.services.movistar.ar",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "mobile",
            "username": encoded_number,
            "client_id": "appcontainer",
            "client_secret": "YXBwY29udGFpbmVy",
        },
    )
    if response.status_code != requests.codes.ok:
        print("Error 1")
        return None
    token = response.json()["access_token"]
    print("Token: %s" % token)
    return token
def test_that_verifies_if_mincheader_tool_is_callable(self):
self.actualMincHeaderOutput = subprocess.getoutput(['mincheader'])
self.assertEqual(self.expectedMincHeaderOutput, self.actualMincHeaderOutput)
async def exc(ctx, *args):
    """Run the given words as a shell command and post the output to the chat.

    The command is only run when ``Henry(ctx)`` is truthy; any other caller
    gets a refusal message.  The function is a coroutine because it awaits
    ``bot.say``.

    :param ctx: invocation context handed to ``Henry`` for the check.
    :param args: words of the command line, joined with single spaces.
    """
    if Henry(ctx):
        query = " ".join(args)
        await bot.say("```"+subprocess.getoutput(query)+"```")
    else:
        await bot.say("You ain't my master! Shoo!")
async def exc(ctx, *args):
    """Execute the supplied words as a shell command and reply with its output.

    Only callers for whom ``Henry(ctx)`` is truthy may run commands; everyone
    else is turned away.  Declared ``async`` because the body awaits
    ``bot.say``.

    :param ctx: invocation context handed to ``Henry`` for the check.
    :param args: words of the command line, joined with single spaces.
    """
    if Henry(ctx):
        query = " ".join(args)
        await bot.say("```"+subprocess.getoutput(query)+"```")
    else:
        await bot.say("You ain't my master! Shoo!")
def parse_args():
    """Parse the command line and make sure a ToC making tool is available.

    When ``--toc-maker`` is not given, ``./gh-md-toc`` is used; if that file
    does not exist, the release archive for the current platform is
    downloaded with ``wget``, unpacked with ``tar`` and the tool is made
    executable for its owner.

    :return: tuple ``(known_args, filepath)``: the parsed namespace and the
        first positional argument.
    :raises EnvironmentError: when the download or the extraction fails.
    :raises SystemExit: with status 1 when no file path was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--toc-maker", help="path to ToC making tool")
    parser.add_argument("--twitter-poster", default="t update", help="twitter poster command")
    parser.add_argument("-t", "--use-twitter", action="store_true")
    known_args, unknown_args = parser.parse_known_args()
    if not known_args.toc_maker:
        known_args.toc_maker = "./gh-md-toc"
        if not os.path.isfile(known_args.toc_maker):
            s = cmd.getoutput("uname -s").lower()
            f = "gh-md-toc.%s.amd64.tgz" % s
            URL = "https://github.com/ekalinin/github-markdown-toc.go/releases/download/0.6.0/%s" % f
            if not os.path.isfile(f):
                if cmd.getstatusoutput("wget %s" % URL)[0] != 0:
                    raise EnvironmentError("Cannot download toc maker from URL: %s" % URL)
            if cmd.getstatusoutput("tar xzf %s" % f)[0] != 0:
                raise EnvironmentError("Cannot untar toc maker from file %s" % f)
            os.remove(f)
            # Add the owner-execute bit to the existing mode.  OR keeps the
            # read/write bits; AND would wipe everything except that one bit.
            current_permissions = stat.S_IMODE(os.lstat(known_args.toc_maker).st_mode)
            os.chmod(known_args.toc_maker, current_permissions | stat.S_IXUSR)
    if unknown_args:
        filepath = unknown_args[0]
    else:
        print("You should specify the path for file to work with!")
        raise SystemExit(1)
    return known_args, filepath