The following 15 code examples, taken from open-source Python projects, illustrate how to use scrapy.Item().
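Several of the examples below reference project-specific item classes such as MyspiderItem and MyxmlItem whose definitions are not shown. As a minimal, hypothetical sketch (the class and field names are inferred from the first example, not taken from the original project), a scrapy.Item subclass declares one scrapy.Field() for each field it will hold:

import scrapy

class MyspiderItem(scrapy.Item):
    # every key assigned to the item (directly or via an ItemLoader)
    # must be declared here as a Field
    movie_name = scrapy.Field()
    movie_year = scrapy.Field()
    movie_type = scrapy.Field()
    movie_rate = scrapy.Field()
    url = scrapy.Field()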
def _extract_item(self, response):
    # debug the response in the scrapy shell:
    # inspect_response(response, self)
    # or open the response scrapy fetched in a browser to see what the page looks like:
    # open_in_browser(response)
    # populate the item with an ItemLoader
    l = ItemLoader(response=response, item=MyspiderItem(), type='html')
    l.add_xpath('movie_name', '//h1/span[@property="v:itemreviewed"]/text()')
    l.add_xpath('movie_year', '//span[@property="v:initialReleaseDate"]/text()')
    l.add_xpath('movie_type', '//span[@property="v:genre"]/text()')
    l.add_xpath('movie_rate', '//strong[@class="ll rating_num"]/text()')
    l.add_value('url', response.url)
    # load_item() returns a scrapy.Item; scrapy-redis serializes items to JSON
    # before storing them in redis, and JSON only handles basic Python types,
    # so the item is returned as a plain dict
    return dict(l.load_item())
def parse_node(self, response, node):
    i = MyxmlItem()
    # fill the Item fields with XPath queries
    i['title'] = node.xpath("/rss/channel/item/title/text()").extract()
    i['link'] = node.xpath("/rss/channel/item/link/text()").extract()
    i['author'] = node.xpath("/rss/channel/item/author/text()").extract()
    # print every extracted entry with a for loop
    for j in range(len(i['title'])):
        print("Entry " + str(j + 1))
        print("Title:")
        print(i['title'][j])
        print("Link:")
        print(i['link'][j])
        print("Author:")
        print(i['author'][j])
        print("----------------------")
    return i
def _extract_item(self, response):
    # populate the item with an ItemLoader
    l = ItemLoader(response=response, item=MyspiderItem(), type='html')
    l.add_xpath('movie_name', '//h1/span[@property="v:itemreviewed"]/text()')
    l.add_xpath('movie_year', '//span[@property="v:initialReleaseDate"]/text()')
    l.add_xpath('movie_type', '//span[@property="v:genre"]/text()')
    l.add_xpath('movie_rate', '//strong[@class="ll rating_num"]/text()')
    l.add_value('url', response.url)
    # load_item() returns a scrapy.Item; scrapy-redis serializes items to JSON
    # before storing them in redis, and JSON only handles basic Python types,
    # so the item is returned as a plain dict
    return dict(l.load_item())
def parse_details(self, response):
    # response = get(response.url)
    institution = response.xpath('//h2/text()').extract()[0].strip()
    logging.warn("scrapping: %s - %s" % (response.url, institution))
    for tr in response.xpath('//table[@class="fancy"]/tr'):
        if tr.xpath('td[1]'):
            item = Item()
            titlu = xtract(tr, 'td[1]//div/text()')
            type_ = xtract(tr, 'td[2]//div//strong/text()')
            consult = xtract(tr, 'td[3]//div/text()')
            avizare = xtract(tr, 'td[4]//div/text()')
            avizori = xtract(tr, 'td[5]//div/text()')
            termen_avize = xtract(tr, 'td[6]//div/text()')
            mfp_mj = xtract(tr, 'td[7]//div/text()')
            reavizare = xtract(tr, 'td[8]//div/text()')
            init_1 = xtract(tr, 'td[9]//a/@href')
            init_2 = xtract(tr, 'td[10]//a/@href')
            final_1 = xtract(tr, 'td[11]//a/@href')
            final_2 = xtract(tr, 'td[12]//a/@href')
            docs = [{"type": "nota", "url": response.urljoin(f)}
                    for f in [init_1, init_2, final_1, final_2] if f]
            item['identifier'] = identify(institution, titlu)
            item['title'] = titlu
            item['type'] = type_
            item['institution'] = "sgg"
            item['date'] = consult
            item['description'] = ""
            item['feedback_days'] = None
            item['contact'] = None
            item['documents'] = docs
            yield item
def parse_item(self, response):
    self.logger.info('Hi, this is an item page! %s', response.url)
    # note: a bare scrapy.Item() declares no fields; real spiders normally
    # subclass Item and declare each field they assign
    item = scrapy.Item()
    item['id'] = response.xpath('//td[@id="item_id"]/text()').re(r'ID: (\d+)')
    item['name'] = response.xpath('//td[@id="item_name"]/text()').extract()
    item['description'] = response.xpath('//td[@id="item_description"]/text()').extract()
    return item
def default(self, obj):
    if isinstance(obj, Item):
        return dict(obj)
    # Let the base class default method raise the TypeError
    return json.JSONEncoder.default(self, obj)
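For context, a hedged usage sketch of the encoder above: assuming the default method belongs to a json.JSONEncoder subclass (the names ItemEncoder and ProductItem are illustrative, not from the original project), the encoder is passed to json.dumps via the cls argument so Item instances serialize as plain dicts:

import json
from scrapy import Item, Field

class ProductItem(Item):               # illustrative item class
    name = Field()

class ItemEncoder(json.JSONEncoder):   # illustrative wrapper for the default() above
    def default(self, obj):
        if isinstance(obj, Item):
            return dict(obj)
        return json.JSONEncoder.default(self, obj)

print(json.dumps(ProductItem(name='example'), cls=ItemEncoder))
# -> {"name": "example"}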
def _bogus_item(self, item):
    max_style = len('advanced intermediate hip hop with something else mixed in')
    max_teacher = len('someones longish-teacher and-last-name sub for crazy-long foreign-teacher different-name')
    if len(item['style']) > max_style or len(item['teacher']) > max_teacher:
        logging.error("Item contained too long properties: %s", item)
        return True
    return False
def page_item(self, response: HtmlResponse) -> Item:
    media_urls = []
    get_urls = lambda le: (link.url for link in le.extract_links(response))
    if self.settings.get('FILES_STORE'):
        media_urls.extend(get_urls(self.images_le))
        media_urls.extend(
            set(get_urls(self.files_le)) - set(get_urls(self.le)))
    metadata = {
        'id': _url_hash(response.url, as_bytes=False),
        'parent': _url_hash_as_str(response.meta.get('parent')),
        'depth': response.meta.get('depth'),
        'priority': response.request.priority,
    }
    if (self.settings.get('AUTOLOGIN_ENABLED') and
            not self.queue.has_login_form(response.url)):
        for form_el, form_meta in extract_forms(
                response.text, fields=False):
            if form_meta.get('form') == 'login':
                self.queue.add_login_form(response.url)
                metadata['has_login_form'] = True
    return text_cdr_item(
        response,
        crawler_name=self.settings.get('CDR_CRAWLER'),
        team_name=self.settings.get('CDR_TEAM'),
        objects=media_urls,
        metadata=metadata,
    )
def page_item(self, response: HtmlResponse) -> Item:
    item = super().page_item(response)
    if self.page_clf:
        item['metadata']['page_score'] = self.page_score(response)
    return item
def test_process_item(self):
    normal_item = Item()

    class DummyDocument(Document):
        pass

    DummyDocument.save = MagicMock()
    document_item = document_to_item(DummyDocument)()

    after = self.pipe.process_item(normal_item, None)
    self.assertEqual(normal_item, after)

    after = self.pipe.process_item(document_item, None)
    self.assertIsInstance(after, DummyDocument)
def document_to_item(document_class):
    class DocumentAsItemClass(Item):
        def concrete(self):
            return document_class(**self)

    exclude_fields = dir(EmptyDocument)
    document_fields = [field for field in dir(document_class)
                       if field not in exclude_fields]
    for field in document_fields + ['id']:
        DocumentAsItemClass.fields[field] = Field()
    return DocumentAsItemClass
def get_scrapy_item_classes():
    """
    Get a list of tuples containing (1) the class name and (2) the class for all of the
    Scrapy item classes defined in the crawling module.
    :return: A list of tuples containing (1) the class name and (2) the class for all of
    the Scrapy item classes defined in the crawling module.
    """
    import lib.inspection.web.crawling.item
    import scrapy
    return list(set(IntrospectionHelper.get_all_classes_of_type(
        to_find=scrapy.Item,
        path="lib/inspection/web/crawling",
    )))
def process_spider_output(self, response, result, spider):
    for i in result:
        if isinstance(i, scrapy.Item) and (i['info'].get('player', '') == 'iqiyi'):
            key = i['url']
            if key not in self.items.keys():
                self.items[key] = copy.deepcopy(i)
            else:
                self.items[key]['media_urls'].append(i['media_urls'][0])
            if i['info']['count'] == len(self.items[key]['media_urls']):
                yield self.__sort_item(key)
        else:
            yield i
def __sort_item(self, key):
    item = self.items.pop(key)
    item['media_urls'].sort(key=lambda url: int(re.findall(r'qd_index=(\d+)&', url)[0]))
    item['info'].pop('index', None)
    item['info'].pop('count', None)
    item['info'].pop('player', None)
    return item

# class MultimediaCrawlerMiddleware(object):
#     @classmethod
#     def from_crawler(cls, crawler):
#         # This method is used by Scrapy to create your spiders.
#         s = cls()
#         crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
#         return s
#
#     def process_spider_input(self, response, spider):
#         # Called for each response that goes through the spider
#         # middleware and into the spider.
#
#         # Should return None or raise an exception.
#         return None
#
#     def process_spider_output(self, response, result, spider):
#         # Called with the results returned from the Spider, after
#         # it has processed the response.
#
#         # Must return an iterable of Request, dict or Item objects.
#         for i in result:
#             yield i
#
#     def process_spider_exception(self, response, exception, spider):
#         # Called when a spider or process_spider_input() method
#         # (from other spider middleware) raises an exception.
#
#         # Should return either None or an iterable of Response, dict
#         # or Item objects.
#         pass
#
#     def process_start_requests(self, start_requests, spider):
#         # Called with the start requests of the spider, and works
#         # similarly to the process_spider_output() method, except
#         # that it doesn't have a response associated.
#
#         # Must return only requests (not items).
#         for r in start_requests:
#             yield r
#
#     def spider_opened(self, spider):
#         spider.logger.info('Spider opened: %s' % spider.name)