Python re module: search() example source code
The code examples below were extracted from open-source Python projects and illustrate how to use re.search().
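Before the project snippets, a minimal baseline sketch (not taken from any of the projects below): re.search() scans a string for the first location where the pattern matches and returns a match object, or None when nothing matches, so the result should be checked before calling group().
import re

m = re.search(r"`(\w+)`", "CREATE TABLE `users` (")
if m is not None:       # re.search returns None on no match
    print(m.group(1))   # -> users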
def create_table(self, table_string):
lines = table_string.split("\n")
table = Table()
for line in lines:
if 'TABLE' in line:
            table_name = re.search(r"`(\w+)`", line)
table.name = table_name.group(1)
if self.thesaurus_object is not None:
table.equivalences = self.thesaurus_object.get_synonyms_of_a_word(table.name)
elif 'PRIMARY KEY' in line:
            primary_key_columns = re.findall(r"`(\w+)`", line)
for primary_key_column in primary_key_columns:
table.add_primary_key(primary_key_column)
else:
            column_name = re.search(r"`(\w+)`", line)
if column_name is not None:
column_type = self.predict_type(line)
if self.thesaurus_object is not None:
equivalences = self.thesaurus_object.get_synonyms_of_a_word(column_name.group(1))
else:
equivalences = []
table.add_column(column_name.group(1), column_type, equivalences)
return table
def run(self):
Analyzer.run(self)
if self.data_type == 'domain' or self.data_type == 'url':
try:
pattern = re.compile("(?:Category: )([\w\s]+)")
baseurl = 'http://www.fortiguard.com/webfilter?q='
url = baseurl + self.getData()
req = requests.get(url)
category_match = re.search(pattern, req.content, flags=0)
self.report({
'category': category_match.group(1)
})
except ValueError as e:
self.unexpectedError(e)
else:
self.notSupported()
def loopback_devices():
'''
Parse through 'losetup -a' output to determine currently mapped
loopback devices. Output is expected to look like:
/dev/loop0: [0807]:961814 (/tmp/my.img)
:returns: dict: a dict mapping {loopback_dev: backing_file}
'''
loopbacks = {}
cmd = ['losetup', '-a']
devs = [d.strip().split(' ') for d in
check_output(cmd).splitlines() if d != '']
for dev, _, f in devs:
        loopbacks[dev.replace(':', '')] = re.search(r'\((\S+)\)', f).groups()[0]
return loopbacks
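A quick standalone check of the parsing above, using the sample line from the docstring (illustrative only; no losetup call is made):
import re

line = '/dev/loop0: [0807]:961814 (/tmp/my.img)'
dev, _, f = line.strip().split(' ')
print(dev.replace(':', ''))                    # -> /dev/loop0
print(re.search(r'\((\S+)\)', f).groups()[0])  # -> /tmp/my.img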
def searchExploit(exploit_list, soft_name, soft_version):
""" Search affected packages in exploit_list """
result = []
version_search = versionVartions(soft_version, args.level)
    for exploit in list(exploit_list):  # iterate over a copy; entries may be removed below
        if exploit[5] in valid_platforms and (args.dos or exploit[6] != 'dos' or args.type == 'dos'):  # Platform and DoS
            if args.filter is None or args.filter.lower() in exploit[2].lower():  # Filter
                if args.type is None or args.type == exploit[6]:  # Type
                    query = r"(^(\w*\s){0,%s}|/\s?)%s(\s|\s.*\s|\/).* -" % (args.level, soft_name.replace('+', r'\+'))
                    if re.search(query, exploit[2], re.IGNORECASE):
affected_versions = extractVersions(exploit[2])
for affected_version in affected_versions:
if args.level == 5 or LooseVersion(version_search) <= LooseVersion(affected_version):
if args.duplicates == False: exploit_list.remove(exploit) # Duplicates
printOutput(exploit, soft_name, soft_version)
result.append([exploit, soft_name, soft_version])
break
return result
def extractVersions(title_string):
""" Extract all version numbers from a string """
search = re.search(r'\s-|\(|\&', title_string)
if search:
title_string = title_string[:search.span()[0]]
result = []
for possible_version in title_string.split():
if possible_version[0].isdigit():
if '/' in possible_version:
for multiversion in possible_version.split('/'):
if '-' in multiversion:
multiversion = '.'.join(multiversion.split('-')[0].split('.')[:-1]) + '.' + multiversion.split('-')[-1]
if purgeVersionString(multiversion):
result.append(purgeVersionString(multiversion))
elif '-' in possible_version:
result.append(purgeVersionString('.'.join(possible_version.split('-')[0].split('.')[:-1]) + '.' + possible_version.split('-')[-1]))
else:
result.append(purgeVersionString(possible_version))
return result
def test_oozie_workflow(self):
# given
input_data_dir = 'test/availability/input-data'
apps_streaming_dir = 'test/availability/apps/streaming'
cmd('hdfs dfs -rm -r %s' % input_data_dir)
cmd('hdfs dfs -mkdir -p %s' % input_data_dir)
cmd('hdfs dfs -put %s %s' % (sample_data, input_data_dir))
cmd('hdfs dfs -rm -r %s' % apps_streaming_dir)
cmd('hdfs dfs -mkdir -p %s' % apps_streaming_dir)
cmd('hdfs dfs -put resources/oozie/workflow.xml %s' % apps_streaming_dir)
# when
result = cmd('oozie job -oozie %s -config resources/oozie/job.properties -run' % oozie_host)
job_id = result.stdout.replace('job: ', '')
cmd('oozie job -oozie %s -poll %s -interval 1' % (oozie_host, job_id))
result = cmd('oozie job -oozie %s -info %s' % (oozie_host, job_id))
    # then
    status = re.search(r'Status\s+:\s+(.+)', result.stdout).group(1)
self.assertEqual('SUCCEEDED', status, result.stderr)
def commit_history(cli):
"""
Parse output of "show configuration history commit reverse detail"
"""
result = []
record = OrderedDict()
for line in cli.splitlines():
r = re.search(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)', line)
if not r:
continue
record[r.group(1)] = r.group(2)
record[r.group(3)] = r.group(4)
if r.group(3) == 'Comment':
result.append(record)
record = OrderedDict()
return result
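To see what the four groups capture, here is the same pattern run against a single hypothetical line of commit-history output (the field names and spacing are assumptions, not taken from a real device):
import re

line = '  Commit ID: 1000000012    Label: None'
r = re.search(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)', line)
print(r.groups())  # -> ('Commit ID', '1000000012', 'Label', 'None')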
def _getmember(self, name, tarinfo=None, normalize=False):
"""Find an archive member by name from bottom to top.
If tarinfo is given, it is used as the starting point.
"""
# Ensure that all members have been loaded.
members = self.getmembers()
# Limit the member search list up to tarinfo.
if tarinfo is not None:
members = members[:members.index(tarinfo)]
if normalize:
name = os.path.normpath(name)
for member in reversed(members):
if normalize:
member_name = os.path.normpath(member.name)
else:
member_name = member.name
if name == member_name:
return member
def find_external_links(url, page):
"""Find rel="homepage" and rel="download" links in `page`, yielding URLs"""
for match in REL.finditer(page):
tag, rel = match.groups()
rels = set(map(str.strip, rel.lower().split(',')))
if 'homepage' in rels or 'download' in rels:
for match in HREF.finditer(tag):
yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
for tag in ("<th>Home Page", "<th>Download URL"):
pos = page.find(tag)
if pos != -1:
match = HREF.search(page, pos)
if match:
yield urllib.parse.urljoin(url, htmldecode(match.group(1)))
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def b16decode(s, casefold=False):
"""Decode the Base16 encoded bytes-like object or ASCII string s.
Optional casefold is a flag specifying whether a lowercase alphabet is
acceptable as input. For security purposes, the default is False.
The result is returned as a bytes object. A binascii.Error is raised if
s is incorrectly padded or if there are non-alphabet characters present
in the input.
"""
s = _bytes_from_decode_data(s)
if casefold:
s = s.upper()
if re.search(b'[^0-9A-F]', s):
raise binascii.Error('Non-base16 digit found')
return binascii.unhexlify(s)
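This snippet matches the standard library's base64.b16decode, so a short usage sketch through the stdlib behaves the same way:
import base64, binascii

print(base64.b16decode(b'48656C6C6F'))                 # -> b'Hello'
print(base64.b16decode(b'48656c6c6f', casefold=True))  # -> b'Hello'
try:
    base64.b16decode(b'48656c6c6f')  # lowercase input rejected by default
except binascii.Error as exc:
    print(exc)                       # -> Non-base16 digit found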
def compiler_is_clang(comp):
print("check for clang compiler ...", end=' ')
try:
cc_output = subprocess.check_output(comp+['--version'],
stderr = subprocess.STDOUT, shell=False)
except OSError as ex:
print("compiler test call failed with error {0:d} msg: {1}".format(ex.errno, ex.strerror))
print("no")
return False
ret = re.search(b'clang', cc_output) is not None
    if ret:
print("yes")
else:
print("no")
return ret
def validate_label(label):
# For now we can only handle [a-z ']
if "(" in label or \
"<" in label or \
"[" in label or \
"]" in label or \
"&" in label or \
"*" in label or \
"{" in label or \
re.search(r"[0-9]", label) != None:
return None
label = label.replace("-", "")
label = label.replace("_", "")
label = label.replace(".", "")
label = label.replace(",", "")
label = label.replace("?", "")
label = label.strip()
return label.lower()
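Two hypothetical labels run through the filter above:
print(validate_label("O'Brien, Jr."))  # -> "o'brien jr"
print(validate_label("take 2"))        # -> None (contains a digit)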
def string_find_id(string, regex):
"""Find a marker's ID using a regular expression
returns the ID as int if found, otherwise returns None
Args:
-string, any string
-regex, the regex pattern to match."""
    if not string or not regex:
        raise AttributeError("Missing name or regex attribute")
import re
index = re.search(regex, string)
if index:
found_id = index.group(1)
return int(found_id) if found_id else None
return None
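An illustrative call with a made-up marker naming scheme (the pattern must contain one capturing group for the ID):
print(string_find_id("marker_42.start", r"marker_(\d+)"))  # -> 42
print(string_find_id("marker_x.start", r"marker_(\d+)"))   # -> None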
def main(cb, args):
powershells = cb.process_search_iter('process_name:powershell.exe')
for s in powershells:
if s['cmdline']:
encoded = re.search('\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)', s['cmdline'])
if encoded != None:
i = encoded.group(1)
if not re.search('[a-zA-Z0-9\+/]+={1,2}$', i):
trailingBytes = len(i) % 4
if trailingBytes == 3:
i = i + '='
elif trailingBytes == 2:
i = i + '=='
decodedCommand = base64.standard_b64decode(i)
try:
a = decodedCommand.encode('ascii', 'replace')
print "Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
args['server_url'], s['id'], a.replace('\0', ""))
except UnicodeError:
print "Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
args['server_url'], s['id'], s['cmdline'])
pass
def check_triggers(message):
global action_triggers
return_value = False
target_policy = ""
for policy in action_triggers:
# Need to make a copy here cause globals :(
rules = list(action_triggers[policy]['rules'])
while rules:
criteria = rules.pop(0)
expression = rules.pop(0)
if re.search(expression, message[criteria].lower()):
return_value = True
else:
return_value = False
rules = []
if return_value:
target_policy = action_triggers[policy]['targetpolicy']
break
return return_value,target_policy
def move_policy(sensor, targetPolicy):
global eptoken
global epserver
bit9 = bit9api.bit9Api(
"https://"+epserver, # Replace with actual Bit9 server URL
token=eptoken,
ssl_verify=False # Don't validate server's SSL certificate. Set to True unless using self-signed cert on IIS
)
# policy to send the naughty host to
targetPolicyName = targetPolicy
destPolicies = bit9.search('v1/policy', ['name:'+targetPolicyName])
if len(destPolicies)==0:
raise ValueError("Cannot find destination policy "+targetPolicyName)
# find the computer id
destComputer = bit9.search('v1/computer', ['cbSensorId:'+str(sensor)])
if len(destComputer)==0:
raise ValueError("Cannot find computer named "+hostname)
for c in destComputer:
print "Moving computer %s from policy %s to policy %s" % (c['name'], c['policyName'], targetPolicyName)
c['policyId'] = destPolicies[0]['id']
bit9.update('v1/computer', c)
def _parse_attribute(self, attribute_str):
"""Parses a attribute string and updates the CANBus
Args:
attribute_str: String with attribute
"""
        pattern = r'BA_\s+"(?P<attr_name>\S+)"\s*(?P<node>BU_)?(?P<msg>BO_)?(?P<sig>SG_)?\s*'
        pattern += r'(?P<can_id>\d*)?\s*(?P<name>\S*)?\s+(?P<value>\S+)\s*;'
reg = re.search(pattern, attribute_str)
can_object = self._can_network
if reg.group('node'):
can_object = self._can_network.nodes[reg.group('name')]
elif reg.group('msg'):
can_object = self._can_network.get_message(int(reg.group('can_id')))
elif reg.group('sig'):
can_object = self._can_network.get_signal(int(reg.group('can_id')), reg.group('name'))
cad = self._can_network.attributes.definitions[reg.group('attr_name')]
can_object.attributes.add(CANAttribute(cad, value=self._parse_attribute_value(cad, reg.group('value'))))
def ssh_setup_agent(config, envkeys=None):
"""
Starts the ssh-agent
"""
envkeys = envkeys or ['SSH_PRIVATE_KEY']
output = os.popen('ssh-agent -s').readlines()
for line in output:
matches = re.search(r"(\S+)\=(\S+)\;", line)
if matches:
config.environ[matches.group(1)] = matches.group(2)
for envkey in envkeys:
key = os.environ.get(envkey)
if key:
ssh_add_key(config.environ, key)
else:
logging.warning('%s is missing', envkey)
def next_unbalanced_tag(view, search=None, search_args=None, restart_at=None, tags=None):
    assert search and restart_at, 'wrong call'
    # Avoid mutable default arguments; they would leak state between calls.
    search_args = {} if search_args is None else search_args
    tags = [] if tags is None else tags
region, tag, is_end_tag = search(view, **search_args)
if not region:
return None, None
if not is_end_tag:
tags.append(tag)
search_args.update(restart_at(region))
return next_unbalanced_tag(view, search, search_args, restart_at, tags)
if not tags or (tag not in tags):
return region, tag
while tag != tags.pop():
continue
search_args.update(restart_at(region))
return next_unbalanced_tag(view, search, search_args, restart_at, tags)
def _try_config_test(self, logcfg, epattern, foundTest=None ):
import ossie.utils.log4py.config
with stdout_redirect(cStringIO.StringIO()) as new_stdout:
ossie.utils.log4py.config.strConfig(logcfg,None)
new_stdout.seek(0)
found = []
epats=[]
if type(epattern) == str:
epats.append(epattern)
else:
epats = epattern
if foundTest == None:
foundTest = len(epats)*[True]
for x in new_stdout.readlines():
for epat in epats:
m=re.search( epat, x )
if m :
found.append( True )
self.assertEqual(found, foundTest )
def get_human(self, attr):
val = getattr(self, attr)
val = val.replace("_", " ")
product_mapping = {
"ie": "Internet Explorer"
}
if attr == "product" and val in product_mapping:
val = product_mapping[val]
# if there's lowercase letters in the value, make it a title
    # (if there's not, leave it alone - e.g. SP3)
if re.search('[a-z]', val) is not None:
val = val.title()
if val.upper() in ["SP0", "SP1", "SP2", "SP3", "SP4", "SP5", "SP6"]:
val = val.upper()
if val.lower() in ["x86", "x64"]:
val = val.lower()
return val
async def handle(self, player, action, values, **kwargs):  # pragma: no cover
await self.close(player)
# Try to parse the button id instead of the whole action string.
button = action
try:
match = re.search('button_([0-9]+)$', action)
if len(match.groups()) == 1:
button = match.group(1)
except:
pass
if not self.response_future.done():
self.response_future.set_result(button)
if self.target:
await self.target(player, action, values, **kwargs)
def __call__(self):
settings = utils.get_settings('os')
with open('/proc/cpuinfo', 'r') as fd:
        cpuinfo = fd.readlines()
    vendor = None  # guard: not every architecture exposes a vendor_id line
    for line in cpuinfo:
match = re.search(r"^vendor_id\s+:\s+(.+)", line)
if match:
vendor = match.group(1)
if vendor == "GenuineIntel":
vendor = "intel"
elif vendor == "AuthenticAMD":
vendor = "amd"
ctxt = {'arch': platform.processor(),
'cpuVendor': vendor,
'desktop_enable': settings['general']['desktop_enable']}
return ctxt
def sync_one(cls, external_id, last_error=None):
post_data = cls.pipedrive_api_client.get_instance(external_id)
# Error code from the API
if not post_data[u'success']:
logging.error(post_data)
raise UnableToSyncException(cls, external_id)
try:
return cls.update_or_create_entity_from_api_post(post_data[u'data'])
except IntegrityError as e:
logging.warning(e)
if e.message == last_error:
raise SameErrorTwiceSyncException(cls, external_id, e.message)
        match = re.search(r'.*Key \((.*)\)=\((.*)\).*', e.message)
if match:
field_name = match.group(1)
field_id = match.group(2)
model = cls.field_model_map(field_name)
model.sync_one(field_id)
return cls.sync_one(external_id, e.message)
else:
raise Exception("Could not handle error message")
def addToCart(self):
print '\nADD TO CART -----------------'
session_get = self.user_session.get(self.URL_product_url, headers=self.get_headers)
#print session_get.content
soup = BeautifulSoup(session_get.content, 'lxml')
results = soup.find_all('select', class_='size-select')
#print results
for item in results[0].select('option'):
re_result = re.sub(self.sub_pattern, '', item.string)
#print re_result
matchObj = re.search(r"^%s+$" % self.user_size, re_result)
if matchObj:
self.post_data_addToCart['pid'] = item['value']
self.post_data_addToCart['masterPID'] = item['value'].partition("_")[0]
print self.post_data_addToCart
break
session_post = self.user_session.post(url=self.URL_cart_post_url, headers=self.post_headers, data=self.post_data_addToCart)
print 'Add To Cart Status: ' + str(session_post.status_code)
def git_get_keywords(versionfile_abs):
"""Extract version information from the given file."""
# the code embedded in _version.py can just fetch the value of these
# keywords. When used from setup.py, we don't want to import _version.py,
# so we do it with a regexp instead. This function is not used from
# _version.py.
keywords = {}
try:
f = open(versionfile_abs, "r")
for line in f.readlines():
if line.strip().startswith("git_refnames ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["refnames"] = mo.group(1)
if line.strip().startswith("git_full ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["full"] = mo.group(1)
f.close()
except EnvironmentError:
pass
return keywords
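The extraction regex in isolation, against a hypothetical expanded-keyword line:
import re

line = 'git_refnames = " (HEAD -> master, tag: v1.2.0)"'
mo = re.search(r'=\s*"(.*)"', line)
print(mo.group(1))  # -> ' (HEAD -> master, tag: v1.2.0)'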
def get_sqls(self):
"""This function extracts sqls from mysql general log file.
    Yields:
        :class:`SQL` instances, one per SELECT statement. For example:
        SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')
"""
general_log = open(self.log_path)
log = GeneralQueryLog(general_log)
session_db_map = {}
sqls = []
for entry in log:
if entry['command'] == 'Connect':
            m = re.search(r'\s+on\s(?P<name>\w+)', entry['argument'])
if m:
session_db_map[entry['session_id']] = m.groupdict()['name'].strip()
elif entry['command'] == 'Init DB':
session_db_map[entry['session_id']] = entry['argument'].strip()
elif entry['command'] == 'Query':
sql = entry['argument']
if sql.strip()[:6].lower() == 'select':
yield SQL(session_db_map.get(entry['session_id'], ''), sql)
def scan_thread(keyword, catalog_json):
# Check each thread, threads who contains the keyword are returned
matched_threads = []
    for page in catalog_json:
        for thread in page["threads"]:
regex = r'\b{0}\b'.format(keyword)
# Search thread title
if 'sub' in thread:
if re.search(regex, str(thread["sub"]), re.IGNORECASE):
matched_threads.append(thread["no"])
# Search OPs post body
if 'com' in thread:
if re.search(regex, str(thread["com"]), re.IGNORECASE):
matched_threads.append(thread["no"])
return matched_threads
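The \b word-boundary anchors are what keep a short keyword from matching inside longer words; standalone:
import re

regex = r'\b{0}\b'.format('linux')
print(bool(re.search(regex, 'Anything about Linux?', re.IGNORECASE)))  # -> True
print(bool(re.search(regex, 'linuxmasterrace', re.IGNORECASE)))        # -> False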
def alter_table(self, alter_string):
lines = alter_string.replace('\n', ' ').split(';')
for line in lines:
        if 'PRIMARY KEY' in line:
            table_name = re.search(r"TABLE `(\w+)`", line).group(1)
            table = self.get_table_by_name(table_name)
            primary_key_columns = re.findall(r"PRIMARY KEY \(`(\w+)`\)", line)
            for primary_key_column in primary_key_columns:
                table.add_primary_key(primary_key_column)
        elif 'FOREIGN KEY' in line:
            table_name = re.search(r"TABLE `(\w+)`", line).group(1)
            table = self.get_table_by_name(table_name)
            foreign_keys_list = re.findall(r"FOREIGN KEY \(`(\w+)`\) REFERENCES `(\w+)` \(`(\w+)`\)", line)
for column, foreign_table, foreign_column in foreign_keys_list:
table.add_foreign_key(column, foreign_table, foreign_column)
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
"""When a node is found, scheduler calls this method with IP address,
name, CPUs, memory and disk available on that node. This method should
return a number indicating number of CPUs to use. If return value is 0,
the node is not used; if the return value is < 0, this allocation is
ignored (next allocation in the 'node_allocations' list, if any, is
applied).
"""
if not re.match(self.ip_rex, ip_addr):
return -1
if (self.platform and not re.search(self.platform, platform)):
return -1
if ((self.memory and memory and self.memory > memory) or
(self.disk and disk and self.disk > disk)):
return 0
if self.cpus > 0:
if self.cpus > cpus:
return 0
return self.cpus
elif self.cpus == 0:
return 0
else:
cpus += self.cpus
if cpus < 0:
return 0
return cpus
def Resolver(obj, path, full = False):
m = re.search('#/(.*)/(.*)', path)
x = None
if full:
b = obj[m.group(1)]
x = b[m.group(2)]
else:
x = m.group(2)
return x
def list_nics(nic_type=None):
"""Return a list of nics of given type(s)"""
if isinstance(nic_type, six.string_types):
int_types = [nic_type]
else:
int_types = nic_type
interfaces = []
if nic_type:
for int_type in int_types:
cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
ip_output = subprocess.check_output(cmd).decode('UTF-8')
ip_output = ip_output.split('\n')
ip_output = (line for line in ip_output if line)
for line in ip_output:
if line.split()[1].startswith(int_type):
matched = re.search('.*: (' + int_type +
r'[0-9]+\.[0-9]+)@.*', line)
if matched:
iface = matched.groups()[0]
else:
iface = line.split()[1].replace(":", "")
if iface not in interfaces:
interfaces.append(iface)
else:
cmd = ['ip', 'a']
ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
ip_output = (line.strip() for line in ip_output if line)
        key = re.compile(r'^[0-9]+:\s+(.+):')
for line in ip_output:
matched = re.search(key, line)
if matched:
iface = matched.group(1)
iface = iface.partition("@")[0]
if iface not in interfaces:
interfaces.append(iface)
return interfaces
def is_device_mounted(device):
'''Given a device path, return True if that device is mounted, and False
if it isn't.
:param device: str: Full path of the device to check.
:returns: boolean: True if the path represents a mounted device, False if
it doesn't.
'''
try:
out = check_output(['lsblk', '-P', device]).decode('UTF-8')
except:
return False
return bool(re.search(r'MOUNTPOINT=".+"', out))
def parseDebian(packages_file):
""" Parse debian package list to dict (name:version) """
result = {}
if args.clean==True: first_field = 0
else: first_field = 1
for line in packages_file:
if args.clean==True or line[:2] == 'ii':
fields = line.split()
if len(fields) < 2 + first_field: continue
# Software name
search = fields[first_field].find(':')
if search != -1:
soft_name = cleanName(fields[first_field][:search])
else:
soft_name = cleanName(fields[first_field])
# Version
search = re.search(r"-|\+|~", fields[first_field + 1])
if search:
soft_version = fields[first_field + 1][:search.span()[0]]
else:
soft_version = fields[first_field + 1]
search = soft_version.find(':')
if search != -1:
soft_version = soft_version[search + 1:]
soft_version = purgeVersionString(soft_version)
# Format check
if not soft_name or not soft_version: continue
# Intense package name split
if args.intense and '-' in soft_name:
for sub_package in soft_name.split('-'):
if len(sub_package)>2 and '.' not in sub_package and sub_package not in badpackages: result[sub_package] = soft_version
else:
if soft_name not in badpackages: result[soft_name] = soft_version
return result
def purgeVersionString(version_string):
""" Eliminate invalid characters and last dot from version string """
search = re.search(r'[^0-9.]', version_string)
if search: result = version_string[:search.span()[0]]
else: result = version_string
if len(result) > 0 and result[-1] == '.': result = result[:-1]
return result
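Behaviour sketch for the helper above:
print(purgeVersionString('1.2.3-4ubuntu2'))  # -> 1.2.3 (cut at first invalid char)
print(purgeVersionString('2.0.'))            # -> 2.0 (trailing dot stripped)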
def _search_for_query(self, query):
if query in self._search_pattern_cache:
return self._search_pattern_cache[query]
# Build pattern: include all characters
pattern = []
for c in query:
# pattern.append('[^{0}]*{0}'.format(re.escape(c)))
pattern.append('.*?{0}'.format(re.escape(c)))
pattern = ''.join(pattern)
search = re.compile(pattern, re.IGNORECASE).search
self._search_pattern_cache[query] = search
return search
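For the query 'abc' the generated pattern is '.*?a.*?b.*?c', i.e. a case-insensitive subsequence match; standalone:
import re

search = re.compile('.*?a.*?b.*?c', re.IGNORECASE).search
print(bool(search('A Big Cat')))  # -> True: a, b, c appear in order
print(bool(search('cab')))        # -> False: out of order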
def get_password(self, account, service=None):
"""Retrieve the password saved at ``service/account``.
Raise :class:`PasswordNotFound` exception if password doesn't exist.
:param account: name of the account the password is for, e.g.
"Pinboard"
:type account: ``unicode``
:param service: Name of the service. By default, this is the workflow's
bundle ID
:type service: ``unicode``
:returns: account password
:rtype: ``unicode``
"""
if not service:
service = self.bundleid
output = self._call_security('find-generic-password', service,
account, '-g')
# Parsing of `security` output is adapted from python-keyring
# by Jason R. Coombs
# https://pypi.python.org/pypi/keyring
m = re.search(
r'password:\s*(?:0x(?P<hex>[0-9A-F]+)\s*)?(?:"(?P<pw>.*)")?',
output)
if m:
groups = m.groupdict()
h = groups.get('hex')
password = groups.get('pw')
if h:
password = unicode(binascii.unhexlify(h), 'utf-8')
self.logger.debug('Got password : %s:%s', service, account)
return password
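The security(1) parsing regex above, checked against two hypothetical output lines (hex branch and quoted branch):
import re

pat = r'password:\s*(?:0x(?P<hex>[0-9A-F]+)\s*)?(?:"(?P<pw>.*)")?'
print(re.search(pat, 'password: "hunter2"').groupdict())
# -> {'hex': None, 'pw': 'hunter2'}
print(re.search(pat, 'password: 0x68756E74657232  "hunter2"').groupdict())
# -> {'hex': '68756E74657232', 'pw': 'hunter2'}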