The following 50 code examples, extracted from open-source Python projects, illustrate how to use lxml.html.fromstring().
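All of the examples below follow the same basic pattern: fetch an HTML document, parse it with lxml.html.fromstring(), and query the resulting element tree with XPath or CSS selectors. As a minimal sketch of that pattern before the project examples (the function name, URL, and XPath expression here are placeholders for illustration, not taken from any of the projects below):

import requests
from lxml import html

def fetch_headings(url='https://example.com'):
    # Download the page; .content returns bytes, which lxml can decode itself
    response = requests.get(url, timeout=10)
    # Parse the HTML into an lxml HtmlElement tree
    tree = html.fromstring(response.content)
    # XPath queries return lists of elements, attribute values, or text nodes
    return tree.xpath('//h1/text()')
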
def main():
    for url in url_list:
        try:
            r = requests.get(url)
        except:
            continue
        tree = html.fromstring(r.text)
        script = tree.xpath('//script[@language="javascript"]/text()')[0]
        json_string = regex.findall(script)[0]
        json_data = json.loads(json_string)
        next_page_url = tree.xpath('//footer/a/@href')
        links = [domain + x['nodeRef'] for x in json_data]
        for link in links:
            extract(link)

def slack(text: hug.types.text):
    """Returns JSON containing an attachment with an image url for the Slack integration"""
    title = text
    if text == 'top250':
        top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
        top250_page = html.fromstring(top250_res.text)
        candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
        title = random.choice(candidates).text
    return dict(
        response_type='in_channel',
        attachments=[
            dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
        ]
    )

def sns_notification(body):
    json_body = body.decode('utf8')
    js = json.loads(json_body.replace('\n', ''))
    if js["Type"] == "Notification":
        arg_info = js["Message"]
        arg_info = json.loads(arg_info)
        content = arg_info['content']
        subject = arg_info['mail']['commonHeaders']['subject']
        html_content = content.partition('Content-Type: text/html; charset=UTF-8')[2]
        if 'Content-Transfer-Encoding' in html_content:
            html_content = html_content.partition('Content-Transfer-Encoding: quoted-printable')[2]
        text = html_content.replace('\r\n', '')
        table = html.fromstring(text)
        content = ''
        for item in table:
            if item.text:
                content += item.text.strip()
        mail_content = str(content)
        from_mail = arg_info['mail']['source']
        to_mail = arg_info['mail']['destination'][0]
        hash_code = arg_info['mail']['destination'][0].split('@')[0]
        return subject, from_mail, to_mail, hash_code, mail_content

def scrape_mtgs_images(url='http://www.mtgsalvation.com/spoilers/183-hour-of-devastation',
                       mtgscardurl='http://www.mtgsalvation.com/cards/hour-of-devastation/',
                       exemptlist=[]):
    page = requests.get(url)
    tree = html.fromstring(page.content)
    cards = {}
    cardstree = tree.xpath('//*[contains(@class, "log-card")]')
    for child in cardstree:
        if child.text in exemptlist:
            continue
        childurl = mtgscardurl + child.attrib['data-card-id'] + '-' + child.text.replace(
            ' ', '-').replace("'", "").replace(',', '').replace('-//', '')
        cardpage = requests.get(childurl)
        tree = html.fromstring(cardpage.content)
        cardtree = tree.xpath('//img[contains(@class, "card-spoiler-image")]')
        try:
            cardurl = cardtree[0].attrib['src']
        except:
            cardurl = ''
        cards[child.text] = {
            "url": cardurl
        }
        time.sleep(.2)
    return cards

def scrape_masterpieces(url='http://www.mtgsalvation.com/spoilers/181-amonkhet-invocations',
                        mtgscardurl='http://www.mtgsalvation.com/cards/amonkhet-invocations/'):
    page = requests.get(url)
    tree = html.fromstring(page.content)
    cards = []
    cardstree = tree.xpath('//*[contains(@class, "log-card")]')
    for child in cardstree:
        childurl = mtgscardurl + \
            child.attrib['data-card-id'] + '-' + child.text.replace(' ', '-')
        cardpage = requests.get(childurl)
        tree = html.fromstring(cardpage.content)
        cardtree = tree.xpath('//img[contains(@class, "card-spoiler-image")]')
        try:
            cardurl = cardtree[0].attrib['src']
        except:
            cardurl = ''
        card = {
            "name": child.text,
            "url": cardurl
        }
        cards.append(card)
    return cards

def parse_lista_diputados(response):
    tree = fromstring(response.content)
    # list of deputies
    diputados = tree.xpath('//div[@class="listado_1"]/ul/li/a/@href')
    for diputado in diputados:
        diputado_url = urljoin(response.url, diputado)
        response = requests.get(diputado_url)
        parse_diputado(response)
    # next page
    pagina_siguiente = tree.xpath('//a[contains(., "Página Siguiente")]/@href')
    if pagina_siguiente:
        pagina_siguiente_url = pagina_siguiente[0]
        response = requests.get(pagina_siguiente_url)
        parse_lista_diputados(response)

def set_vokrugsveta_wallpaper():
    try:
        r = requests.get(URL04)
        if r.status_code == 200:
            doc = fromstring(r.text)
            results = doc.cssselect('a.article__pic')
            url = 'http://www.vokrugsveta.ru/' + results[0].get('href')
            print(url)
            r = requests.get(url, stream=True)
            if r.status_code == 200:
                doc = fromstring(r.text)
                results = doc.cssselect('img')
                for index, result in enumerate(results):
                    print(index, result.get('src'))
                i_url = 'http://www.vokrugsveta.ru/' + results[2].get('src')
                if download(i_url) is True:
                    set_background(comun.POTD)
                    print(url)
    except Exception as e:
        print(e)

def get_searx_version(response_container):
    response_html = response_container.content.decode()
    try:
        dom = html.fromstring(response_html)
    except etree.XMLSyntaxError:
        # not a valid HTML document
        # TODO workaround with regex ?
        return ''

    searx_full_version = extract_text_from_dom(dom, "/html/head/meta[@name='generator']/@content")
    if searx_full_version is None:
        searx_version = ''
    else:
        s = searx_full_version.split('/')
        if len(s) == 2:
            searx_version = s[1]
        else:
            searx_version = searx_full_version
    return searx_version

def extract_news(news_url):
    # Fetch html
    session_requests = requests.session()
    response = session_requests.get(news_url, headers=getHeaders())
    news = {}
    try:
        # Parse html
        tree = html.fromstring(response.content)
        # Extract information
        news = tree.xpath(GET_CNN_NEWS_XPATH)
        news = ''.join(news)
    except Exception as e:
        print(e)
        return {}
    return news

def get(self, user_id):
    r = requests.get('http://grouple.co/user/%s/bookmarks' % user_id)
    tree = html.fromstring(r.text)
    tds = tree.xpath('//table')[0].xpath('//tr')[1:]
    mangas = list()
    for o in tds:
        item = o.xpath('.//a')[0]
        manga = {
            'name': item.xpath('./text()')[0],
            'path': item.xpath('./@href')[0],
            'summary': item.xpath('./@title')[0].split(': ', 1)[-1]
        }
        item = item.xpath('../a')[1]
        manga.update({
            'preview': item.xpath('./@rel')[0],
            'id': java_hash_code(manga['path']),
            'provider': provider_name(manga['path'])
        })
        if manga['provider'] is not None:
            mangas.append(manga)
    return {'all': mangas}

def procura_emprego():
    busca = raw_input("[+] - Digite o nome da vaga ou uma palavra-chave: ").replace(' ', '+').lower()
    url = "http://empregacampinas.com.br/page/1/?s=" + busca
    #prox_pagina = 0
    while True:
        try:
            r = requests.get(url, timeout=2)
            tree = html.fromstring(r.content)
            vagas = tree.xpath('//*[@id="article"]/div/div/div/div/a/h2/text()')
            link = tree.xpath('//*[@id="article"]/div/div/div/div/a[@title]/@href')
            if len(vagas) > 1:
                qtd_vagas = len(vagas) - 1
            else:
                qtd_vagas = len(vagas)
            pagina = url.split('/')[4]
            info_vaga(qtd_vagas, pagina, vagas, link)
            # grab the next page URL
            url = tree.xpath('//*[@class="nextpostslink"]/@href')[0]
        except:
            menu()

def scrap_twitlonger(twitlonger):
    '''
    Takes a twitlonger post ID, scrapes the body of the post and then
    returns a string depending on the contents of the post.
    If the hour is stated in said post, it's added.
    If it's not, then it's implied it's current time.
    Note to self: Implement GMT - whatever our president decides to change it to.
    '''
    page = requests.get('http://www.twitlonger.com/show/%s' % twitlonger)
    tree = html.fromstring(page.content)
    texto = tree.xpath('/html/body/div[2]/div[1]/div[3]/div/p[1]/text()')
    hora = re.search('[0-9]+:[0-9]+', texto[0])
    circuitos = texto[0].split(str('detallados a continuación: ').decode('utf-8'))[1].split(str(' #ElNiñoNoEsJuego').decode('utf-8'))[0]
    if hora:
        return "La luz se ira a las " + hora.group(0) + " en " + circuitos
    else:
        hora = re.search('En momentos', texto[0])
        if hora:
            return "La luz se ira a las " + str(datetime.datetime.now().time()) + " en " + circuitos

def lxml_test():
    url = "http://www.caixunzz.com"
    req = urllib2.Request(url=url)
    resp = urllib2.urlopen(req)
    #print resp.read()
    '''
    parse_body=html.fromstring(resp.read())
    href=parse_body.xpath('//a[@class="label"]/@href')
    print href
    #not working from above
    '''
    tree = etree.HTML(resp.read())
    href = tree.xpath('//a[@class="label"]/@href')
    #print href.tag
    for i in href:
        #print html.tostring(i)
        #print type(i)
        print i
    print type(href)
    #not working yet

def scrape_url(url):
    #url = 'api-ref-compute-v2.1.html'
    page = requests.get('http://developer.openstack.org/' + url)
    tree = html.fromstring(page.content)
    #Create a list of HTTP verbs
    verbs = tree.xpath('//a[@class="operation-anchor"]/following::span[1]/text()')
    operations = tree.xpath('//a[@class="operation-anchor"]/following::div[1]/text()')
    #Match up Verbs and Operations and output a printed list
    methods = zip(verbs, operations)
    print len(verbs)
    print len(operations)
    if len(verbs) == len(operations):
        for verbs, operations in methods:
            print verbs + ' ' + operations
    else:
        print "Number of verbs doesn't match number of operations for ", page.url

def __init__(self):
    self.name = SOURCE_NAME
    _file, r = conf.getFeedData(SOURCE_NAME, SOURCE_FILE, unpack=False)
    zipobj = zipfile.ZipFile(BytesIO(_file))
    self.cves = defaultdict(dict)
    for filename in zipobj.namelist():
        with zipobj.open(filename) as infile:
            page = fromstring(infile.read().decode("utf-8"))
            vendor = page.xpath("//table[1]//tr[1]//td[2]")
            if vendor:
                vendor = vendor[0].text.lower()
            rows = page.xpath("//table[2]//tr//td")
            # CVE - Source ID
            IDs = [[rows[i].text, [x.text for x in rows[i+1].iterchildren()]]
                   for i in range(0, len(rows), 2)]
            for e in IDs:
                vendorID = e[0] if not e[0].startswith(vendor.upper()+':') else e[0][len(vendor)+1:]
                for cve in e[1]:
                    if vendor not in self.cves[cve]:
                        self.cves[cve][vendor] = []
                    if vendorID not in self.cves[cve][vendor]:
                        self.cves[cve][vendor].append(vendorID)

def get_html_text(url):
    response = requests.get(url)
    origin_text = response.text
    origin_text = re.sub(r'<script.*?>.*?</script>', '', origin_text, flags=re.I | re.M | re.DOTALL)
    origin_text = re.sub(r'<style.*?>.*?</style>', '', origin_text, flags=re.I | re.M | re.DOTALL)

    doc = html.fromstring(origin_text)
    text = doc.xpath('//body//text()')
    text = [i.strip() for i in text if i.strip()]
    text = ' '.join(text)

    seg = jieba.cut(text)
    stopwords = read_stopwords('./utils/stopwords.txt')  # callable read_stopwords()
    seg = [i.strip() for i in seg if i.strip() and not i.strip().isdigit() and i.strip() not in stopwords]
    seg = ' '.join(seg)
    return seg

def _get_quotes(self):
    '''Gets book's quote data'''
    if self._page_source is None:
        return

    quotes_page = self._page_source.xpath('//a[@class="actionLink" and contains(., "More quotes")]')
    quotes = []
    if len(quotes_page) > 0:
        resp = open_url(self._connection, quotes_page[0].get('href'))
        if not resp:
            return
        quotes_page = html.fromstring(resp)
        if quotes_page is None:
            return
        for quote in quotes_page.xpath('//div[@class="quoteText"]'):
            quotes.append(re.sub(r'\s+', ' ', quote.text).strip().decode('ascii', 'ignore'))
    else:
        for quote in self._page_source.xpath('//div[@class=" clearFloats bigBox" and contains(., "Quotes from")]//div[@class="bigBoxContent containerWithHeaderContent"]//span[@class="readable"]'):
            quotes.append(re.sub(r'\s+', ' ', quote.text).strip().decode('ascii', 'ignore'))
    return quotes

def _get_book_info_from_tooltips(self, book_info):
    '''Gets books ASIN, title, authors, image url, description, and rating information'''
    if isinstance(book_info, tuple):
        book_info = [book_info]
    books_data = []
    link_pattern = 'resources[Book.{0}][type]=Book&resources[Book.{0}][id]={0}'
    tooltips_page_url = '/tooltips?' + "&".join([link_pattern.format(book_id) for book_id, image_url in book_info])
    tooltips_page_info = json.loads(open_url(self._connection, tooltips_page_url))['tooltips']

    for book_id, image_url in book_info:
        book_data = tooltips_page_info['Book.{0}'.format(book_id)]
        if not book_data:
            continue
        book_data = html.fromstring(book_data)
        parsed_data = self._parse_tooltip_info(book_data, book_id, image_url)
        if not parsed_data:
            continue
        books_data.append(parsed_data)
    return books_data

def __call__(self, doc, encoding='UTF-8'):
    if isinstance(doc, (str, bytes)):
        doc = fromstring(bytes(bytearray(doc, encoding=encoding)),
                         parser=HTMLParser(encoding=encoding))
    if not isinstance(doc, HtmlElement):
        return None

    for cls in self.EXTRACTORS:
        extract = cls()
        tags_ = extract(doc)
        if tags_:
            tags = []
            for idx, tag in enumerate(tags_):
                if idx < 2 and len(tag) > 16:
                    break
                elif len(tag) < 16:
                    tags.append(tag)
            else:
                if tags:
                    logger.info('TagExtractor got tags %s', tags)
                    return tags

def walkListItems(sess, url):
    try:
        global visited

        def replacewhite(text):
            return re.sub(r'(\ |\r|\n|\t)+', ' ', text)

        resp = sess.get(url=url)
        root = html.fromstring(resp.text)
        tds = root.xpath(".//*[@class='kboard-list']//tr/td[2]")
        for td in tds:
            href = td.xpath(".//a")[0].attrib['href']
            href = urljoin(url, href)
            href = re.sub(r'pageid=\d+', '', href)
            if href in visited:
                continue
            text = re.sub(r'(\ |\r|\n|\t)+', ' ', td.text_content())
            if '???' not in text:
                continue
            print(text)
            visited[href] = (text)
            walkPageItem(sess, href, text)
    except BaseException as ex:
        traceback.print_exc()
        print(ex)

def walkNextPages(sess, url="https://iptime.com/iptime/?page_id=126&dffid=1&dfsid=11"):
    try:
        from os.path import basename

        def get_pageid(url):
            from urllib.parse import parse_qsl, urlsplit
            qs = dict(parse_qsl(urlsplit(url).query))
            return int(qs.get("pageid", "1"))

        while True:
            pageid = get_pageid(url)
            print("pageid=%d" % pageid)
            walkListItems(sess, url)
            root = html.fromstring(sess.get(url=url).text)
            arrows = [basename(_) for _ in root.xpath(".//ul[@class='pages']//img/@src")]
            if 'next_1.gif' not in arrows:
                break
            nexturl = next(_ for _ in root.xpath(".//ul[@class='pages']//img")
                           if basename(_.attrib['src']) == 'next_1.gif')
            url = urljoin(url, nexturl.xpath("../../a/@href")[0])
            nextpageid = get_pageid(url)
            assert nextpageid == pageid + 1
    except BaseException as ex:
        traceback.print_exc()
        print(ex)

def main():
    global executor
    try:
        session = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://www.zyxel.com/us/en/support/download_landing.shtml'
        with open('zyxel_us_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
            resp = session.get(url=url)
            root = html.fromstring(resp.text)
            models = get_all_models(root)
            for modelName in sorted(models.keys()):
                kbid = models[modelName]
                resp2 = session.get(url='http://www.zyxel.com/us/en/support/DownloadLandingSR.shtml',
                                    params=dict(c="us", l="en", kbid=kbid, md=modelName))
                walkFiles(modelName, session, resp2)
    except BaseException as ex:
        traceback.print_exc()
    finally:
        print('Wait for executor shutdown')
        executor.shutdown(True)

def main():
    global executor
    try:
        session = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://downloadcenter.netgear.com'
        with open('netgear_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5'])
            response = session.get(url=url)
            root = html.fromstring(response.text)
            href = root.xpath(".//a[@id='ctl00_ctl00_ctl00_mainContent_localizedContent_bodyCenter_BasicSearchPanel_btnAdvancedSearch']/@href")
            href = strip_js(href[0])
            formdata = {"__EVENTTARGET": href}
            resp2 = form_submit(session, root, url, "aspnetForm", formdata, {"Referer": url})
            walkCategories(session, resp2)
    except BaseException as ex:
        traceback.print_exc()
    finally:
        executor.shutdown(True)

def walkCategories(session, response):
    try:
        root = html.fromstring(response.text)
        url = response.url
        categories = root.xpath(".//select[@name='ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProductCategory']/option")
        global startCat
        for iCat, category in enumerate(categories[startCat:], startCat):
            startCat = 0
            rsrc = category.xpath("./@value")[0]
            text = category.xpath(".//text()")[0]
            print('Category="%s", iCat=%d' % (text, iCat))
            formdata = {"__EVENTTARGET": "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProductCategory",
                        "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProductCategory": rsrc,
                        "__ASYNCPOST:": "true"}
            resp2 = form_submit(session, root, url, "aspnetForm", formdata, {"Referer": url})
            if not resp2:
                continue
            walkFamilies(session, resp2)
    except BaseException as ex:
        print('iCat=%d, cat="%s"' % (iCat, text))
        traceback.print_exc()

def walkProducts(session, response):
    try:
        root = html.fromstring(response.text)
        products = root.xpath("//select[@name='ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProduct']/option")
        url = response.url
        global startProd
        for iProd, product in enumerate(products[startProd:], startProd):
            startProd = 0
            rsrc = product.xpath("./@value")[0]
            text = product.xpath(".//text()")[0]
            print('Product="%s", iProd=%d' % (text, iProd))
            formdata = {"__EVENTTARGET": "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProduct",
                        "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProduct": rsrc,
                        "__ASYNCPOST:": "true"}
            resp2 = form_submit(session, root, url, "aspnetForm", formdata, {"Referer": url})
            if not resp2:
                print('Ignored iProd=%d, product="%s"' % (iProd, text))
                continue
            walkFirmwares(resp2, product)
    except BaseException as ex:
        print('Error iProd=%d, product="%s"' % (iProd, text))
        traceback.print_exc()

def walkFirmwares(response, product):
    try:
        root = html.fromstring(response.text)
        firmwares = root.xpath("//div[@id='LargeFirmware']//a")
        for iFirm, firmware in enumerate(firmwares):
            text = firmware.xpath(".//text()")
            if "firmware" in " ".join(text).lower():
                # print('Firmware="%s", iFirmware=%d' % (text, iFirm))
                desc = text[0]
                href = firmware.xpath("./@data-durl")
                if not href:
                    href = firmware.xpath("./@href")
                url = href[0]
                model = product.xpath(".//text()")[0]
                print('model="%s", desc="%s", url=%s' % (model, desc, url))
                global executor, visited
                if url in visited:
                    continue
                visited[url] = (model, desc)
                executor.submit(download_file, model, desc, url)
    except BaseException as ex:
        traceback.print_exc()

def main():
    global executor
    try:
        session = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://support.netgear.cn/'
        with open('netgear_cn_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
            resp = session.get(url=url)
            root = html.fromstring(resp.text)
            startProd = 1
            prods = root.xpath(".//select[@name='select']/option")
            for iProd, prod in enumerate(prods[startProd:], startProd):
                # prodText = prod.xpath("./text()")[0].strip()
                prodUrl = prod.xpath("./@value")[0].strip()
                walkProd(session, urljoin(resp.url, prodUrl))
    except BaseException as ex:
        traceback.print_exc()
    finally:
        print('Wait for executor shutdown')
        executor.shutdown(True)

def get_list():
    os.system('clear')
    print "Liste aliniyor..."
    worst_response = requests.get(worst_page)
    worst_tree = LH.fromstring(worst_response.content)
    for atag in worst_tree.xpath(worst_list):
        details_response = requests.get(worst_page + atag.attrib['href'])
        details_tree = LH.fromstring(details_response.content)
        for vuln in details_tree.xpath(heartbleed):
            if vuln.text_content().startswith('Yes'):
                print WARNING + worst_page + atag.attrib['href'] + ENDC
            elif vuln.text_content().startswith('No'):
                print worst_page + atag.attrib['href']
            else:
                print FAIL + worst_page + atag.attrib['href'] + ENDC

def get_corresponding_author_info(self):
    """Try to get corresponding author information.

    Returns (scopus-id, name, email).
    """
    resp = requests.get(self.scopus_link)
    from lxml import html

    parsed_doc = html.fromstring(resp.content)
    for div in parsed_doc.body.xpath('.//div'):
        for a in div.xpath('a'):
            if '/cdn-cgi/l/email-protection' in a.get('href', ''):
                encoded_text = a.attrib['href'].replace('/cdn-cgi/l/email-protection#', '')
                key = int(encoded_text[0:2], 16)
                email = ''.join([chr(int('0x{}'.format(x), 16) ^ key)
                                 for x in map(''.join, zip(*[iter(encoded_text[2:])]*2))])
                for aa in div.xpath('a'):
                    if 'http://www.scopus.com/authid/detail.url' in aa.get('href', ''):
                        scopus_url = aa.attrib['href']
                        name = aa.text
                    else:
                        scopus_url, name = None, None
                return (scopus_url, name, email)

async def osu(cmd, message, args):
    if args:
        osu_input = '%20'.join(args)
        try:
            profile_url = 'https://osu.ppy.sh/u/' + osu_input
            async with aiohttp.ClientSession() as session:
                async with session.get(profile_url) as data:
                    page = await data.text()
            root = html.fromstring(page)
            username = root.cssselect('.profile-username')[0].text[:-1]
            user_color = str(message.author.color)[1:]
            sig_url = f'https://lemmmy.pw/osusig/sig.php?colour=hex{user_color}&uname={osu_input}'
            response = discord.Embed(color=message.author.color)
            response.set_image(url=sig_url)
            response.set_author(name=f'{username}\'s osu! Profile', url=profile_url, icon_url=osu_logo)
        except IndexError:
            response = discord.Embed(color=0xBE1931, title='? Unable to retrieve profile.')
    else:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    await message.channel.send(None, embed=response)

async def bash(cmd, message, args):
    if len(cache) == 0:
        async with aiohttp.ClientSession() as session:
            async with session.get('http://bash.org/?random1') as page:
                page = await page.text()
        quotes = html.fromstring(page).cssselect('body center table tr td[valign="top"]')[0]
        for index in range(1, len(quotes), 2):
            qid = quotes[index - 1][0][0].text
            score = quotes[index - 1][2].text
            quote = quotes[index].text_content()
            quote = {
                'id': qid[1:],
                'score': score,
                'quote': quote
            }
            cache.append(quote)
    quote = cache.pop()
    # skip quotes that are not fitting into message character limit
    while len(quote['quote']) > 2037:
        quote = cache.pop()
    text = quote['quote']
    highlight = 'xml' if text.strip()[0] == '<' else 'yaml'
    embed = Embed(type='rich', color=0xf7d7c4, description=f'```{highlight}\n{text}\n```')
    embed.set_author(name=f"?? #{quote['id']} | Score: {quote['score']}", url=f"http://bash.org/?{quote['id']}")
    await message.channel.send(None, embed=embed)

async def joke(cmd, message, args):
    randomizer = secrets.randbelow(6644)
    joke_url = f'http://jokes.cc.com/feeds/random/{randomizer}'
    async with aiohttp.ClientSession() as session:
        async with session.get(joke_url) as data:
            joke_json = await data.read()
    joke_json = json.loads(joke_json)
    joke_page_url = joke_json['0']['url']
    async with aiohttp.ClientSession() as session:
        async with session.get(joke_page_url) as data:
            page_data = await data.text()
    root = html.fromstring(page_data)
    content = root.cssselect('.content_wrap')[0]
    joke_text = ''
    for element in content.cssselect('p'):
        if element.text != '' and element.text != '\n':
            joke_text += f'\n{element.text}'
    while '  ' in joke_text:
        joke_text = joke_text.replace('  ', ' ')
    joke_text = ftfy.fix_text(joke_text)
    embed = discord.Embed(color=0xFFDC5D)
    embed.add_field(name='?? Have A Random Joke', value=joke_text)
    await message.channel.send(None, embed=embed)

async def cyanideandhappiness(cmd, message, args):
    comic_img_url = None
    comic_url = None
    while not comic_img_url:
        comic_number = secrets.randbelow(4665) + 1
        comic_url = f'http://explosm.net/comics/{comic_number}/'
        async with aiohttp.ClientSession() as session:
            async with session.get(comic_url) as data:
                page = await data.text()
        root = html.fromstring(page)
        comic_element = root.cssselect('#main-comic')
        comic_img_url = comic_element[0].attrib['src']
        if comic_img_url.startswith('//'):
            comic_img_url = 'https:' + comic_img_url
    embed = discord.Embed(color=0xFF6600)
    embed.set_image(url=comic_img_url)
    cnh_image = 'https://i.imgur.com/jJl7FoT.jpg'
    embed.set_author(name='Cyanide and Happiness', icon_url=cnh_image, url=comic_url)
    await message.channel.send(None, embed=embed)

async def grab_post_list(tags):
    links = []
    for x in range(0, 20):
        resource = f'http://safebooru.org/index.php?page=dapi&s=post&q=index&tags={tags}&pid={x}'
        async with aiohttp.ClientSession() as session:
            async with session.get(resource) as data:
                data = await data.read()
        posts = html.fromstring(data)
        for post in posts:
            if 'file_url' in post.attrib:
                file_url = post.attrib['file_url']
                extention = file_url.split('.')[-1]
                if extention in ['png', 'jpg', 'jpeg', 'gif']:
                    height = int(post.attrib['height'])
                    width = int(post.attrib['width'])
                    if width < 2000 and height < 2000:
                        links.append(post)
    return links

def replace_arch_section(self, cr, uid, view_id, section_xpath, replacement, context=None):
    # the root of the arch section shouldn't actually be replaced as it's
    # not really editable itself, only the content truly is editable.
    [view] = self.browse(cr, uid, [view_id], context=context)
    arch = etree.fromstring(view.arch.encode('utf-8'))
    # => get the replacement root
    if not section_xpath:
        root = arch
    else:
        # ensure there's only one match
        [root] = arch.xpath(section_xpath)

    root.text = replacement.text
    root.tail = replacement.tail
    # replace all children
    del root[:]
    for child in replacement:
        root.append(copy.deepcopy(child))

    return arch

def scrape_subraces_old(href):
    url = settings.betfair_url + href + settings.betfair_url2_end
    r = requests.get(url)
    data = []
    if r.status_code == 200:
        datajson = r.json()
        domtree = html.fromstring(datajson['children'])
        ul = domtree.xpath('//ul[@class="children"]')[0]
        lis = ul.xpath('li')
        for li in lis:
            item = {}
            item['title'] = li.xpath('a/@market-name')[0]
            try:
                item['identifier'] = li.xpath('a/@market-id')[0]
                t = time.localtime(int(li.xpath('a/@market-time')[0]) / 1000)
                item['date'] = time.strftime('%Y-%m-%d %H:%M:%S', t)
                data.append(item)
            except:
                data = data + scrape_subraces(li.xpath('a/@href')[0])
    return(data)

def extract(url):
    global img_no
    try:
        img_no += 1
        r = requests.get(url)
        tree = html.fromstring(r.text)
        div = tree.xpath('//table[@class="masterresultstable"]\
//div[@class="meshtext-wrapper-left"]')
    except:
        div = []

    if div != []:
        div = div[0]
    else:
        return

    typ = div.xpath('.//strong/text()')[0]
    items = div.xpath('.//li/text()')
    img = tree.xpath('//img[@id="theImage"]/@src')[0]

    final_data[img_no] = {}
    final_data[img_no]['type'] = typ
    final_data[img_no]['items'] = items
    final_data[img_no]['img'] = domain + img
    try:
        urllib.urlretrieve(domain + img, path + str(img_no) + ".png")
        with open('data_new.json', 'w') as f:
            json.dump(final_data, f)
        output = "Downloading Images : {}".format(img_no)
        sys.stdout.write("\r\x1b[K" + output)
        sys.stdout.flush()
    except:
        return

def get_links(query):
    urlencoded_query = urllib.parse.quote_plus(query)
    r = requests.get("https://duckduckgo.com/html/?q=" + urlencoded_query, headers={'User-Agent': USER_AGENT})
    tree = html.fromstring(r.content)
    return tree.xpath('//h2[@class="result__title"]/a[@class="result__a"]/@href')

def scrape_web(website):
    r = requests.get(website, timeout=5)
    tree = html.fromstring(r.content)
    rss_links = tree.xpath('//link[@rel="alternate" and @type="application/atom+xml"]/@href')
    if len(rss_links) == 0:
        raise NoLinkError(website)
    else:
        return urllib.parse.urljoin(website, rss_links[0])

def getHearthpwnIdAndUrl(name, set, type, isToken, session):
    log.debug("getHearthpwnIdAndUrl() getting for %s", name)
    # hearthpwn is also weird
    hpname_hacked = name.replace('-', ' ').replace('!', '')
    premium = 0 if isToken else 1

    # filter-name={}&filter-premium={}&filter-type={}&filter-set={}
    r = session.get(setUrlTempl.format(hpname_hacked, premium, hsTypeId[type], setNameIds[set]))
    r.raise_for_status()
    html = fromstring(r.text)

    images = html.xpath('//td[@class="visual-image-cell"]/a/img')
    descs = html.xpath('//td[@class="visual-details-cell"]/h3/a')

    for i in range(len(images)):
        title = descs[i].text
        if title.lower() == name.lower():
            image = images[i].get('src')
            if not image:
                image = 'http://media-hearth.cursecdn.com/avatars/148/738/687.png'
            # /cards/31128-annoy-o-tron-fanclub
            hpid = hpIdRegex.match(images[i].get('data-href')).group(1)
            return int(hpid), image.replace('http://', 'https://').lower()

    log.debug("getHearthpwnIdAndUrl() card not found at hearthpwn '%s' '%s'", set, name)
    raise Exception("getHearthpwnIdAndUrl() card " + name + " not found at hearthpwn")

def loadTokens(tokens={}, wantedTokens={}):
    resultCards = {}
    with requests.Session() as session:
        for name, ids in wantedTokens.items():
            card = None

            if 'id' in ids:
                card = tokens[ids['id']]
                if name != card['name']:
                    log.warning('loadTokens() names do not match: %s - %s', name, tokens[ids['id']]['name'])

            if 'id' not in ids:
                for token in tokens.values():
                    if name == token['name']:
                        if card:
                            log.warning('loadTokens() found token again: %s', name)
                        card = token

            if not card:
                log.warning('loadTokens() could not find: %s', name)
                exit()

            r = session.get('http://www.hearthpwn.com/cards/{}'.format(ids['hpwn']))
            r.raise_for_status()
            image = fromstring(r.text).xpath('//img[@class="hscard-static"]')[0].get('src')
            if not image:
                image = 'https://media-hearth.cursecdn.com/avatars/148/738/687.png'

            card['cdn'] = image.replace('http://', 'https://').lower()
            card['hpwn'] = ids['hpwn']
            card['head'] = getHearthHeadId(card['name'], "ignored", "ignored")

            # since jade golem: overwrite scraped stats with prepared ones
            card['atk'] = ids.get('atk', card['atk'])
            card['cost'] = ids.get('cost', card['cost'])
            card['hp'] = ids.get('hp', card['hp'])

            resultCards[card['name']] = card
            print('.', end='')

    return resultCards

def lxml(self):
    """Get an lxml etree if possible."""
    if ('html' not in self.mimetype and 'xml' not in self.mimetype):
        raise AttributeError('Not an HTML/XML response')
    from lxml import etree
    try:
        from lxml.html import fromstring
    except ImportError:
        fromstring = etree.HTML
    if self.mimetype == 'text/html':
        return fromstring(self.data)
    return etree.XML(self.data)

def analy_following_profile(self, html_text):
    tree = html.fromstring(html_text)
    url_list = tree.xpath("//h2[@class='ContentItem-title']//span[@class='UserLink UserItem-name']//a[@class='UserLink-link']/@href")
    for target_url in url_list:
        target_url = "https://www.zhihu.com" + target_url
        target_url = target_url.replace("https", "http")
        if red.sadd('red_had_spider', target_url):
            red.lpush('red_to_spider', target_url)

def list_mtgs_gallery(url=''):
    if url == '':
        return ''
    page = requests.get(url)
    tree = html.fromstring(page.content)
    cards = []
    cardstree = tree.xpath('//*[contains(@class, "log-card")]')
    for child in cardstree:
        cards.append(child.text)
    return cards

def scrape_fullspoil(url="http://magic.wizards.com/en/articles/archive/card-image-gallery/hour-devastation",
                     setinfo={"code": "HOU"}, showRarityColors=False, showFrameColors=False,
                     manual_cards=[], delete_cards=[]):
    if 'name' in setinfo:
        url = 'http://magic.wizards.com/en/articles/archive/card-image-gallery/' + setinfo['name'].lower().replace('of', '').replace(
            '  ', ' ').replace(' ', '-')
    page = requests.get(url)
    tree = html.fromstring(page.content)
    cards = []
    cardtree = tree.xpath('//*[@id="content-detail-page-of-an-article"]')
    for child in cardtree:
        cardElements = child.xpath('//*/p/img')
        cardcount = 0
        for cardElement in cardElements:
            card = {
                "name": cardElement.attrib['alt'].replace(u"\u2019", '\'').split(' /// ')[0],
                "img": cardElement.attrib['src']
            }
            card["url"] = card["img"]
            #card["cmc"] = 0
            #card["manaCost"] = ""
            #card["type"] = ""
            #card["types"] = []
            #card["text"] = ""
            #card["colorIdentity"] = [""]
            # if card['name'] in split_cards:
            #     card["names"] = [card['name'], split_cards[card['name']]]
            #     card["layout"] = "split"
            #notSplit = True
            # for backsplit in split_cards:
            #     if card['name'] == split_cards[backsplit]:
            #         notSplit = False
            # if not card['name'] in delete_cards:
            cards.append(card)
            cardcount += 1
    fullspoil = {"cards": cards}
    print "Spoil Gallery has " + str(cardcount) + " cards."
    download_images(fullspoil['cards'], setinfo['code'])
    fullspoil = get_rarities_by_symbol(fullspoil, setinfo['code'])
    fullspoil = get_mana_symbols(fullspoil, setinfo['code'])
    #fullspoil = get_colors_by_frame(fullspoil, setinfo['code'])
    return fullspoil

def get_html_tree():
    """Gets and converts the management interface page into a parsable tree."""
    try:
        with requests.Session() as s:
            s.get(_config['base_url'] + _config['welcome_page'], data=_config['welcome_credentials'])
            s.post(_config['base_url'] + _config['login_page'], data=_config['login_credentials'])
            r = s.get(_config['base_url'] + _config['management_page'])
    except Exception as e:
        logging.error(str(e))
        raise e
    return html.fromstring(r.content)

def main():
    """Command line entry point."""
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description=sys.modules[__name__].__doc__)
    parser.add_argument(
        'article_file', metavar='ARTICLE', type=argparse.FileType(),
        help='path to Wiktionary article file')
    parser.add_argument(
        '-z', '--zim-file', action='store_true',
        help='treat the article file as a ZIM archive, instead of HTML '
             'source')
    parser.add_argument(
        '-d', '--debug', action='store_true',
        help='enable debugging output')
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO)
    if args.zim_file:
        article_tuples = ZimFile(args.article_file).article_tuples()
    else:
        article_tuples = [(None, None, args.article_file.read())]
    for article_tuple in article_tuples:
        context = {'edition': article_tuple[0], 'pagename': article_tuple[1]}
        doc = html.fromstring(article_tuple[2])
        for translation in parse_document(doc):
            translation.update(context)
            print json.dumps(translation)

def get_tree(page):
    page = page.replace(u"\xa0", " ")  # otherwise starts-with for lxml doesn't work
    try:
        tree = html.fromstring(page)
    except (etree.XMLSyntaxError, etree.ParserError) as e:
        print u"not parsing, because etree error in get_tree: {}".format(e)
        tree = None
    return tree