Python re module: findall() example source code
We extracted the following 49 code examples from open-source Python projects to illustrate how to use re.findall().
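Before the project code, here is a minimal usage sketch of the call itself (illustrative only, not taken from any of the projects below): re.findall() returns every non-overlapping match as a list of strings, or as a list of tuples when the pattern contains more than one capturing group.

import re

text = 'CREATE TABLE `users` (`id` INT, `name` VARCHAR(30))'
# one capturing group -> list of strings
print(re.findall(r'`(\w+)`', text))        # ['users', 'id', 'name']
# two capturing groups -> list of tuples
print(re.findall(r'`(\w+)` (\w+)', text))  # [('id', 'INT'), ('name', 'VARCHAR')]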
def create_table(self, table_string):
    lines = table_string.split("\n")
    table = Table()
    for line in lines:
        if 'TABLE' in line:
            table_name = re.search("`(\w+)`", line)
            table.name = table_name.group(1)
            if self.thesaurus_object is not None:
                table.equivalences = self.thesaurus_object.get_synonyms_of_a_word(table.name)
        elif 'PRIMARY KEY' in line:
            primary_key_columns = re.findall("`(\w+)`", line)
            for primary_key_column in primary_key_columns:
                table.add_primary_key(primary_key_column)
        else:
            column_name = re.search("`(\w+)`", line)
            if column_name is not None:
                column_type = self.predict_type(line)
                if self.thesaurus_object is not None:
                    equivalences = self.thesaurus_object.get_synonyms_of_a_word(column_name.group(1))
                else:
                    equivalences = []
                table.add_column(column_name.group(1), column_type, equivalences)
    return table
def getDetailList(self, content):
    s2 = r'<h2><a target="_blank" href="(.*?)" title="(.*?)"'
    pattern = re.compile(s2, re.S)
    result = re.findall(pattern, content)
    with open('file.txt', 'w', encoding='gbk') as f:
        f.write(content)
    if not result:
        print('no matches found')  # original non-ASCII message was garbled
    threadsList = []
    for item in result:
        t = threading.Thread(target=workthread, args=(item, self.user_agent, self.path))
        threadsList.append(t)
        t.start()
    for threadid in threadsList:
        threadid.join()
def ParseHtml(self, html):
    soup = BeautifulSoup(html)
    links = soup.findAll('a', attrs={'class': 'ulink'})
    #print len(links)
    if len(links) == 0:  # the js return
        # tmp_js = soup.find(name='script', attrs={'language': 'javascript'})
        js_str = soup.script.string  # two ways to get the <script></script>
        new_url = js_str[16:-1]  # get the new url
        new_url = eval(new_url)  # eval the quoted string literal to get the plain url
        self.ParseHtml(self.LoadPage(new_url))
    else:
        # print type(links)
        for link in links:
            # print type(link)
            # print type(link.string)
            # print unicode(link.string)
            titles = re.findall(r'《(.+?)》', str(link.string))  # full-width 《》 brackets assumed; the original characters were garbled
            if len(titles) != 0:
                print titles[0]
                # print 'url is %s, title is %s.' %(link['href'], titles[0])
def get_profiles():
    passwd = ''
    netsh_output = run_command("netsh wlan show profiles")
    if "not running" in netsh_output:
        net_wlan = run_command("net start wlansvc")
        if "started successfully" in net_wlan:
            netsh_output = run_command("netsh wlan show profiles")
        else:
            return net_wlan
    if "no wireless interface" in netsh_output:
        return netsh_output
    else:
        profiles = re.findall(': (.*)\r', netsh_output)
        for x in profiles:
            output = run_command('netsh wlan show profiles "{}" key=clear'.format(x))
            #output=re.findall('(Key Content.*)\r',proc)
            if output:
                passwd += "\n{}\n{}\n\n".format(x, output)
        return passwd
def reassign_atom_mapping(transform):
    '''This function takes an atom-mapped reaction and reassigns
    the atom-mapping labels (numbers) from left to right, once
    that transform has been canonicalized.'''
    all_labels = re.findall('\:([0-9]+)\]', transform)
    # Define list of replacements which matches all_labels *IN ORDER*
    replacements = []
    replacement_dict = {}
    counter = 1
    for label in all_labels:  # keep in order! this is important
        if label not in replacement_dict:
            replacement_dict[label] = str(counter)
            counter += 1
        replacements.append(replacement_dict[label])
    # Perform replacements in order
    transform_newmaps = re.sub('\:[0-9]+\]',
                               lambda match: (':' + replacements.pop(0) + ']'),
                               transform)
    return transform_newmaps
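As a quick illustration of the remapping (made-up atom-mapped reaction string, not from the project): label 7 is seen first and becomes 1, label 3 becomes 2.

print(reassign_atom_mapping('[CH3:7][OH:3]>>[CH3:7][O-:3]'))
# '[CH3:1][OH:2]>>[CH3:1][O-:2]'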
def _read_from_header(self):
    a, b, c = self._get_header()
    header = a
    header['data_offset'] = b
    header['nb_channels'] = c
    #header['dtype_offset'] = int(header['ADC zero'])
    header['gain'] = float(re.findall("\d+\.\d+", header['El'])[0])
    header['data_dtype'] = self.params['data_dtype']
    self.data = numpy.memmap(self.file_name, offset=header['data_offset'], dtype=header['data_dtype'], mode='r')
    self.size = len(self.data)
    self._shape = (self.size//header['nb_channels'], header['nb_channels'])
    del self.data
    return header
def title_command(bot, trigger):
    """
    Show the title or URL information for the given URL, or the last URL seen
    in this channel.
    """
    if not trigger.group(2):
        if trigger.sender not in bot.memory['last_seen_url']:
            return
        matched = check_callbacks(bot, trigger,
                                  bot.memory['last_seen_url'][trigger.sender],
                                  True)
        if matched:
            return
        else:
            urls = [bot.memory['last_seen_url'][trigger.sender]]
    else:
        urls = re.findall(url_finder, trigger)
    results = process_urls(bot, trigger, urls)
    for title, domain in results[:4]:
        bot.reply('[ %s ] - %s' % (title, domain))
def test_trigger_single_event(self):
    """Test: Trigger click event on button, validate dispatched"""
    regex = '([0-9]{1,3})'
    original = eval(re.findall(regex, self.page.counter_label.text)[0])
    self.page.js.trigger_event(
        element=self.page.add_counter_button,
        event='click'
    )
    for i in range(10):
        if (original == eval(re.findall(regex, self.page.counter_label.text)[0])):
            time.sleep(1)
        else:
            break
    modified = eval(re.findall(regex, self.page.counter_label.text)[0])
    self.assertEqual(
        modified, original+1,
        'Counter label was not modified as expected; %s clicks' % modified
    )
def test_trigger_multiple_events(self):
    """Test: Trigger click event on button twice, validate dispatched"""
    regex = '([0-9]{1,3})'
    original = eval(re.findall(regex, self.page.counter_label.text)[0])
    self.page.js.trigger_event(
        element=self.page.add_counter_button,
        event=('click', 'click')
    )
    for i in range(10):
        if (original == eval(re.findall(regex, self.page.counter_label.text)[0])):
            time.sleep(1)
        else:
            break
    modified = eval(re.findall(regex, self.page.counter_label.text)[0])
    self.assertEqual(
        modified, original+2,
        'Counter label was not modified as expected; %s clicks' % modified
    )
def test_trigger_multiple_events_multiple_elements(self):
    """Test: Trigger click event on two buttons twice, validate dispatched"""
    regex = '([0-9]{1,3})'
    num_counter_original = eval(re.findall(regex, self.page.counter_label.text)[0])
    num_users_original = len(self.page.user_cards)
    self.page.js.trigger_event(
        element=(self.page.add_counter_button, self.page.add_user_button),
        event=('click', 'click')
    )
    for i in range(10):
        if (num_counter_original == eval(re.findall(regex, self.page.counter_label.text)[0])):
            time.sleep(1)
        else:
            break
    num_counter_modified = eval(re.findall(regex, self.page.counter_label.text)[0])
    self.assertEqual(
        num_counter_modified, num_counter_original+2,
        'Counter label was not modified as expected; %s clicks' % num_counter_modified
    )
    self.assertEqual(
        len(self.page.user_cards), num_users_original+2,
        'Expected %s user cards found %s' % (
            num_users_original+2, len(self.page.user_cards)
        )
    )
def calc_rs_modality(self) -> Dict[str, float]:
    modality_counter = Counter()
    for i, s in enumerate(self.sentences):
        chunks = []
        for bnst in self.knp.parse(s).bnst_list():
            chunk = Chunk(chunk_id=bnst.bnst_id,
                          link=bnst.parent,
                          description=bnst.fstring)
            chunks.append(chunk)
        s = "".join([chunk.description for chunk in chunks])
        # KNP marks modality in the feature string as <モダリティ-...> (reconstructed; the original characters were garbled)
        ms = set(re.findall("<モダリティ-(.+?)>", s))
        modality_counter += Counter(ms)
    n = len(self.sentences)
    return dict([(k, float(c) / n)
                 for k, c in modality_counter.items()])
def get_transcript_gc_content(self, transcript_obj):
    pattern = re.compile('[cCgG]')
    gc, length = 0, 0
    for interval in transcript_obj.intervals:
        if interval.chrom not in self.chroms:
            continue
        seq = self.chroms[interval.chrom][interval.start:interval.end]
        gc += len(re.findall(pattern, seq))
        length += interval.length
    if length > 0:
        return float(gc) / float(length)
    else:
        return 0

# NOTE: these stub classes are necessary to maintain backwards compatibility with old refdata (1.2 or older)
def get_info(self):
    '''
    Get information about the comic.
    return:
        comic title, description, cover url, chapters' urls
    '''
    headers = {'user-agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36", 'Referer': 'http://manhua.dmzj.com/tags/s.shtml'}
    root = 'http://manhua.dmzj.com'
    r_title = r'<span class="anim_title_text"><a href=".*?"><h1>(.*?)</h1></a></span>'
    r_des = r'<meta name=\'description\' content=".*?(??.*?)"/>'  # description
    r_cover = r'src="(.*?)" id="cover_pic"/></a>'  # cover image url
    r_cb = r'<div class="cartoon_online_border" >([\s\S]*?)<div class="clearfix"></div>'  # chapter list block
    r_cs = r'<li><a title="(.*?)" href="(.*?)" .*?>.*?</a>'  # chapter title and link
    try:
        text = requests.get(self.comic_url, headers=headers).text
    except ConnectionError:
        traceback.print_exc()
        raise ConnectionError
    title = re.findall(r_title, text)[0]
    cb = re.findall(r_cb, text)[0]
    chapter_urls = [(c[0], root + c[1] + '#@page=1') for c in re.findall(r_cs, cb)]
    cover_url = re.findall(r_cover, text)[0]
    des = re.findall(r_des, text)
    return title, des, cover_url, chapter_urls
def make_html_content_and_add_tags(self):
    # compile a pattern that matches hashtags like "#tag"
    p = re.compile(r'(#\w+)')
    # findall returns the list of hashtag strings found in the content
    tag_name_list = re.findall(p, self.content)
    # keep the original content before substitution
    ori_content = self.content
    # for each hashtag found
    for tag_name in tag_name_list:
        # get or create the Tag object; the "created" flag is discarded as _
        tag, _ = Tag.objects.get_or_create(name=tag_name.replace('#', ''))
        # build the anchor tag that will replace the plain hashtag in the content
        change_tag = '<a href="{url}" class="hash-tag">{tag_name}</a>'.format(
            # url=reverse('post:hashtag_post_list', args=[tag_name.replace('#', '')]),
            url=reverse('post:hashtag_post_list',
                        kwargs={'tag_name': tag_name.replace('#', '')}),
            tag_name=tag_name
        )
        ori_content = re.sub(r'{}(?![<\w])'.format(tag_name), change_tag, ori_content, count=1)
        # add the Tag to this object's tags if it is not there yet
        if not self.tags.filter(pk=tag.pk).exists():
            self.tags.add(tag)
    # store the converted markup in html_content and save only that field
    self.html_content = ori_content
    super().save(update_fields=['html_content'])
def parse_sitemap(content):
    if not isinstance(content, six.text_type):
        content = content.decode('utf-8')
    urlset_match = re.search(
        r'<urlset[^>]*>(?P<urls>[\s\S]*)</urlset>', content
    )
    if urlset_match:
        results = []
        urlset_content = urlset_match.groupdict()['urls']
        for url_content in re.findall(r'<url>([\s\S]+)</url>', urlset_content):
            results.append(
                dict(
                    re.findall(r'<([^>]+)>([^<]*)</[^>]+>', url_content)
                )
            )
    else:
        results = None
    return results
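A quick usage sketch with a hand-written sitemap fragment (illustrative input, not from the project; assumes six is importable as in the function's module): the outer findall isolates each <url> block and the inner findall turns its child tags into key/value pairs.

sitemap = ('<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'
           '<url><loc>http://example.com/</loc><lastmod>2017-01-01</lastmod></url>'
           '</urlset>')
print(parse_sitemap(sitemap))
# [{'loc': 'http://example.com/', 'lastmod': '2017-01-01'}]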
def get_players():
    with open("../../junk/iemoakland.html") as f:
        html = f.read()
    output = []
    teams = re.findall('(?s)<div class="influencer-card">(.*?)<!-- Card End -->', html)
    counter = 1
    for t in teams:
        team_name = re.search('<h1 class="influencer-name">([^<]+)</h1>', t).group(1)
        player_section = re.search('(?s)<p class="influencer-description">(.*?)</p>', t).group(1)
        players = re.findall('(?:<a[^>]+>)?\s*(.*?)(?:</a>)?<br />', player_section)
        if len(players) < 4:
            print(team_name)
            print(players)
        for player in players:
            if '<a hre' in player:
                player = re.search('<a[^>]+>([^<]+)', player).group(1)
            output.append({"id": counter, "name": player, "team": team_name, "value": 10.0})
            counter += 1
    with open("../../lib/pubg_players.py", "w+") as f:
        f.write("pubg_init = " + repr(output))
    return
def get_summary(url):
    """ Get the course summary: term id and directory name. """
    res = CONNECTION.get(url).text
    # term id of the course
    term_id = re.search(r'termId : "(\d+)"', res).group(1)
    names = re.findall(r'name:"(.+)"', res)
    # course name
    course_name = names[0]
    # institution name
    institution = names[1]
    # directory name built from course name and institution
    dir_name = REG_FILE.sub('', course_name + ' - ' + institution)
    print(dir_name)
    return term_id, dir_name
def get_announce(term_id):
    """ Get the course announcements. """
    # batchId is the current timestamp in milliseconds: str(int(time.time() * 1000))
    post_data = {'callCount': '1', 'scriptSessionId': '${scriptSessionId}190', 'httpSessionId': 'dba4977be78d42a78a6e2c2dd2b9bb42', 'c0-scriptName': 'CourseBean', 'c0-methodName': 'getAllAnnouncementByTerm', 'c0-id': '0', 'c0-param0': 'number:' + term_id, 'c0-param1': 'number:1', 'batchId': str(int(time.time() * 1000))}
    res = CONNECTION.post('http://mooc.study.163.com/dwr/call/plaincall/CourseBean.getAllAnnouncementByTerm.dwr', data=post_data).text
    announcements = re.findall(r'content="(.*?[^\\])".*title="(.*?[^\\])"', res)
    with open(os.path.join(BASE_DIR, 'Announcements.html'), 'w', encoding='utf-8') as announce_file:
        for announcement in announcements:
            # announcement content
            announce_content = announcement[0].encode('utf-8').decode('unicode_escape')
            # announcement title
            announce_title = announcement[1].encode('utf-8').decode('unicode_escape')
            announce_file.write('<h1>' + announce_title + '</h1>\n' + announce_content + '\n')
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    """
    pdf = buff.getvalue()
    h, w = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                     flags=re.MULTILINE).groups()
    if h != w:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(h, w))
    size = int(w) - 2 * border
    graphic = _find_graphic(buff)
    res = [[0] * size for i in range(size)]
    for x1, y1, x2, y2 in re.findall(r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                                     r'(\-?\d+)\s+(\-?\d+)\s+l', graphic):
        x1, y1, x2, y2 = [int(i) for i in (x1, y1, x2, y2)]
        y = abs(y1)
        res[y][x1:x2] = [1] * (x2 - x1)
    return res
def get(self, netloc, ua, timeout):
    try:
        headers = {'User-Agent': ua, 'Referer': netloc}
        result = _basic_request(netloc, headers=headers, timeout=timeout)
        match = re.findall('xhr\.open\("GET","([^,]+),', result)
        if not match:
            return False
        url_Parts = match[0].split('"')
        url_Parts[1] = '1680'
        url = urlparse.urljoin(netloc, ''.join(url_Parts))
        match = re.findall('rid=([0-9a-zA-Z]+)', url_Parts[0])
        if not match:
            return False
        headers['Cookie'] = 'rcksid=%s' % match[0]
        result = _basic_request(url, headers=headers, timeout=timeout)
        return self.getCookieString(result, headers['Cookie'])
    except:
        return

# not very robust but laziness...
def get(self, result):
    try:
        s = re.compile("S\s*=\s*'([^']+)").findall(result)[0]
        s = base64.b64decode(s)
        s = s.replace(' ', '')
        s = re.sub('String\.fromCharCode\(([^)]+)\)', r'chr(\1)', s)
        s = re.sub('\.slice\((\d+),(\d+)\)', r'[\1:\2]', s)
        s = re.sub('\.charAt\(([^)]+)\)', r'[\1]', s)
        s = re.sub('\.substr\((\d+),(\d+)\)', r'[\1:\1+\2]', s)
        s = re.sub(';location.reload\(\);', '', s)
        s = re.sub(r'\n', '', s)
        s = re.sub(r'document\.cookie', 'cookie', s)
        cookie = ''
        exec(s)
        self.cookie = re.compile('([^=]+)=(.*)').findall(cookie)[0]
        self.cookie = '%s=%s' % (self.cookie[0], self.cookie[1])
        return self.cookie
    except:
        pass
def odnoklassniki(url):
    try:
        media_id = re.compile('//.+?/.+?/([\w]+)').findall(url)[0]
        result = client.request('http://ok.ru/dk', post={'cmd': 'videoPlayerMetadata', 'mid': media_id})
        result = re.sub(r'[^\x00-\x7F]+', ' ', result)
        result = json.loads(result).get('videos', [])
        hd = []
        for name, quali in {'ultra': '4K', 'quad': '1440p', 'full': '1080p', 'hd': 'HD'}.items():
            hd += [{'quality': quali, 'url': i.get('url')} for i in result if i.get('name').lower() == name]
        sd = []
        for name, quali in {'sd': 'SD', 'low': 'SD', 'lowest': 'SD', 'mobile': 'SD'}.items():
            sd += [{'quality': quali, 'url': i.get('url')} for i in result if i.get('name').lower() == name]
        url = hd + sd[:1]
        if not url == []: return url
    except:
        return
def cldmailru(url):
    try:
        v = url.split('public')[-1]
        r = client.request(url)
        r = re.sub(r'[^\x00-\x7F]+', ' ', r)
        tok = re.findall('"tokens"\s*:\s*{\s*"download"\s*:\s*"([^"]+)', r)[0]
        url = re.findall('"weblink_get"\s*:\s*\[.+?"url"\s*:\s*"([^"]+)', r)[0]
        url = '%s%s?key=%s' % (url, v, tok)
        return url
    except:
        return
def yandex(url):
    try:
        cookie = client.request(url, output='cookie')
        r = client.request(url, cookie=cookie)
        r = re.sub(r'[^\x00-\x7F]+', ' ', r)
        sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0]
        idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0]
        idclient = binascii.b2a_hex(os.urandom(16))
        post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
        post = urllib.urlencode(post)
        r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
        r = json.loads(r)
        url = r['models'][0]['data']['file']
        return url
    except:
        return
def geturl(url):
    try:
        r = client.request(url, output='geturl')
        if r == None: return r
        host1 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(url.strip().lower()).netloc)[0]
        host2 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(r.strip().lower()).netloc)[0]
        if host1 == host2: return r
        proxies = sorted(get(), key=lambda x: random.random())
        proxies = sorted(proxies, key=lambda x: random.random())
        proxies = proxies[:3]
        for p in proxies:
            p += urllib.quote_plus(url)
            r = client.request(p, output='geturl')
            if not r == None: return parse(r)
    except:
        pass
def movie(self, imdb, title, localtitle, aliases, year):
    try:
        t = 'http://www.imdb.com/title/%s' % imdb
        t = client.request(t, headers={'Accept-Language': 'es-AR'})
        t = client.parseDOM(t, 'title')[0]
        t = re.sub('(?:\(|\s)\d{4}.+', '', t).strip().encode('utf-8')
        q = self.search_link % urllib.quote_plus(t)
        q = urlparse.urljoin(self.base_link, q)
        r = client.request(q)
        r = client.parseDOM(r, 'div', attrs = {'class': 'item'})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'span', attrs = {'class': 'tt'}), client.parseDOM(i, 'span', attrs = {'class': 'year'})) for i in r]
        r = [(i[0][0], i[1][0], i[2][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0 and len(i[2]) > 0]
        r = [i[0] for i in r if cleantitle.get(t) == cleantitle.get(i[1]) and year == i[2]][0]
        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        pass
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(cleantitle.getsearch(title)))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        return url
    except:
        return
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(cleantitle.getsearch(title)))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'item-detail'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        url = '%s/watch' % url
        return url
    except:
        return
def movie(self, imdb, title, localtitle, aliases, year):
    try:
        t = cleantitle.get(title)
        p = self.post_link % urllib.quote_plus(cleantitle.query(title))
        q = urlparse.urljoin(self.base_link, self.search_link)
        r = proxy.request(q, 'playing top', post=p, XHR=True)
        r = client.parseDOM(r, 'li')
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a')) for i in r]
        r = [(i[0][0], i[1][0]) for i in r if i[0] and i[1]]
        r = [(i[0], re.findall('(.+?)\((\d{4})', i[1])) for i in r]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
        r = [i for i in r if t == cleantitle.get(i[1]) and str(year) == i[2]]
        url = proxy.parse(r[0][0])
        url = re.findall('(?://.+?|)(/.+)', url)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        pass
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % cleantitle.geturl(title))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        url = '%s/watch' % url
        return url
    except:
        return
def searchMovie(self, title, year, aliases):
    try:
        url = '%s/%s-%s/' % (self.base_link, cleantitle.geturl(title), year)
        url = client.request(url, output='geturl')
        if url == None:
            t = cleantitle.get(title)
            q = '%s %s' % (title, year)
            q = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(q))
            r = client.request(q)
            r = client.parseDOM(r, 'div', attrs={'class': 'inner'})
            r = client.parseDOM(r, 'div', attrs={'class': 'info'})
            r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
            r = [(i[0], re.findall('(?:^Watch Movie |^Watch movies |^Watch |)(.+?)\((\d{4})', i[1])) for i in r]
            r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and year == i[2]][0]
        if url == None: raise Exception()
        return url
    except:
        return
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % cleantitle.geturl(title))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        url = urlparse.urljoin(self.base_link, '%s/watching.html' % url)
        return url
    except:
        return
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(cleantitle.getsearch(title)))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        return url
    except:
        return
def movie(self, imdb, title, localtitle, aliases, year):
    try:
        query = urlparse.urljoin(self.base_link, self.search_link)
        query = query % urllib.quote_plus(title)
        t = cleantitle.get(title)
        r = client.request(query)
        r = client.parseDOM(r, 'div', attrs = {'class': 'thumb'})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a', ret='title'), re.findall('(\d{4})', i)) for i in r]
        r = [(i[0][0], i[1][0], i[2][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0 and len(i[2]) > 0]
        url = [i[0] for i in r if t in cleantitle.get(i[1]) and year == i[2]][0]
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        query = urlparse.urljoin(self.base_link, self.search_link)
        query = query % urllib.quote_plus(data['tvshowtitle'])
        t = cleantitle.get(data['tvshowtitle'])
        r = client.request(query)
        r = client.parseDOM(r, 'div', attrs = {'class': 'thumb'})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a', ret='title'), re.findall('(\d{4})', i)) for i in r]
        r = [(i[0][0], i[1][0], i[2][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0 and len(i[2]) > 0]
        url = [i[0] for i in r if t in cleantitle.get(i[1]) and ('Season %s' % season) in i[1]][0]
        url += '?episode=%01d' % int(episode)
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        year = re.findall('(\d{4})', premiered)[0]
        season = '%01d' % int(season)
        episode = '%01d' % int(episode)
        tvshowtitle = '%s %s: Season %s' % (data['tvshowtitle'], year, season)
        url = cache.get(self.pidtv_tvcache, 120, tvshowtitle)
        if url == None: raise Exception()
        url += '?episode=%01d' % int(episode)
        url = url.encode('utf-8')
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        headers = eval(data['headers'])
        aliases = eval(data['aliases'])
        title = data['tvshowtitle'] if 'tvshowtitle' in data else data['title']
        title = cleantitle.getsearch(title)
        query = self.search_link % (urllib.quote_plus(title))
        query = urlparse.urljoin(self.base_link, query)
        r = client.request(query, headers=headers, timeout='30', mobile=True)
        match = re.compile('alias=(.+?)\'">(.+?)</a>').findall(r)
        r = [(i[0], re.findall('(.+?)\s+-\s+Season\s+(\d+)', i[1])) for i in match]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
        r = [i[0] for i in r if self.matchAlias(i[1], aliases) and int(season) == int(i[2])][0]
        url = {'type': 'tvshow', 'id': r, 'episode': episode, 'season': season, 'headers': headers}
        url = urllib.urlencode(url)
        return url
    except:
        return
def movie(self, imdb, title, localtitle, aliases, year):
    try:
        if debrid.status() == False: raise Exception()
        t = cleantitle.get(title)
        query = self.search_link + urllib.quote_plus(title)
        query = urlparse.urljoin(self.base_link, query)
        r = client.request(query, XHR=True)
        r = json.loads(r)
        r = [i for i in r if 'category' in i and 'movie' in i['category'].lower()]
        r = [(i['url'], i['label']) for i in r if 'label' in i and 'url' in i]
        r = [(i[0], re.findall('(.+?) \((\d{4})', i[1])) for i in r]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
        r = [i[0] for i in r if t == cleantitle.get(i[1]) and year == i[2]][0]
        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        year = re.findall('(\d{4})', premiered)[0]
        if int(year) >= 2016: raise Exception()
        url = re.sub('[^A-Za-z0-9]', '-', data['tvshowtitle']).lower()
        url = self.tvsearch_link % (url, data['year'], '%01d' % int(season), '%01d' % int(episode))
        r = urlparse.urljoin(self.base_link, url)
        r = client.request(r, output='geturl')
        if not data['year'] in r: raise Exception()
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if url == None: return
        tv_maze = tvmaze.tvMaze()
        num = tv_maze.episodeAbsoluteNumber(tvdb, int(season), int(episode))
        num = str(num)
        url = urlparse.urljoin(self.base_link, url)
        r = client.request(url)
        r = client.parseDOM(r, 'tr', attrs = {'class': ''})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'td', attrs = {'class': 'epnum'})) for i in r]
        r = [(i[0][0], i[1][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0]
        r = [i[0] for i in r if num == i[1]][0]
        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        return
def movie(self, imdb, title, localtitle, aliases, year):
    try:
        query = self.search_link % (urllib.quote_plus(title))
        query = urlparse.urljoin(self.base_link, query)
        c, h = self.__get_cookies(query)
        t = cleantitle.get(title)
        r = client.request(query, headers=h, cookie=c)
        r = client.parseDOM(r, 'div', attrs={'class': 'cell_container'})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a', ret='title')) for i in r]
        r = [(i[0][0], i[1][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0]
        r = [(i[0], re.findall('(.+?) \((\d{4})', i[1])) for i in r]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
        r = [i[0] for i in r if t == cleantitle.get(i[1]) and year == i[2]][0]
        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if not url:
            return
        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
        url = self.__search([data['tvshowtitle']] + source_utils.aliases_to_array(eval(data['aliases'])), data['year'], season)
        if not url: return
        r = client.request(urlparse.urljoin(self.base_link, url))
        r = dom_parser.parse_dom(r, 'div', attrs={'class': 'ep_link'})
        r = dom_parser.parse_dom(r, 'a', req='href')
        r = [(i.attrs['href'], i.content) for i in r if i]
        r = [(i[0], re.findall("^(?:episode)\s*(\d+)$", i[1], re.I)) for i in r]
        r = [(i[0], i[1][0] if i[1] else '0') for i in r]
        r = [i[0] for i in r if int(i[1]) == int(episode)][0]
        return source_utils.strip_domain(r)
    except:
        return
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(cleantitle.getsearch(title)))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        url = '%s/watch/' % url
        return url
    except:
        return
def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]
        if not b in base64.b64decode(self.b_link): return url
        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'
        c = self.request(r, output='cookie', close=False)
        result = self.request(u, post=p, referer=r, cookie=c)
        url = result.split('url=')
        url = [urllib.unquote_plus(i.strip()) for i in url]
        url = [i for i in url if i.startswith('http')]
        url = url[-1]
        return url
    except:
        return
def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % (cleantitle.geturl(title.replace('\'', '-'))))
        r = client.request(url, timeout='10', headers=headers)
        r = client.parseDOM(r, 'h2', attrs={'class': 'tit'})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a', ret='title')) for i in r]
        r = [(i[0][0], i[1][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0]
        r = [(i[0], re.findall('(.+?) \((\d{4})', i[1])) for i in r]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
        try:
            match = [i[0] for i in r if self.matchAlias(i[1], aliases) and year == i[2]][0]
        except:
            match = [i[0] for i in r if self.matchAlias(i[1], aliases)][0]
        url = re.findall('(?://.+?|)(/.+)', match)[0]
        url = client.replaceHTMLCodes(url)
        return url.encode('utf-8')
    except:
        return
def __search(self, title, localtitle, year, content_type):
    try:
        t = cleantitle.get(title)
        tq = cleantitle.get(localtitle)
        y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
        query = urlparse.urljoin(self.base_link, self.search_link)
        post = urllib.urlencode({'k': "%s"}) % tq
        r = client.request(query, post=post)
        r = json.loads(r)
        r = [i.get('result') for i in r if i.get('type', '').encode('utf-8') == content_type]
        r = [(i.get('url'), i.get('originalTitle'), i.get('title'), i.get('anneeProduction', 0), i.get('dateStart', 0)) for i in r]
        r = [(i[0], re.sub('<.+?>|</.+?>', '', i[1] if i[1] else ''), re.sub('<.+?>|</.+?>', '', i[2] if i[2] else ''), i[3] if i[3] else re.findall('(\d{4})', i[4])[0]) for i in r if i[3] or i[4]]
        r = sorted(r, key=lambda i: int(i[3]), reverse=True)  # with year > no year
        r = [i[0] for i in r if i[3] in y and (t.lower() == cleantitle.get(i[1].lower()) or tq.lower() == cleantitle.query(i[2].lower()))][0]
        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        return
def __search(self, titles, year, season='0'):
    try:
        query = urlparse.urljoin(self.base_link, self.search_link)
        t = [cleantitle.get(i) for i in set(titles) if i]
        y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
        r = client.request(query, post={'do': 'search', 'subaction': 'search', 'search_start': 0, 'full_search': 0, 'result_from': 1, 'story': cleantitle.query(titles[0])})
        r = dom_parser.parse_dom(r, 'div', attrs={'class': 'fullstream'})
        r = [(dom_parser.parse_dom(i, 'h3', attrs={'class': 'mov-title'}), dom_parser.parse_dom(i, 'div', attrs={'class': 'fullmask'})) for i in r]
        r = [(dom_parser.parse_dom(i[0], 'a', req='href'), dom_parser.parse_dom(i[1], 'a', attrs={'href': re.compile('.*/year/\d+')})) for i in r]
        r = [(i[0][0].attrs['href'], i[0][0].content, i[1][0].content if i[1] else '0') for i in r if i[0]]
        r = [(i[0], i[1], i[2], re.findall('(.+?)\s+(?:\s*-\s*saison)\s+(\d+)', i[1], re.I)) for i in r]
        r = [(i[0], i[3][0][0] if len(i[3]) > 0 else i[1], i[2], i[3][0][1] if len(i[3]) > 0 else '0') for i in r]
        r = [(i[0], i[1], i[2], '1' if int(season) > 0 and i[3] == '0' else i[3]) for i in r]
        r = sorted(r, key=lambda i: int(i[2]), reverse=True)  # with year > no year
        r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y and int(i[3]) == int(season)][0]
        return source_utils.strip_domain(r)
    except:
        return