The following 30 code examples, extracted from open-source Python projects, illustrate how to use scrapy.Spider().
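For reference, a minimal scrapy.Spider subclass looks like the sketch below; the spider name, start URL, and CSS selector are placeholders chosen for illustration, not taken from the examples that follow.

import scrapy

class QuotesSpider(scrapy.Spider):
    # every spider needs a unique name; start_urls seeds the first requests
    name = "quotes"
    start_urls = ["http://quotes.toscrape.com/"]

    def parse(self, response):
        # parse() is the default callback for responses to the start URLs
        for text in response.css("div.quote span.text::text").extract():
            yield {"text": text}

The examples below show the same building blocks (constructors, parse callbacks, start_requests, signals, and settings) as they appear in real projects.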
def parse_search(self, response):
    """
    @summary: parse the search result page returned by the request issued in start_requests()
    @param response: the response to the request generated in start_requests()
    """
    # A URL containing "antispider" means a verification-code page was served;
    # sleep 12 hours, then close the spider so it can restart later.
    if "antispider" in response.url:
        spider_logger.error("Closing spider for verification code received in %s ! Spider will restart automatically after 12 hours!" % response.url)
        time.sleep(43200)  # wait 12 hours before closing
        raise CloseSpider('antispider')
    # ext holds the href of the first matched result, used to build the JSON index URL
    ext = response.xpath(
        '//div[@class="wx-rb bg-blue wx-rb_v1 _item"][1]/@href').extract()
    # If nothing matched, the search for this query returned no results
    if not ext:
        spider_logger.error("Failed searching {0} !".format(response.meta['query']))
        return
    # Build the JSON index URL for the first result page (page=1)
    json_url = "".join(ext).replace('/gzh?', 'http://weixin.sogou.com/gzhjs?') + '&cb=sogou.weixin_gzhcb&page=1&gzhArtKeyWord='
    cookies = response.meta['cookies']
    yield Request(json_url, callback=self.parse_index, cookies=cookies, meta={'cookies': cookies})
def __init__(self, feed_file=None, feed_title=None, feed_link=None, feed_description=None, crawler_settings=None):
    settings = crawler_settings if crawler_settings else dict(self.default_settings)
    if feed_file:
        settings['FEED_FILE'] = feed_file
    if feed_title:
        settings['FEED_TITLE'] = feed_title
    if feed_link:
        settings['FEED_LINK'] = feed_link
    if feed_description:
        settings['FEED_DESCRIPTION'] = feed_description
    self.crawler = get_crawler(settings_dict=settings)
    self.spider = scrapy.Spider.from_crawler(self.crawler, 'example.com')
    self.spider.parse = lambda response: ()
    item_processor = settings.get('ITEM_PROCESSOR')
    if not item_processor:
        item_processor = RaisedItemPipelineManager
    elif isinstance(item_processor, six.string_types):
        item_processor = load_object(item_processor)
    self.ipm = item_processor.from_crawler(self.crawler)
def __init__(self, *args, **kwargs):
    super(TiebaSearchSpider, self).__init__(*args, **kwargs)
    self.dig_pattern = re.compile(r'(\d+)')
    self.postid_pattern = re.compile(r'/p/(\d{10})')
    self.page_all = 1
    self.site_id = 2
    self.site_name = u'tieba_search'
    self.Flag_List = []
    self.Maxpage_List = []
    self.MAX_PAGE_NUM = 5
    self.headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0',
        'Host': 'www.baidu.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
    }
def __init__(self, *args, **kwargs):
    super(DmozSpider_search, self).__init__(*args, **kwargs)
    # self.sqldb = SqliteTime(self.name)
    self.dig_pattern = re.compile(r'(\d+)')
    self.postid_pattern = re.compile(r'/p/(\d{10})')
    self.page_all = 1
    self.site_id = 500
    self.Flag_List = []
    self.Maxpage_List = []
    self.MAX_PAGE_NUM = 5
    self.headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3',
        'Connection': 'keep-alive',
        'DNT': '1',
        'Host': 'www.baidu.com',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0',
    }
    self.content_pa1 = re.compile('</div>(.*?)<br', re.S)
def setUp(self):
    self.spider = Spider('myspider')
    self.key = 'scrapy_redis:tests:%s:queue' % self.spider.name
    self.q = self.queue_cls(self.server, Spider('myspider'), self.key)
def setUp(self):
    self.persist = False
    self.key_prefix = 'scrapy_redis:tests:'
    self.queue_key = self.key_prefix + '%(spider)s:requests'
    self.dupefilter_key = self.key_prefix + '%(spider)s:dupefilter'
    self.idle_before_close = 0
    self.scheduler = Scheduler(self.server, self.persist, self.queue_key,
                               SpiderQueue, self.dupefilter_key,
                               self.idle_before_close)
    self.spider = Spider('myspider')
def __init__(self, query=None, start_time=None, end_time=None, index_pages=None):
    """
    @summary: initialize the spider with a search word and the time range to crawl
    @param query: the search keyword; exactly one keyword is required per run
    @param start_time: only keep articles published after start_time; defaults to 100 days ago
    @param end_time: only keep articles published before end_time; defaults to now
    @param index_pages: number of index pages to crawl; defaults to 10
    """
    # A single search keyword is required for every run
    if query:
        self.query = query  # self.query drives the search request
    else:
        # Refuse to start without a keyword
        spider_logger.error("Spider needs a single search word each time! Check input!")
        raise CloseSpider('invalid search word')
    # Default start time: 100 days ago
    if start_time:
        self.from_time = start_time
    else:
        self.from_time = datetime.now() - timedelta(days=100)  # last 100 days
    # Default end time: now
    if end_time:
        self.end_time = end_time
    else:
        self.end_time = datetime.now()  # up to the current time
    # Default number of index pages to crawl
    if index_pages:
        self.index_pages = int(index_pages)
    else:
        self.index_pages = 10  # 10 pages by default
def parse_index(self, response):
    """
    @summary: parse an index (result list) page and build a Request for each article
    @param response: the response to the request generated in parse_search()
    @return: a list of Requests for article pages and, when needed, the next index page
    """
    if "antispider" in response.url:
        spider_logger.error("Closing spider for verification code received in %s ! Spider will restart automatically after 12 hours!" % response.url)
        time.sleep(43200)
        raise CloseSpider('antispider')
    requests = []
    page_list = self._get_result(response)
    # Nothing on this index page: return the empty list
    if not page_list:
        return requests
    next_page = True  # whether the next index page should still be requested
    # Walk the result list and request every article inside the time range
    for item in page_list:
        if isinstance(item, Request):  # the entry is already a Request
            requests.append(item)
            next_page = False
            break
        if item['publish_time'] <= self.from_time:  # older than self.from_time: stop paging
            next_page = False
            break
        elif item['publish_time'] > self.end_time:  # newer than self.end_time: skip
            continue
        else:
            req = Request(item['url'], self.parse_page)  # request the article page
            req.meta["item"] = item
            requests.append(req)
    # If paging should continue and a next index page exists, request it as well
    if next_page and self._next_result_page(response):
        cookies = response.meta['cookies']
        requests.append(Request(self._next_result_page(response), callback=self.parse_index,
                                cookies=cookies, meta={'cookies': cookies}))
    return requests
def parse_page(self, response):
    """
    @summary: parse a single article page
    @param response: the response to the request generated in parse_index()
    @return: the item completed by _finish_item()
    """
    if "antispider" in response.url:
        spider_logger.error("Closing spider for verification code received in %s ! Spider will restart automatically after 12 hours!" % response.url)
        time.sleep(43200)
        raise CloseSpider('antispider')
    item = response.meta["item"]
    return self._finish_item(item, response)
def __init__(self):
    scrapy.Spider.__init__(self)
    self.download_delay = 1 / float(self.rate)
def __init__(self, url):
    super(Spider, self).__init__()
    self.start_urls = [url]
    self.le = LinkExtractor(canonicalize=False)
    self.files_le = LinkExtractor(
        tags=['a'], attrs=['href'], deny_extensions=[], canonicalize=False)
def make_crawler(**extra_settings):
    settings = Settings()
    settings['ITEM_PIPELINES'] = {
        'scrapy_cdr.media_pipeline.CDRMediaPipeline': 1,
        'tests.utils.CollectorPipeline': 100,
    }
    settings.update(extra_settings)
    runner = CrawlerRunner(settings)
    return runner.create_crawler(Spider)
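As a hedged usage sketch (the FILES_STORE path, spider name, and seed URL below are assumptions, not part of the original test suite), the crawler returned by make_crawler can be driven from a Twisted-based test:

from twisted.internet import defer

@defer.inlineCallbacks
def run_crawl(url):
    # build a crawler with an extra setting and crawl a single seed URL
    crawler = make_crawler(FILES_STORE='/tmp/test-media')
    yield crawler.crawl(name='test', start_urls=[url])
    # collected items can then be inspected via the CollectorPipeline
    return crawler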
def parse(self, response): """ Override function of the class scrapy.Spider. Called when response is obtained :param response: Response object used to get the details of the webpage """ for href in response.xpath("//a/@href").extract(): # Iterating over all the urls in the google search page if href[:7] == '/url?q=' and is_standard_website(href): # Getting the search results alone url = href[7:].split('&')[0] # starting another request for each search result url yield scrapy.Request(url, meta={'download_maxsize': 2097152}, callback=self.parse_result_contents)
def process_response(self, request, response, spider: Spider):
    if response.status == 302 or response.status == 503:
        self.ERROR_COUNT += 1
        print('Error count: %s' % self.ERROR_COUNT)
        if self.ERROR_COUNT > 100:
            spider.close(spider, 'http status error')
    return response
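A downloader middleware like this only runs if it is enabled in the project settings; a minimal sketch, assuming a hypothetical module path and priority, looks like:

# settings.py (sketch): the module path and the priority value are placeholders
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.HttpStatusRetryMiddleware': 543,
}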
def parse(self, response): """ Default callback function with response for the crawled url https://doc.scrapy.org/en/latest/topics/spiders.html#scrapy.spiders.Spider.parse """ response = response.replace(body=re.sub(r"<br\s*[\/]?>", "\n", response.body.decode('utf=8'))) property_key = response.url.split('=')[1].replace('&', '') # logging.debug("Parsing property_key: %s", property_key) property_info = self.parse_property_info(response) property_values = self.parse_property_values(response) property_sales = self.parse_property_sales(response) property_info['property_key'] = property_key property_info['sales'] = property_sales property_info['values'] = property_values yield Property(property_info)
def main():
    test_spider = MySpider(scrapy.Spider)
    test_spider.start_requests()
def spider_closed(self, spider, reason):
    spider.logger.info('Spider closed: %s %s', spider.name, reason)
    # If the spider finished without error, update last_scraped_at
    if reason == 'finished':
        try:
            self.logger.info('Updating media last_scraped_at information')
            self.cursor.execute(sql_update_media, [spider.name])
            self.db.commit()
            self.db.close()
        except mysql.Error as err:
            self.logger.error('Unable to update last_scraped_at: %s', err)
            self.db.rollback()
            self.db.close()
            if self.is_slack:
                error_msg = '{}: Unable to update last_scraped_at: {}'.format(
                    spider.name, err)
                self.slack.chat.post_message('#rojak-pantau-errors', error_msg,
                                             as_user=True)
    else:
        if self.is_slack:
            # Send the error to Slack
            error_msg = '{}: Spider failed because: {}'.format(
                spider.name, reason)
            self.slack.chat.post_message('#rojak-pantau-errors', error_msg,
                                         as_user=True)

# subscribe to the item_dropped event
def __init__(self, *args, **kwargs):
    super(scrapy.Spider, self).__init__(*args, **kwargs)
    self.Flag_List = []
    self.Maxpage_List = []
    self.MAX_PAGE_NUM = 76
    self.site_id = 1  # site id
    self.site_name = u'baidu_weibo'
    self.base_url = 'https://www.baidu.com/s?wd=%s&pn=0&cl=2&tn=baiduwb&ie=utf-8&f=3&rtt=2'
    self.topic_kws = None
    self.pa = re.compile(r'&pn=(\d+)&')
    self.pa_time = re.compile(r'\d+')
    self.headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0',
        'Host': 'www.baidu.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
    }
    self.headers_weibo = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0',
        'Host': 'weibo.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
def __init__(self):
    scrapy.Spider.__init__(self)
    pydispatch.dispatcher.connect(self.handle_spider_closed, signals.spider_closed)
    # every ASIN scraped will be stored in this list
    self.asin_pool = []
def __init__(self):
    scrapy.Spider.__init__(self)
    pydispatch.dispatcher.connect(self.handle_spider_closed, signals.spider_closed)
    # every ASIN scraped will be stored in these containers
    self.product_pool = {}
    self.log = []
    self.products = []
def __init__(self):
    scrapy.spiders.Spider.__init__(self)
    self.global_settings = get_project_settings()
    if self.global_settings['PLATFORM'] in ['win', 'mac']:
        self.driver = webdriver.PhantomJS(
            executable_path=self.global_settings['PHANTOMJS_PATH'])
    elif self.global_settings['PLATFORM'] in ['linux']:
        self.driver = webdriver.PhantomJS()
    self.driver.set_page_load_timeout(30)
    self.driver.implicitly_wait(10)
    self.type_id_list = self.global_settings['CRAWLER']['type_id_list']
    self.re_type_id = re.compile(self.global_settings['CRAWLER']['re_type_id'])
    self.url_template = self.global_settings['CRAWLER']['url_template']
def __del__(self):
    self.driver.quit()
    scrapy.spiders.Spider.__del__(self)
def start_requests(self):
    """Makes the initial request to the page you want to scrape.
    Returns an iterable of Requests, which the Spider can crawl.
    More requests will be generated successively from these initial requests."""
    urls = [
        'https://www.dice.com/jobs/detail/Etl%26%2347Informatica-Production-Support-%26%2347Developer-Pyramid-Consulting%2C-Inc.-Bellevue-WA-98006/pyrmid/16-32835?icid=sr1-1p&q=pyramid&l=Seattle,%20WA',
    ]
    for url in urls:
        # For each URL you're sending the spider to, make a request
        # and run parse() on the response object you get back.
        yield scrapy.Request(url=url, callback=self.parse)
def _getbody(self, body):
    "Handle the body argument."
    if body == "false":
        self.get_body = False
        self.logger.info('Spider will not extract email body.')
def update_settings(cls, settings):
    """
    Before the Spider class is initialized, merge the configuration stored
    under `job_idf` into the spider settings.
    """
    job_idf = settings['job_idf']
    spider_conf = cls.mongo_conf_db.get({'_id': job_idf})
    scry_settings = ExtractorConf(spider_conf).scrapy_settings
    custom_settings = cls.custom_settings or {}
    custom_settings.update(scry_settings)
    settings.setdict(custom_settings, priority='spider')
def parse(self, response):
    item = MyfirstpjtItem()
    item["urlname"] = response.xpath("/html/head/title/text()")
    print(item["urlname"])
def quit(self, spider):
    # second param is the instance of the spider about to be closed
    try:
        self.url.db.close()
        print('Spider closed, fetching product urls stopped')
    except:
        pass