The following 50 code examples, extracted from open-source Python projects, illustrate how to use scrapy.Request().
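Before the project examples, here is a minimal sketch (not taken from any of the projects below; the spider name and URLs are placeholders) showing the scrapy.Request arguments that recur throughout this section: url, callback, meta, headers, and dont_filter.

import scrapy


class ExampleSpider(scrapy.Spider):
    # Hypothetical spider used only to illustrate common scrapy.Request arguments.
    name = "example"

    def start_requests(self):
        # Build the first request explicitly instead of relying on start_urls.
        yield scrapy.Request(
            "http://example.com/list-1.html",       # placeholder URL
            callback=self.parse_list,                # method that receives the Response
            meta={"page_key": 1},                    # arbitrary data passed along to the callback
            headers={"User-Agent": "Mozilla/5.0"},   # optional per-request headers
            dont_filter=True,                        # bypass the duplicate-request filter
        )

    def parse_list(self, response):
        # Values stored in meta are available again on the response object.
        page_key = response.meta["page_key"]
        self.logger.info("parsing list page %s", page_key)
        for href in response.xpath("//a/@href").extract():
            # response.urljoin() resolves relative links before requesting them.
            yield scrapy.Request(response.urljoin(href), callback=self.parse_item)

    def parse_item(self, response):
        yield {"url": response.url, "title": response.xpath("//title/text()").extract_first()}

Most of the examples below follow this pattern: they parse one response, yield items, and yield further scrapy.Request objects whose callback continues the crawl.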
def parse_answer(self, response):
    # process the answers returned for a question
    ans_json = json.loads(response.text)
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]

    # extract the fields of each answer
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        answer_item["zhihu_id"] = answer["id"]
        answer_item["url"] = answer["url"]
        answer_item["question_id"] = answer["question"]["id"]
        answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
        answer_item["content"] = answer["content"] if "content" in answer else None
        answer_item["parise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.datetime.now()
        yield answer_item

    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
def parse_book(self, response):
    item = BookItem()
    sel = Selector(response)
    e = sel.xpath("//div[@id='wrapper']")
    item['name'] = e.xpath("./descendant::h1/descendant::span/text()").extract()
    item['author'] = e.xpath("//*[@id='info']/span[1]/a/text()").extract()
    item['bookinfo'] = e.xpath("//*[@id='info']/text()").extract()
    item['score'] = e.xpath('//*[@id="interest_sectl"]/div/div[2]/strong/text()').extract()
    item['commentNum'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@property = "v:votes"]/text()').extract()
    item['fivestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][1]/text()').extract()
    item['fourstar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][2]/text()').extract()
    item['threestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][3]/text()').extract()
    item['twostar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][4]/text()').extract()
    item['onestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][5]/text()').extract()
    item['tag'] = response.xpath("//*[@id = 'db-tags-section']/descendant::a/text()").extract()
    # request the book's hot-comments page next
    request = scrapy.Request(response.url + "/comments/hot", callback=self.parse_review)
    # pass the partially filled item on to the next callback
    request.meta['item'] = item
    return request
def parse(self, response):
    item = BookItem()
    sel = Selector(response)
    e = sel.xpath("//div[@id='wrapper']")
    item['name'] = e.xpath("./descendant::h1/descendant::span/text()").extract()
    item['author'] = e.xpath("//*[@id='info']/span[1]/a/text()").extract()
    item['bookinfo'] = e.xpath("//*[@id='info']/text()").extract()
    item['score'] = e.xpath('//*[@id="interest_sectl"]/div/div[2]/strong/text()').extract()
    item['commentNum'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@property = "v:votes"]/text()').extract()
    item['fivestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][1]/text()').extract()
    item['fourstar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][2]/text()').extract()
    item['threestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][3]/text()').extract()
    item['twostar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][4]/text()').extract()
    item['onestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][5]/text()').extract()
    item['tag'] = response.xpath("//*[@id = 'db-tags-section']/descendant::a/text()").extract()
    # request the book's hot-comments page next
    request = scrapy.Request(response.url + "/comments/hot", callback=self.parse_review)
    # pass the partially filled item on to the next callback
    request.meta['item'] = item
    return request
def generate_productlist(self, response):
    product_list = response.xpath("//a[@class='sellPoint']/@href").extract()
    for product_url in product_list:
        yield scrapy.Request(
            'http:' + product_url,
            callback=self.generate_product_detail
        )
    # next page
    # next_page = response.xpath("//a[@class='cur']/following-sibling::*[1]/@href").extract()[0]
    page_key = int(response.meta['page_key'])
    if page_key < 100:
        yield scrapy.Request(
            response.url.replace('-' + str(page_key) + '.html', '-' + str(page_key + 1) + '.html'),
            meta={"page_key": page_key + 1},
            callback=self.generate_productlist
        )
def start_requests(self):
    yield scrapy.Request(
        'http://bbs.zhiyoo.com/',
        meta={"page_key": 1, "proxy": MongoClient.get_random_proxy()},
        callback=self.generate_forum
    )
    for index in self.forum_arr:
        yield scrapy.Request(
            'http://bbs.zhiyoo.com/source/module/forum/tab_ajax.php?index=nav_' + str(index),
            meta={"page_key": 1, "proxy": MongoClient.get_random_proxy()},
            callback=self.generate_forum
        )
    # yield scrapy.Request(
    #     'http://bbs.zhiyoo.com/forum-401-1.html',
    #     callback=self.generate_forum_page_list
    # )
def generate_forum_url_list(self, response):
    all_a_tags = response.xpath('//a/@href').extract()
    forum_dict = {}
    for a_tag in all_a_tags:
        if a_tag.find("forum") != -1:
            if a_tag in forum_dict:
                forum_dict[a_tag] += 1
            else:
                forum_dict[a_tag] = 1
    for a_href in forum_dict:
        yield scrapy.Request(
            a_href,
            meta={"page_key": 1},
            dont_filter=True,
            callback=self.get_record_list
        )
    # also crawl the forum URLs configured on the spider itself
    for a_href in self.forum_url:
        yield scrapy.Request(
            a_href,
            meta={"page_key": 1},
            dont_filter=True,
            callback=self.get_record_list
        )
def generate_firm_content(self, response):
    qitem = YQichachaItem()
    qitem._id = re.search(r'firm_(.*)(\.html)$', response.url).group(1)
    qitem.name = response.xpath("//div[contains(@class, 'company-top-name')]/text()").extract()[0]
    base_info = list()
    base_info.append({"base_info": self.clean_content(response.xpath(
        "//span[contains(@class, 'm_comInfo')]").extract()[0])})
    qitem.base_info = base_info
    qitem.save()
    chacha_url_pre = self.url_qichacha_pre + '/company_getinfos?unique=' + qitem._id + '&companyname=' + qitem.name
    yield scrapy.Request(
        chacha_url_pre + '&tab=base',
        callback=self.generate_firm_base,
        cookies=self.qicha_cookie,
        encoding='utf-8',
        meta={"item": qitem, "chacha_url_pre": chacha_url_pre}
    )
def generate_article_url(self, response):
    as_id = ''.join(random.sample(string.ascii_letters + string.digits, 15))
    cp_id = ''.join(random.sample(string.ascii_letters + string.digits, 15))
    yield scrapy.Request(
        "http://www.toutiao.com/api/pc/feed/?category=news_tech&utm_source=toutiao&widen=1&max_behot_time=0"
        + "&max_behot_time_tmp=" + str(int(time.time()))
        + "&tadrequire=true&as=" + as_id + "&cp=" + cp_id
        + "&t=" + str(time.time()),
        callback=self.generate_article_url
    )
    article_list = json.loads(response.body)
    if article_list.get("message") != "success":
        return
    for article_detail in article_list.get('data'):
        # skip wenda, gallery and ad entries;
        # only follow plain articles tagged news_tech or news_finance
        tag_url = article_detail.get('tag_url')
        if article_detail.get('article_genre') == 'article' \
                and (tag_url == 'news_tech' or tag_url == 'news_finance'):
            yield scrapy.Request(
                self.toutiao_url_pre + article_detail.get('source_url'),
                callback=self.generate_article_content
            )
def generate_articlelist(self, response):
    if response.body.find("list") == -1:
        return
    articlelist = json.loads(response.body)
    page_key = int(response.meta['page_key'])
    # if 1 == 1:
    if page_key == 1 or self.check_rep_time(response.body):
        yield scrapy.Request(
            response.url.replace(re.search(u'index=[\d]+', response.url).group(0), 'index=' + str(page_key + 1)),
            callback=self.generate_articlelist,
            meta={"page_key": str(page_key + 1)}
        )
    # crawl every article in the list
    for artUrl in articlelist['list']:
        yield scrapy.Request(
            artUrl['ArtUrl'],
            callback=self.generate_article_detail
        )
def get_changyan_topic_id(self, response):
    article_item = YPcpopItem()
    article_item._id = response.meta['article_id']
    comment_all = json.loads(response.body)
    if 'cmt_sum' in comment_all:
        article_item.replies = str(comment_all['cmt_sum'])
    if 'participation_sum' in comment_all:
        article_item.views = str(comment_all['participation_sum'])
    MongoClient.save_forum_views(article_item, YPcpopItem)
    MongoClient.save_forum_replies(article_item, YPcpopItem)
    if 'topic_id' in comment_all:
        yield scrapy.Request(
            'http://changyan.sohu.com/api/2/topic/comments?&client_id=cyrYYYfxG&page_size=100&page_no=1&topic_id='
            + str(comment_all['topic_id']),
            meta={"article_id": article_item._id, "page_no": 1, "topic_id": str(comment_all['topic_id'])},
            callback=self.get_changyan_comment
        )
def generate_forum_url(self, response):
    # page_key = int(response.meta['page_key']) + 1
    # check the last reply time before following the next list page
    # rep_time = response.xpath('//div[@class="Forumhome_listbox"]//dl//dd//p/text()').extract()
    # if self.check_rep_date(rep_time):
    #     url = 'http://club.lenovo.com.cn/forum-all-reply_time-0-' + str(page_key)
    #     yield scrapy.Request(
    #         url,
    #         meta={"page_key": page_key, "proxy": MongoClient.get_random_proxy()},
    #         callback=self.generate_forum_url
    #     )
    # parse forum content and store it in the next callback
    for h1a_forum_url in response.xpath('//div[@class="Forumhome_listbox"]//dd//h1//a//@href').extract():
        yield scrapy.Request(
            h1a_forum_url,
            meta={"proxy": MongoClient.get_random_proxy()},
            callback=self.generate_forum_content
        )
def generate_forum_url(self, response):
    url_xpath = response.xpath(
        '//div[@class="threadlist"]//div[@class="threadlist_title"]//a[@onclick="atarget(this)"]/@href').extract()
    rep_time_path = response.xpath(
        '//div[@class="threadlist_info"]//div[@class="lastreply"]//span/@title').extract()
    page_key = int(response.meta['page_key']) + 1
    if len(rep_time_path) > 0:
        if self.check_rep_date(rep_time_path[0]) or page_key == 2:
            # follow the next list page
            forum_key = response.meta['forum_key']
            yield scrapy.Request(
                "http://bbs.lenovomobile.cn/" + forum_key + "/" + str(page_key) + "/",
                meta={"page_key": page_key, "forum_key": forum_key},
                callback=self.generate_forum_url
            )
    logging.error(len(url_xpath))
    # crawl every thread on this page
    for forum_url in url_xpath:
        yield scrapy.Request(
            # e.g. /zui/t778232/
            "http://bbs.lenovomobile.cn" + forum_url + '1/',
            callback=self.generate_forum_content
        )
def generate_article_comment_sum(self, response):
    com_sum_script = response.xpath("//html//script[1]//text()").extract()
    com_sum = 0
    if len(com_sum_script) > 1:
        com_sum_script = re.search(u'[\d]+', com_sum_script[1])
        try:
            com_sum = com_sum_script.group(0)
        except:
            com_sum = ''
    ithome_item = YIthome2Item()
    ithome_item._id = re.search(u'[\d]+', response.url).group(0)
    ithome_item.replies = str(com_sum)
    MongoClient.save_ithome_com_sum(ithome_item)
    hash_key = response.xpath('//input[@id="hash"]/@value').extract()
    if len(hash_key) > 0:
        com_url = \
            "http://dyn.ithome.com/ithome/getajaxdata.aspx?newsID=" + response.meta['article_id']
        com_url += "&type=commentpage&order=false&hash=" + hash_key[0] + "&page="
        yield scrapy.Request(
            com_url + str(1),
            dont_filter=True,
            callback=self.generate_article_comment
        )
def start_requests(self):
    # enter the forum index pages
    yield scrapy.Request(
        'http://jiyouhui.it168.com/forum.php',
        meta={"page_key": 1},
        callback=self.generate_forum_url_list
    )
    yield scrapy.Request(
        'http://benyouhui.it168.com/forum.php',
        meta={"page_key": 1},
        callback=self.generate_forum_url_list
    )
    # yield scrapy.Request(
    #     'http://benyouhui.it168.com/forum-962-1.html',
    #     meta={"page_key": 1},
    #     callback=self.generate_forum_page_list
    # )
def parse(self, response):
    list_types = Selector(response).xpath('//div[@class="listado_1"]//ul/li/a')
    for types in list_types:
        href = types.xpath("./@href").extract()
        text = types.xpath("./text()").extract()
        if Terms.filterBytype(text[0]):
            type = Terms.getType(text[0])
            initiative_url = Utils.createUrl(response.url, href[0])
            yield scrapy.Request(initiative_url, errback=self.errback_httpbin, callback=self.initiatives,
                                 meta={'type': type})
    """
    urlsa = ""
    urlsa = "http://www.congreso.es/portal/page/portal/Congreso/Congreso/Iniciativas/Indice%20de%20Iniciativas?_piref73_1335503_73_1335500_1335500.next_page=/wc/servidorCGI&CMD=VERLST&BASE=IW12&PIECE=IWC2&FMT=INITXD1S.fmt&FORM1=INITXLUS.fmt&DOCS=100-100&QUERY=%28I%29.ACIN1.+%26+%28161%29.SINI."
    yield scrapy.Request(urlsa, errback=self.errback_httpbin, callback=self.oneinitiative,
                         meta={'type': u"Proposición no de Ley en Comisión"})
    """
def initiatives(self, response):
    type = response.meta['type']
    first_url = Selector(response).xpath('//div[@class="resultados_encontrados"]/p/a/@href').extract()[0]
    num_inis = Selector(response).xpath('//div[@class="SUBTITULO_CONTENIDO"]/span/text()').extract()
    split = first_url.partition("&DOCS=1-1")
    for i in range(1, int(num_inis[0]) + 1):
        new_url = split[0] + "&DOCS=" + str(i) + "-" + str(i) + split[2]
        initiative_url = Utils.createUrl(response.url, new_url)
        CheckItems.addElement(initiative_url)
        if Blacklist.getElement(initiative_url):
            if not Blacklist.getElement(initiative_url):
                yield scrapy.Request(initiative_url, errback=self.errback_httpbin,
                                     callback=self.oneinitiative, meta={'type': type})
        else:
            yield scrapy.Request(initiative_url, errback=self.errback_httpbin,
                                 callback=self.oneinitiative, meta={'type': type})
def recursiveDS(self, response):
    text = response.meta['texto']
    item = response.meta['item']
    links = response.meta['allDS']
    text += self.searchDS(response, ref=item["ref"], name=item["url"])
    if not links:
        item["contenido"].append(text)
        yield item
    else:
        first_url = links[0]
        Utils.delfirstelement(links)
        yield scrapy.Request(Utils.createUrl(response.url, first_url), callback=self.recursiveDS,
                             dont_filter=True, meta={'item': item, 'allDS': links, "texto": text})
def parse(self, response):
    item = response.css('div.listBox ul li ')
    hrefs = item.css('div.listimg a::attr(href)').extract()
    # titles = item.css('div.listInfo h3 p::text').extract()
    # logging.log(logging.INFO, "parse " + len(hrefs))
    # follow every detail link on the list page and parse it with parse_movie
    for href in hrefs:
        # logging.log(logging.INFO, "hrefs[" + index + "]=" + href)
        try:
            yield scrapy.Request(response.urljoin(href), callback=self.parse_movie)
        except Exception as e:
            continue
    # locate the "next page" link in the pagination box
    next_page_str = u'下一页'
    rex = '//div[@class="pagebox"]/a[contains(text(), "%s")]/@href' % next_page_str
    next_page = response.xpath(rex).extract_first()
    # if a next page exists, request it and parse it with this same method
    if next_page is not None:
        next_page = response.urljoin(next_page)
        yield scrapy.Request(next_page, callback=self.parse)
def test_clear(self):
    self.assertEqual(len(self.q), 0)

    for i in range(10):
        # XXX: can't use same url for all requests as SpiderPriorityQueue
        # uses redis' set implementation and we will end with only one
        # request in the set and thus failing the test. It should be noted
        # that when using SpiderPriorityQueue it acts as a request
        # duplication filter whenever the serialized requests are the same.
        # This might be unwanted on repetitive requests to the same page
        # even with dont_filter=True flag.
        req = Request('http://example.com/?page=%s' % i)
        self.q.push(req)
    self.assertEqual(len(self.q), 10)

    self.q.clear()
    self.assertEqual(len(self.q), 0)
def test_queue(self):
    req1 = Request('http://example.com/page1', priority=100)
    req2 = Request('http://example.com/page2', priority=50)
    req3 = Request('http://example.com/page2', priority=200)
    self.q.push(req1)
    self.q.push(req2)
    self.q.push(req3)

    out1 = self.q.pop()
    out2 = self.q.pop()
    out3 = self.q.pop()

    self.assertEqual(out1.url, req3.url)
    self.assertEqual(out2.url, req1.url)
    self.assertEqual(out3.url, req2.url)
def test_scheduler_persistent(self):
    # TODO: Improve this test to avoid the need to check for log messages.
    self.spider.log = mock.Mock(spec=self.spider.log)
    self.scheduler.persist = True
    self.scheduler.open(self.spider)
    self.assertEqual(self.spider.log.call_count, 0)

    self.scheduler.enqueue_request(Request('http://example.com/page1'))
    self.scheduler.enqueue_request(Request('http://example.com/page2'))
    self.assertTrue(self.scheduler.has_pending_requests())
    self.scheduler.close('finish')

    self.scheduler.open(self.spider)
    self.spider.log.assert_has_calls([
        mock.call("Resuming crawl (2 requests scheduled)"),
    ])
    self.assertEqual(len(self.scheduler), 2)

    self.scheduler.persist = False
    self.scheduler.close('finish')
    self.assertEqual(len(self.scheduler), 0)
def parse(self, response):
    origin_url = response.url
    if "index" not in origin_url:
        soup = BeautifulSoup(response.body, "lxml")
        catalogue = soup.find("a", class_="blue CurrChnlCls").get("title").strip()
        news_list = soup.find("div", class_="lie_main_m").find_all("li")
        for news in news_list:
            title = news.find("a").text.strip()
            news_url = "http://www.cnta.gov.cn/xxfb" + news.find("a").get("href")[2:]
            news_no = news_url.rsplit("/", 1)[-1].split(".")[0]
            item = NewsItem(
                news_url=news_url,
                title=title,
                news_no=news_no,
                catalogue=catalogue,
            )
            yield scrapy.Request(item["news_url"], callback=self.parse_news, meta={'item': item})
    else:
        topic_url = origin_url.rsplit(".", 1)[0]
        self.flag.setdefault(topic_url, 0)
        yield scrapy.Request(origin_url, callback=self.parse_topic)
def parse(self, response):
    origin_url = response.url
    # e.g. http://money.163.com/special/002526O5/transport_02.html
    search_result = re.search(r"_(\d)*?\.", origin_url)
    # current page index
    pageindex = search_result.group(1) if search_result else 1
    soup = BeautifulSoup(response.body, "lxml")
    news_list = soup("div", class_="list_item clearfix")
    for news in news_list:
        news_date = news.find("span", class_="time").text if news.find("span", class_="time") else None
        title = news.find("h2").text if news.find("h2") else None
        news_url = news.find("h2").a.get("href", None) if news.find("h2") else None
        abstract = news.find("p").contents[0] if news.find("p") else None
        item = NewsItem(title=title, news_url=news_url, abstract=abstract, news_date=news_date)
        # drop items that fall outside the crawl time window
        item = judge_news_crawl(item)
        if item:
            request = scrapy.Request(news_url, callback=self.parse_news, meta={"item": item})
            yield request
        else:
            self.flag = int(pageindex)
    if not self.flag:
        next_url = self.next_url % (int(pageindex) + 1)
        yield scrapy.Request(next_url)
def next_page_parse(self, response):
    html = response.body
    url = response.url
    np_soup = BeautifulSoup(html, "lxml")
    # the paging info lives in a hidden div such as:
    # <div id="last2" lastTime="1467972702826" pageIndex="2" style="display:none;"></div>
    res = np_soup.find(name="div", attrs={"lasttime": True})
    lasttime = res.get("lasttime", None) if res else None
    pageindex = res.get("pageindex", None) if res else None
    for i in self.fetch_newslist(np_soup):
        request = scrapy.Request(i['news_url'], callback=self.parse_news)
        request.meta['item'] = i
        request.meta["pageindex"] = i
        yield request
    # next page
    if not self.flag and lasttime:
        pageindex = str(int(pageindex) + 1)
        new_url = re.sub(r'pageidx=.*?&lastTime=.*', "pageidx=%s&lastTime=%s" % (pageindex, lasttime), url, 1)
        yield scrapy.Request(new_url, callback=self.next_page_parse)
    # else:
    #     log.msg("can't find lasttime or pageindex", level=log.INFO)
def parse(self, response):
    origin_url = response.url
    soup = BeautifulSoup(response.body, "lxml")
    temp_soup = soup.find('div', id="ess_ctr10789_ModuleContent") if soup.find('div', id="ess_ctr10789_ModuleContent") else None
    if temp_soup:
        news_list = temp_soup.find_all("a", href=re.compile("http://www.toptour.cn/tab"))
        for news in news_list:
            news_url = news.get("href")
            title = news.text.strip()
            item = NewsItem(
                news_url=news_url,
                title=title,
                catalogue=u"???"
            )
            yield scrapy.Request(item["news_url"], callback=self.parse_news, meta={'item': item})
    else:
        logger.warning("%s can't find news_list " % origin_url)
def parse(self, response):
    """parse crawl page

    :response: TODO
    :returns: None
    """
    # debug
    # from scrapy.shell import inspect_response
    # inspect_response(response, self)
    for i in range(1, self.page + 1):
        yield scrapy.Request(
            response.request.url + '%s' % (i),
            self.parse_ip,
            dont_filter=True,
        )
def logged_in(self, response):
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    sql = 'select * from section'
    cursor.execute(sql)
    for row in cursor.fetchall():
        item = ByrbbsArticleItem()
        item['section_url'] = row[1]
        yield scrapy.Request(response.urljoin(row[1]),
                             meta={'cookiejar': response.meta['cookiejar'], 'item': item},
                             headers=HEADERS, callback=self.parse_article_list)
    # the commented-out block below crawls a single board only, kept for testing
    # self.start_urls = ['https://bbs.byr.cn/board/BM_Market']
    # item = ByrbbsArticleItem()
    # item['section_url'] = 'board/BM_Market'
    # return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
    #                       headers=HEADERS, callback=self.parse_article_list)
def logged_in(self, response):
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    sql = 'select * from section'
    cursor.execute(sql)
    for row in cursor.fetchall():
        item = ByrbbsArticleItem()
        item['section_url'] = row[1]
        yield scrapy.Request(response.urljoin(row[1]),
                             meta={'cookiejar': response.meta['cookiejar'], 'item': item},
                             headers=HEADERS, callback=self.parse_article_list_pre)
    # the commented-out block below crawls a single board only, kept for testing
    # self.start_urls = ['https://bbs.byr.cn/board/BUPTPost']
    # item = ByrbbsArticleItem()
    # item['section_url'] = 'BUPTPost'
    # return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
    #                       headers=HEADERS, callback=self.parse_article_list)
def parse(self, response):
    soup = BeautifulSoup(response.body, 'html.parser')
    # listing blocks on the page
    infos = soup.findAll(attrs={'class': 'item-mod'})
    # pagination block
    pagesUrl = soup.find(attrs={'class': 'list-page'})
    print("parsing the list page")
    # total number of listings
    number = int(pagesUrl.find(attrs={'class': 'total'}).em.string)
    # 50 listings per page, so work out how many pages there are
    pages = number // 50
    if (number % 50 > 0):
        pages = pages + 1
    print("total pages: " + str(pages))
    purl = pagesUrl.find(attrs={'class': 'pagination'}).a['href']
    purl = purl[0:-3]
    for i in range(1, pages + 1):
        temp = purl + "p" + str(i) + "/"
        print("page url: " + temp)
        print("requesting: " + temp)
        yield scrapy.Request(temp, callback=self.parse_item)
    print("finished generating page requests")
def parse_item(self, response):
    soup = BeautifulSoup(response.body, 'html.parser')
    # listing blocks on the page
    infos = soup.findAll(attrs={'class': 'item-mod'})
    for q in infos:
        if 'data-link' in str(q):
            item = AjkItem()
            item['title'] = q.h3.a.string
            print(q.h3.a.string)
            item['detailUrl'] = q.h3.a.get('href')
            print(q.h3.a.get('href'))
            print(q.find(attrs={'class': 'address'}).a.string)
            if q.find(attrs={'class': 'price'}) is not None:
                item['price'] = q.find(attrs={'class': 'price'}).span.string
                print(q.find(attrs={'class': 'price'}).span.string)
            else:
                item['price'] = q.find(attrs={'class': 'favor-tag around-price'}).span.string + 'around'
                print(q.find(attrs={'class': 'favor-tag around-price'}).span.string + 'around')
            # item['telephone'] = q.find(attrs={'class': 'tel'}).contents[1]
            # print(q.find(attrs={'class': 'tel'}).string)
            yield scrapy.Request(url=q.h3.a.get('href'), callback=self.parse_item2)
def parse(self, response):
    for solution_href in response.selector.xpath('//a[@title="Participants solved the problem"]/@href'):
        solution_url = response.urljoin(
            solution_href.extract() + '?order=BY_CONSUMED_TIME_ASC')
        yield scrapy.Request(solution_url, callback=self.parse_problem_solution_list_page)

    if response.selector.xpath('//span[@class="inactive"]/text()').extract():
        if response.selector.xpath('//span[@class="inactive"]/text()')[0].extract() != u'\u2192':
            next_page_href = response.selector.xpath(
                '//div[@class="pagination"]/ul/li/a[@class="arrow"]/@href')[0]
            next_page_url = response.urljoin(next_page_href.extract())
            yield scrapy.Request(next_page_url, callback=self.parse)
    else:
        next_page_href = response.selector.xpath(
            '//div[@class="pagination"]/ul/li/a[@class="arrow"]/@href')[1]
        next_page_url = response.urljoin(next_page_href.extract())
        yield scrapy.Request(next_page_url, callback=self.parse)
def parse_lista_diputados(self, response):
    # list of deputies
    diputados = response.xpath(
        '//div[@class="listado_1"]/ul/li/a/@href').extract()
    for diputado in diputados:
        request = scrapy.Request(
            response.urljoin(diputado), callback=self.parse_diputado)
        yield request

    # next page
    pagina_siguiente = response.xpath(
        '//a[contains(., "Página Siguiente")]/@href').extract_first()
    if pagina_siguiente:
        request = scrapy.Request(
            pagina_siguiente, callback=self.parse_lista_diputados)
        yield request
def parse_user(self, response):
    '''
    The user-detail response is JSON, so it can be parsed with json.loads().
    :param response:
    :return:
    '''
    result = json.loads(response.text)
    item = UserItem()
    # for every field declared on the item, copy the value if the JSON result contains it
    for field in item.fields:
        if field in result.keys():
            item[field] = result.get(field)
    # yield the item first, then schedule requests for this user's followees and followers
    yield item
    yield Request(self.follows_url.format(user=result.get("url_token"), include=self.follows_query, offset=0, limit=20),
                  callback=self.parse_follows)
    yield Request(self.followers_url.format(user=result.get("url_token"), include=self.followers_query, offset=0, limit=20),
                  callback=self.parse_followers)
def parse_follows(self, response):
    '''
    Parse the followee list. The response is JSON: the 'data' key holds the
    users and the paging information decides whether to request another page.
    :param response:
    :return:
    '''
    results = json.loads(response.text)
    if 'data' in results.keys():
        for result in results.get('data'):
            yield Request(self.user_url.format(user=result.get("url_token"), include=self.user_query),
                          callback=self.parse_user)
    # if the paging info exists and is_end is False, there is another page to fetch
    if 'page' in results.keys() and results.get('is_end') == False:
        next_page = results.get('paging').get("next")
        # request the next page of followees with this same callback
        yield Request(next_page, self.parse_follows)
def parse_followers(self, response):
    '''
    Parse the follower list, mirroring parse_follows. The response is JSON:
    the 'data' key holds the users and the paging information decides whether
    to request another page.
    :param response:
    :return:
    '''
    results = json.loads(response.text)
    if 'data' in results.keys():
        for result in results.get('data'):
            yield Request(self.user_url.format(user=result.get("url_token"), include=self.user_query),
                          callback=self.parse_user)
    # if the paging info exists and is_end is False, there is another page to fetch
    if 'page' in results.keys() and results.get('is_end') == False:
        next_page = results.get('paging').get("next")
        # request the next page of followers with this same callback
        yield Request(next_page, self.parse_followers)
def parse(self, response):
    """
    Extract all URLs from the HTML page and follow them.
    URLs of the form /question/xxx are handed to the question parser;
    every other URL is fed back into this method for further crawling.
    """
    all_urls = response.css("a::attr(href)").extract()
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]
    # keep only https URLs
    all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
    for url in all_urls:
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            # a question page: download it and hand it to the question parser
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            # break  # (for debugging a single question)
        else:
            # pass
            # not a question page: follow it and keep extracting URLs
            yield scrapy.Request(url, headers=self.headers, callback=self.parse)
def start_requests(self):
    return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]
def login(self, response):
    response_text = response.text
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
    xsrf = ''
    if match_obj:
        xsrf = (match_obj.group(1))

    if xsrf:
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": xsrf,
            "phone_num": "18487255487",
            "password": "ty158917",
            "captcha": ""
        }

        import time
        t = str(int(time.time() * 1000))
        captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
        yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data": post_data},
                             callback=self.login_after_captcha)
def parse(self, response):
    links = response.xpath("//*[@class = 'tagCol']/descendant::a/@href").extract()
    for href in links:
        # take 10 list pages per tag: start offsets 0, 20, ..., 180
        for pageNum in np.linspace(0, 180, 10):
            # type=S is kept from the site's own list URLs
            full_url = response.urljoin(href + "/?start=" + str(int(pageNum)) + "&type=S")
            # parse each tag list page to collect the book links on it
            yield scrapy.Request(full_url, callback=self.parse_tag_per_page)
def parse_tag_per_page(self, response):
    links = response.xpath("//ul[@class = 'subject-list']/descendant::a[@class = 'nbg']/@href").extract()
    for book in links:
        # parse each book's detail page into a BookItem
        yield scrapy.Request(book, callback=self.parse_book)
def start_requests(self):
    for part_url in self.start_urls:
        yield scrapy.Request(
            part_url,
            meta={"page_key": 0},
            callback=self.generate_productlist
        )
    # note: the product list is only followed up to page 100
def generate_product_detail(self, response):
    product_id1 = re.search('/([\d]+)/', response.url).group(1)
    product_id2 = re.search('/([\d]+).html', response.url).group(1)
    category = generate_product_category(response)
    yield scrapy.Request(
        'http://review.suning.com/ajax/review_lists/general-000000000' + product_id2 + '-' + product_id1
        + '-total-1-default-10-----reviewList.htm',
        callback=self.generate_product_comment,
        meta={"page_key": 1, "category": category, "url": response.url}
    )
def generate_comment_usefulcnt(self, response):
    review_userful = json.loads(re.search('usefulCnt\((.*)\)', response.body).group(1))
    if 'reviewUsefuAndReplylList' in review_userful:
        useful_dict = review_userful.get('reviewUsefuAndReplylList')
        suning_item = YSuningItem()
        c_id = str(useful_dict[0].get('commodityReviewId'))
        suning_item._id = c_id
        suning_item.useful_vote_count = str(useful_dict[0].get('usefulCount'))
        suning_item.replies = str(useful_dict[0].get('replyCount'))
        if useful_dict[0].get('replyCount') > 0:
            yield scrapy.Request(
                'https://review.suning.com/ajax/reply_list/' + c_id + '--1-replylist.htm',
                callback=self.generate_comment_replylist
            )
        MongoClient.save_suning_usefulcnt(suning_item, YSuningItem)
def generate_forum_list(self, response):
    forum_list = response.xpath('//a/@href').extract()
    if len(forum_list) > 0:
        for forum_url in forum_list:
            url = re.search(u'http://www.18095.com/forum-\d{1,10}-1.html', forum_url)
            if url is not None:
                yield scrapy.Request(
                    forum_url,
                    meta={"page_key": 1},
                    callback=self.generate_forum_list
                )
    page_key = int(response.meta['page_key'])
    rep_time_list = response.xpath('//tr/td[@class="by"]/em/a').extract()
    if len(response.xpath('//span[@id="fd_page_bottom"]//a[@class="nxt"]/@href').extract()) != 0:
        if page_key == 1 or self.check_rep_date(rep_time_list):
            nxt_page = \
                response.xpath('//span[@id="fd_page_bottom"]//a[@class="nxt"]/@href').extract()[0]
            yield scrapy.Request(
                nxt_page,
                meta={"page_key": -1},
                callback=self.generate_forum_list
            )
    thread_list = response.xpath('//a[contains(@class,"xst")]/@href').extract()
    if len(thread_list) > 0:
        logging.error(len(thread_list))
        for thread_url in thread_list:
            yield scrapy.Request(
                thread_url,
                callback=self.generate_forum_thread
            )
def start_requests(self):
    # get into the bbs
    yield scrapy.Request(
        self.start_urls[0],
        meta={"page_key": 1},
        callback=self.generate_forum_list
    )
    # yield scrapy.Request(
    #     'http://bbs.gfan.com/forum-1686-1.html',
    #     callback=self.generate_forum_page_list
    # )
def generate_forum_list(self, response):
    forum_list = re.findall(u'http://bbs.gfan.com/forum-[\d]+-1.html', response.body)
    if len(forum_list) > 0:
        for forum_url in forum_list:
            if forum_url not in self.forum_dict:
                yield scrapy.Request(
                    forum_url,
                    meta={"page_key": 1},
                    callback=self.generate_forum_list
                )
    pg_bar = response.xpath('//div[@class="pg"]//a[@class="nxt"]/@href').extract()
    rep_time_list = response.xpath('//tr/td[@class="by"]/em/a').extract()
    page_key = int(response.meta['page_key'])
    if len(pg_bar) > 0:
        if page_key == 1 or self.check_rep_date(rep_time_list):
            yield scrapy.Request(
                pg_bar[0],
                meta={"page_key": -1},
                callback=self.generate_forum_list
            )
    thread_list = response.xpath('//a[@class="xst"]/@href').extract()
    logging.error(len(thread_list))
    if len(thread_list) > 0:
        for thread_url in thread_list:
            yield scrapy.Request(
                thread_url,
                callback=self.generate_forum_thread
            )
def generate_forum(self, response):
    forum_list = response.xpath('//td[@class="fl_g"]//dl//dt//a/@href').extract()
    if len(forum_list) > 0:
        for forum_url in forum_list:
            f_url = forum_url
            yield scrapy.Request(
                f_url,
                meta={"page_key": 1},
                callback=self.generate_forum
            )
    # check the latest reply time before following the next page
    rep_time_list = response.xpath('//tr/td[@class="by"]/em/a').extract()
    page_key = int(response.meta['page_key'])
    pg_bar = response.xpath('//div[@class="pg"]//a[@class="nxt"]/@href').extract()
    if page_key == 1 or self.check_rep_date(rep_time_list):
        if len(pg_bar) > 0:
            yield scrapy.Request(
                pg_bar[0],
                meta={"page_key": -1},
                callback=self.generate_forum
            )
    # crawl every thread URL on this page
    thread_list = response.xpath('//a[contains(@class,"xst")]/@href').extract()
    logging.error(len(thread_list))
    if len(thread_list) > 0:
        for thread_url in thread_list:
            yield scrapy.Request(
                thread_url,
                callback=self.generate_forum_thread
            )
def generate_forum(self, response):
    forum_list = response.xpath('//td[@class="fl_g"]//dl//dt//a/@href').extract()
    if len(forum_list) > 0:
        for forum_url in forum_list:
            f_url = forum_url
            if forum_url.find('bbs.zhiyoo.com') == -1:
                f_url = 'http://bbs.zhiyoo.com/' + forum_url
            yield scrapy.Request(
                f_url,
                meta={"page_key": 1, "proxy": MongoClient.get_random_proxy()},
                callback=self.generate_forum
            )
    # check the latest reply time
    pg_bar = response.xpath('//div[@class="pg"]//a[@class="nxt"]/@href').extract()
    page_key = int(response.meta['page_key'])
    rep_time_list = response.xpath('//tr/td[@class="by"]/em/a').extract()
    # only follow the next page on the first page or while replies are recent enough
    if len(pg_bar) > 0:
        if page_key == 1 or self.check_rep_date(rep_time_list):
            yield scrapy.Request(
                pg_bar[0],
                meta={"page_key": -1, "proxy": MongoClient.get_random_proxy()},
                callback=self.generate_forum
            )
    # crawl every thread URL on this page
    thread_list = response.xpath('//a[@class="xst"]/@href').extract()
    if len(thread_list) > 0:
        for thread_url in thread_list:
            yield scrapy.Request(
                thread_url,
                meta={"proxy": MongoClient.get_random_proxy()},
                callback=self.generate_forum_thread
            )
def get_record_list(self, response):
    content = response.body
    content = content.replace('<!--', '')
    content = content.replace('-->', '')
    tree = etree.HTML(content)
    url_list = tree.xpath('//*[@id="thread_list"]//a/@href')
    category = response.meta['category']
    for i in url_list:
        if '/p/' in i and 'http://' not in i:
            tie_url = 'http://tieba.baidu.com' + i
            yield scrapy.Request(
                tie_url,
                meta={"category": category},
                callback=self.get_record_page_num
            )
    # check the last reply time; for threads replied to today it looks like "12:12"
    rep_time = tree.xpath('//span[contains(@class,"threadlist_reply_date")]/text()')
    if self.check_rep_date(rep_time[0]):
        next_page = tree.xpath('//a[contains(@class, "next")]/text()')
        if len(next_page) > 0:
            logging.error(next_page[0])
            page_key = int(response.meta['page_key']) + 50
            url = 'http://tieba.baidu.com/f?ie=utf-8&kw=' + category + '&fr=search&pn=' + str(page_key)
            yield scrapy.Request(
                url,
                meta={"page_key": page_key, "category": category},
                callback=self.get_record_list
            )