We have extracted the following 50 code examples from open-source Python projects to illustrate how to use lxml.html.xpath().
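Before the project examples, here is a minimal, self-contained sketch of the pattern most of them share: parse markup with lxml.html.fromstring() (or lxml.html.parse()) and query the resulting tree with xpath(). The sample markup and variable names below are illustrative assumptions, not taken from any of the projects.

import lxml.html
from lxml import etree

# Hypothetical sample markup, used only to demonstrate the API.
sample = '''
<html><body>
  <div id="artibody">
    <p>First paragraph.</p>
    <p><a href="/files/report.pdf">report</a></p>
  </div>
</body></html>
'''

doc = lxml.html.fromstring(sample)

# xpath() returns a list; its items are elements, attribute values,
# or text nodes depending on the expression.
paragraphs = doc.xpath('//div[@id="artibody"]/p')   # list of <p> elements
hrefs = doc.xpath('//a/@href')                      # list of strings
texts = doc.xpath('//p/text()')                     # list of text nodes

# Elements can be serialized back to markup, as many examples below do
# before handing the fragment to pandas.read_html().
fragment = ''.join(etree.tostring(node).decode('utf-8') for node in paragraphs)
print(hrefs, texts, fragment)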
def get_file_urls(mainUrl, extension):
    uniFileUrls = []
    if not mainUrl.lower().startswith('http://') and not mainUrl.lower().startswith('https://'):
        mainUrl = 'http://%s' % mainUrl
    print('Downloading from %s...' % mainUrl)
    if extension.startswith('*'):
        extension = extension[1:]
    if not extension.startswith('.'):
        extension = '.' + extension
    req = urllib.request.Request(
        mainUrl,
        data=None,
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        }
    )
    urlContent = urllib.request.urlopen(req).read().decode('utf-8')
    html = lxml.html.fromstring(urlContent)
    urls = html.xpath('//a/@href')
    for url in urls:
        if url.endswith(extension):
            url = urljoin(mainUrl, url)
            if url not in uniFileUrls:
                uniFileUrls.append(url)
    return uniFileUrls
def latest_content(url):
    '''
    Fetch the text content of a news article.

    Parameter
    --------
    url: the article URL

    Return
    --------
    string: the article's text content
    '''
    try:
        html = lxml.html.parse(url)
        res = html.xpath('//div[@id=\"artibody\"]/p')
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr).replace(' ', '')  #.replace('\n\n', '\n')
        html_content = lxml.html.fromstring(sarr)
        content = html_content.text_content()
        return content
    except Exception as er:
        print(str(er))
def _guba_content(url):
    try:
        html = lxml.html.parse(url)
        res = html.xpath('//div[@class=\"ilt_p\"]/p')
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr).replace(' ', '')  #.replace('\n\n', '\n')
        html_content = lxml.html.fromstring(sarr)
        content = html_content.text_content()
        ptime = html.xpath('//div[@class=\"fl_left iltp_time\"]/span/text()')[0]
        rcounts = html.xpath('//div[@class=\"fl_right iltp_span\"]/span[2]/text()')[0]
        reg = re.compile(r'\((.*?)\)')
        rcounts = reg.findall(rcounts)[0]
        return [content, ptime, rcounts]
    except Exception:
        return ['', '', '0']
def _today_ticks(symbol, tdate, pageNo, retry_count, pause):
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            html = lxml.html.parse(ct.TODAY_TICKS_URL % (ct.P_TYPE['http'],
                                                         ct.DOMAINS['vsf'], ct.PAGES['t_ticks'],
                                                         symbol, tdate, pageNo))
            res = html.xpath('//table[@id=\"datatbl\"]/tbody/tr')
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            sarr = '<table>%s</table>' % sarr
            sarr = sarr.replace('--', '0')
            df = pd.read_html(StringIO(sarr), parse_dates=False)[0]
            df.columns = ct.TODAY_TICK_COLUMNS
            df['pchange'] = df['pchange'].map(lambda x: x.replace('%', ''))
        except Exception as e:
            print(e)
        else:
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
def get_releasenote(html_source):
    html = lxml.html.fromstring(html_source)
    versions_dates = html.xpath("//*[contains(@class, 'app-version-block')]//h5")
    releasenotes = []
    for version_date in versions_dates:
        try:
            date = datetime.datetime.strptime(re.search(r'\((.+?)\)', version_date.text).group(1), '%b %d, %Y')
            version = re.search(r'Version (.+?) \(', version_date.text).group(1)
            try:
                note = version_date.getnext()
                note = re.sub(b"[\r\n]+", b".", etree.tostring(note, pretty_print=True))
                note = re.sub(b"<br />", b".", note)
                note = re.sub(b"<br/>", b".", note)
                note = b'. '.join(re.findall(b'<p>(.+?)</p>', note))
            except:
                note = ''
            releasenotes.append({'date': date, 'version': version, 'note': note})
        except:
            pass
    # notes = html.xpath("//*[contains(@class, 'app-version-note')]")
    return releasenotes
def parseXMLxpathSearch(xml_source, xpathString):
#---------------------------------------------------------------------------------
    return_values = []
    try:
        root = etree.XML(xml_source)
        data_points = root.xpath(xpathString)
        for data in data_points:
            return_values.append(etree.tostring(data))
            data.clear()
    except:
        pass
    return return_values

#---------------------------------------------------------------------------------
# parse XML and return value asked (designed for errors via stdout)
def parseXMLxpathSearchSingle(xml_source, xpathString):
#---------------------------------------------------------------------------------
    return_values = []
    try:
        root = etree.XML(xml_source)
        data_points = root.xpath(xpathString)
        for data in data_points:
            return_values.append(data)
            data.clear()
    except:
        pass
    return return_values

#---------------------------------------------------------------------------------
# parse HTML and return value asked
def parseXMLxpathSearchAttribute(xml_source, xpathString):
#---------------------------------------------------------------------------------
    return_values = []
    try:
        root = etree.XML(xml_source)
        data_points = root.xpath(xpathString)
        for data in data_points:
            return_values.append(data)
            data.clear()
    except:
        pass
    return return_values

#---------------------------------------------------------------------------------
# parse HTML and return value asked
def parseHTMLxpathSearch(http_source, xpathString):
#---------------------------------------------------------------------------------
    return_values = []
    http_source = str(http_source).replace('\x00', '')
    try:
        html = lxml.html.fromstring(http_source)
        for data in html.xpath(xpathString):
            # serialize the matched element itself (lxml elements have no .content attribute)
            return_values.append(etree.tostring(data))
            data.clear()
    except:
        pass
    return return_values

#---------------------------------------------------------------------------------
# parse HTML and return value asked
def __query_new_stocks(self):
    DATA_URL = 'http://vip.stock.finance.sina.com.cn/corp/view/vRPD_NewStockIssue.php?page=1&cngem=0&orderBy=NetDate&orderType=desc'
    html = lxml.html.parse(DATA_URL)
    res = html.xpath('//table[@id=\"NewStockTable\"]/tr')
    if six.PY2:
        sarr = [etree.tostring(node) for node in res]
    else:
        sarr = [etree.tostring(node).decode('utf-8') for node in res]
    sarr = ''.join(sarr)
    sarr = sarr.replace('<font color="red">*</font>', '')
    sarr = '<table>%s</table>' % sarr
    df = pd.read_html(StringIO(sarr), skiprows=[0, 1])[0]
    df = df.select(lambda x: x in [0, 1, 2, 3, 7], axis=1)
    df.columns = ['code', 'xcode', 'name', 'ipo_date', 'price']
    df['code'] = df['code'].map(lambda x: str(x).zfill(6))
    df['xcode'] = df['xcode'].map(lambda x: str(x).zfill(6))
    return df
def _dist_cotent(year, pageNo, retry_count, pause):
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            if pageNo > 0:
                ct._write_console()
            html = lxml.html.parse(rv.DP_163_URL % (ct.P_TYPE['http'], ct.DOMAINS['163'],
                                                    ct.PAGES['163dp'], year, pageNo))
            res = html.xpath('//div[@class=\"fn_rp_list\"]/table')
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            df = pd.read_html(sarr, skiprows=[0])[0]
            df = df.drop(df.columns[0], axis=1)
            df.columns = rv.DP_163_COLS
            df['divi'] = df['plan'].map(_fun_divi)
            df['shares'] = df['plan'].map(_fun_into)
            df = df.drop('plan', axis=1)
            df['code'] = df['code'].astype(object)
            df['code'] = df['code'].map(lambda x: str(x).zfill(6))
            pages = []
            if pageNo == 0:
                page = html.xpath('//div[@class=\"mod_pages\"]/a')
                if len(page) > 1:
                    asr = page[len(page) - 2]
                    pages = asr.xpath('text()')
        except Exception as e:
            print(e)
        else:
            if pageNo == 0:
                return df, pages[0] if len(pages) > 0 else 0
            else:
                return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _get_forecast_data(year, quarter, pageNo, dataArr):
    ct._write_console()
    try:
        html = lxml.html.parse(ct.FORECAST_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                                  ct.PAGES['fd'], year, quarter, pageNo,
                                                  ct.PAGE_NUM[1]))
        res = html.xpath("//table[@class=\"list_table\"]/tr")
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr)
        sarr = sarr.replace('--', '0')
        sarr = '<table>%s</table>' % sarr
        df = pd.read_html(sarr)[0]
        df = df.drop([4, 5, 8], axis=1)
        df.columns = ct.FORECAST_COLS
        dataArr = dataArr.append(df, ignore_index=True)
        nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
        if len(nextPage) > 0:
            pageNo = re.findall(r'\d+', nextPage[0])[0]
            return _get_forecast_data(year, quarter, pageNo, dataArr)
        else:
            return dataArr
    except Exception as e:
        print(e)
def _newstocks(data, pageNo, retry_count, pause):
    for _ in range(retry_count):
        time.sleep(pause)
        ct._write_console()
        try:
            html = lxml.html.parse(rv.NEW_STOCKS_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                                        ct.PAGES['newstock'], pageNo))
            res = html.xpath('//table[@id=\"NewStockTable\"]/tr')
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            sarr = sarr.replace('<font color="red">*</font>', '')
            sarr = '<table>%s</table>' % sarr
            df = pd.read_html(StringIO(sarr), skiprows=[0, 1])[0]
            df = df.drop([df.columns[idx] for idx in [1, 12, 13, 14]], axis=1)
            df.columns = rv.NEW_STOCKS_COLS
            df['code'] = df['code'].map(lambda x: str(x).zfill(6))
            res = html.xpath('//table[@class=\"table2\"]/tr[1]/td[1]/a/text()')
            tag = '下一页' if ct.PY3 else unicode('下一页', 'utf-8')  # "next page" link label
            hasNext = True if tag in res else False
            data = data.append(df, ignore_index=True)
            pageNo += 1
            if hasNext:
                data = _newstocks(data, pageNo, retry_count, pause)
        except Exception as ex:
            print(ex)
        else:
            return data
def get_notices(code=None, date=None):
    '''
    Fetch announcement notices for a stock.

    Parameters
    --------
    code: stock code
    date: announcement date

    Return
    --------
    DataFrame with the columns:
        title: notice title
        type: notice type
        date: announcement date
        url: URL of the notice content
    '''
    if code is None:
        return None
    symbol = 'sh' + code if code[:1] == '6' else 'sz' + code
    url = nv.NOTICE_INFO_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                ct.PAGES['ntinfo'], symbol)
    url = url if date is None else '%s&gg_date=%s' % (url, date)
    html = lxml.html.parse(url)
    res = html.xpath('//table[@class=\"body_table\"]/tbody/tr')
    data = []
    for td in res:
        title = td.xpath('th/a/text()')[0]
        type = td.xpath('td[1]/text()')[0]
        date = td.xpath('td[2]/text()')[0]
        url = '%s%s%s' % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                          td.xpath('th/a/@href')[0])
        data.append([title, type, date, url])
    df = pd.DataFrame(data, columns=nv.NOTICE_INFO_CLS)
    return df
def _parse_fq_data(url, index, retry_count, pause):
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(url)
            text = urlopen(request, timeout=10).read()
            text = text.decode('GBK')
            html = lxml.html.parse(StringIO(text))
            res = html.xpath('//table[@id=\"FundHoldSharesTable\"]')
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            df = pd.read_html(sarr, skiprows=[0, 1])[0]
            if len(df) == 0:
                return pd.DataFrame()
            if index:
                df.columns = ct.HIST_FQ_COLS[0:7]
            else:
                df.columns = ct.HIST_FQ_COLS
            if df['date'].dtypes == np.object:
                df['date'] = df['date'].astype(np.datetime64)
            df = df.drop_duplicates('date')
        except Exception as e:
            print(e)
        else:
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _cap_tops(last=5, pageNo=1, retry_count=3, pause=0.001, dataArr=pd.DataFrame()):
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(rv.LHB_SINA_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                                 rv.LHB_KINDS[0], ct.PAGES['fd'],
                                                 last, pageNo))
            text = urlopen(request, timeout=10).read()
            text = text.decode('GBK')
            html = lxml.html.parse(StringIO(text))
            res = html.xpath("//table[@id=\"dataTable\"]/tr")
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            sarr = '<table>%s</table>' % sarr
            df = pd.read_html(sarr)[0]
            df.columns = rv.LHB_GGTJ_COLS
            dataArr = dataArr.append(df, ignore_index=True)
            nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
            if len(nextPage) > 0:
                pageNo = re.findall(r'\d+', nextPage[0])[0]
                return _cap_tops(last, pageNo, retry_count, pause, dataArr)
            else:
                return dataArr
        except Exception as e:
            print(e)
def _inst_tops(last=5, pageNo=1, retry_count=3, pause=0.001, dataArr=pd.DataFrame()):
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(rv.LHB_SINA_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                                 rv.LHB_KINDS[2], ct.PAGES['fd'],
                                                 last, pageNo))
            text = urlopen(request, timeout=10).read()
            text = text.decode('GBK')
            html = lxml.html.parse(StringIO(text))
            res = html.xpath("//table[@id=\"dataTable\"]/tr")
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            sarr = '<table>%s</table>' % sarr
            df = pd.read_html(sarr)[0]
            df = df.drop([2, 3], axis=1)
            df.columns = rv.LHB_JGZZ_COLS
            dataArr = dataArr.append(df, ignore_index=True)
            nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
            if len(nextPage) > 0:
                pageNo = re.findall(r'\d+', nextPage[0])[0]
                return _inst_tops(last, pageNo, retry_count, pause, dataArr)
            else:
                return dataArr
        except Exception as e:
            print(e)
def _inst_detail(pageNo=1, retry_count=3, pause=0.001, dataArr=pd.DataFrame()):
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(rv.LHB_SINA_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                                 rv.LHB_KINDS[3], ct.PAGES['fd'],
                                                 '', pageNo))
            text = urlopen(request, timeout=10).read()
            text = text.decode('GBK')
            html = lxml.html.parse(StringIO(text))
            res = html.xpath("//table[@id=\"dataTable\"]/tr")
            if ct.PY3:
                sarr = [etree.tostring(node).decode('utf-8') for node in res]
            else:
                sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            sarr = '<table>%s</table>' % sarr
            df = pd.read_html(sarr)[0]
            df.columns = rv.LHB_JGMX_COLS
            dataArr = dataArr.append(df, ignore_index=True)
            nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
            if len(nextPage) > 0:
                pageNo = re.findall(r'\d+', nextPage[0])[0]
                return _inst_detail(pageNo, retry_count, pause, dataArr)
            else:
                return dataArr
        except Exception as e:
            print(e)
def _get_report_data(year, quarter, pageNo, dataArr):
    ct._write_console()
    try:
        request = Request(ct.REPORT_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                           ct.PAGES['fd'], year, quarter, pageNo,
                                           ct.PAGE_NUM[1]))
        print(ct.REPORT_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                               ct.PAGES['fd'], year, quarter, pageNo,
                               ct.PAGE_NUM[1]))
        text = urlopen(request, timeout=10).read()
        text = text.decode('GBK')
        text = text.replace('--', '')
        html = lxml.html.parse(StringIO(text))
        res = html.xpath("//table[@class=\"list_table\"]/tr")
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr)
        sarr = '<table>%s</table>' % sarr
        df = pd.read_html(sarr)[0]
        df = df.drop(11, axis=1)
        df.columns = ct.REPORT_COLS
        dataArr = dataArr.append(df, ignore_index=True)
        nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
        if len(nextPage) > 0:
            pageNo = re.findall(r'\d+', nextPage[0])[0]
            return _get_report_data(year, quarter, pageNo, dataArr)
        else:
            return dataArr
    except Exception as e:
        print(e)
def _get_profit_data(year, quarter, pageNo, dataArr):
    ct._write_console()
    try:
        request = Request(ct.PROFIT_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                           ct.PAGES['fd'], year, quarter, pageNo,
                                           ct.PAGE_NUM[1]))
        text = urlopen(request, timeout=10).read()
        text = text.decode('GBK')
        text = text.replace('--', '')
        html = lxml.html.parse(StringIO(text))
        res = html.xpath("//table[@class=\"list_table\"]/tr")
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr)
        sarr = '<table>%s</table>' % sarr
        df = pd.read_html(sarr)[0]
        df.columns = ct.PROFIT_COLS
        dataArr = dataArr.append(df, ignore_index=True)
        nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
        if len(nextPage) > 0:
            pageNo = re.findall(r'\d+', nextPage[0])[0]
            return _get_profit_data(year, quarter, pageNo, dataArr)
        else:
            return dataArr
    except:
        pass
def _get_operation_data(year, quarter, pageNo, dataArr):
    ct._write_console()
    try:
        request = Request(ct.OPERATION_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                              ct.PAGES['fd'], year, quarter, pageNo,
                                              ct.PAGE_NUM[1]))
        text = urlopen(request, timeout=10).read()
        text = text.decode('GBK')
        text = text.replace('--', '')
        html = lxml.html.parse(StringIO(text))
        res = html.xpath("//table[@class=\"list_table\"]/tr")
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr)
        sarr = '<table>%s</table>' % sarr
        df = pd.read_html(sarr)[0]
        df.columns = ct.OPERATION_COLS
        dataArr = dataArr.append(df, ignore_index=True)
        nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
        if len(nextPage) > 0:
            pageNo = re.findall(r'\d+', nextPage[0])[0]
            return _get_operation_data(year, quarter, pageNo, dataArr)
        else:
            return dataArr
    except Exception as e:
        print(e)
def _get_debtpaying_data(year, quarter, pageNo, dataArr):
    ct._write_console()
    try:
        request = Request(ct.DEBTPAYING_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                               ct.PAGES['fd'], year, quarter, pageNo,
                                               ct.PAGE_NUM[1]))
        text = urlopen(request, timeout=10).read()
        text = text.decode('GBK')
        html = lxml.html.parse(StringIO(text))
        res = html.xpath("//table[@class=\"list_table\"]/tr")
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr)
        sarr = '<table>%s</table>' % sarr
        df = pd.read_html(sarr)[0]
        df.columns = ct.DEBTPAYING_COLS
        dataArr = dataArr.append(df, ignore_index=True)
        nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
        if len(nextPage) > 0:
            pageNo = re.findall(r'\d+', nextPage[0])[0]
            return _get_debtpaying_data(year, quarter, pageNo, dataArr)
        else:
            return dataArr
    except Exception as e:
        print(e)
def _get_cashflow_data(year, quarter, pageNo, dataArr):
    ct._write_console()
    try:
        request = Request(ct.CASHFLOW_URL % (ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                             ct.PAGES['fd'], year, quarter, pageNo,
                                             ct.PAGE_NUM[1]))
        text = urlopen(request, timeout=10).read()
        text = text.decode('GBK')
        text = text.replace('--', '')
        html = lxml.html.parse(StringIO(text))
        res = html.xpath("//table[@class=\"list_table\"]/tr")
        if ct.PY3:
            sarr = [etree.tostring(node).decode('utf-8') for node in res]
        else:
            sarr = [etree.tostring(node) for node in res]
        sarr = ''.join(sarr)
        sarr = '<table>%s</table>' % sarr
        df = pd.read_html(sarr)[0]
        df.columns = ct.CASHFLOW_COLS
        dataArr = dataArr.append(df, ignore_index=True)
        nextPage = html.xpath('//div[@class=\"pages\"]/a[last()]/@onclick')
        if len(nextPage) > 0:
            pageNo = re.findall(r'\d+', nextPage[0])[0]
            return _get_cashflow_data(year, quarter, pageNo, dataArr)
        else:
            return dataArr
    except Exception as e:
        print(e)
def _parse_fq_data(url, index, retry_count, pause):
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(url)
            text = urlopen(request, timeout=10).read()
            text = text.decode('GBK')
            html = lxml.html.parse(StringIO(text))
            res = html.xpath('//table[@id=\"FundHoldSharesTable\"]')
            sarr = [etree.tostring(node) for node in res]
            sarr = ''.join(sarr)
            if sarr == '':
                return None
            df = pd.read_html(sarr, skiprows=[0, 1])[0]
            if len(df) == 0:
                return pd.DataFrame()
            if index:
                df.columns = ct.HIST_FQ_COLS[0:7]
            else:
                df.columns = ct.HIST_FQ_COLS
            if df['date'].dtypes == np.object:
                df['date'] = df['date'].astype(np.datetime64)
            df = df.drop_duplicates('date')
        except ValueError as e:
            # no data available for this period (e.g. the date is too early)
            return None
        except Exception as e:
            print(e)
        else:
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
def parse_venues(venue_types):
    methods = {'conf': clean_conf, 'journals': clean_journal}
    venues = []
    for venue_type in venue_types:
        folder = config.DATA + ("venues/html/%s/" % venue_type)
        print "\nProcessing folder '%s'" % folder
        for file_name in os.listdir(folder):
            print " '%s'" % file_name
            with open(os.path.join(folder, file_name), 'r') as file:
                lines = file.readlines()

            # Get the line of interest and parse it as an HTML
            html = lxml.html.fromstring(lines[16])
            for item in html.xpath("//div[@id='browse-%s-output']//li/a" % venue_type):
                process_method = methods[venue_type]
                name = process_method(item.text_content())
                venues.append((name, venue_type))

    print "%d venues." % len(venues)
    return venues
def parseCDATA(xml_source, xpathString):
#---------------------------------------------------------------------------------
    return_values = []
    print(xml_source)
    root = etree.fromstring(xml_source)
    for log in root.xpath(xpathString):
        return_values.append(str(log.text))
    return return_values
def _to_db(html, db, cursor, domain_name):
    html = lxml.html.fromstring(html)
    rows = html.xpath('//table')
    list_dns = []
    elements = rows[0].xpath('.//tr')
    for k in elements:
        c = k.xpath('.//td//text()')
        #print c
        list_dns.append(c[0])
    print list_dns
    dic_domain = {}
    elements = rows[1].xpath('.//tr')
    if len(elements) < 1:
        return 3
    for k in elements:
        c = k.xpath('.//td//text()')
        if len(c) > 1:
            for ip in ast.literal_eval(str(c[4])):
                #print c[0]
                dic_domain['id'] = int(c[0])
                dic_domain['sub_domain_name'] = c[2]
                dic_domain['ip'] = ip
                dic_domain['find_time'] = c[5]
                #print dic_domain
                sql = "INSERT INTO app_subdomainbrute(domain_name, \
                    sub_domain, sub_ip, fuzz_time, fuzzall_id) \
                    VALUES ('%s', '%s', '%s', '%s', '%d' )" % \
                    (domain_name, dic_domain['sub_domain_name'], dic_domain['ip'],
                     dic_domain['find_time'], dic_domain['id'])
                print sql
                cursor.execute(sql)
                db.commit()
def _to_db(html, db, cursor, domain_name):
    #print html
    html = lxml.html.fromstring(html)
    rows = html.xpath('//table')
    dic_domain = {}
    elements = rows[1].xpath('.//tr')
    print len(elements)
    if len(elements) < 2:
        return False
    for k in elements:
        c = k.xpath('.//td//text()')
        if len(c) > 1:
            for ip in ast.literal_eval(str(c[4])):
                #print c[0]
                dic_domain['id'] = int(c[0])
                dic_domain['sub_domain_name'] = c[2]
                dic_domain['ip'] = ip
                dic_domain['find_time'] = c[5]
                #print dic_domain
                sql = "INSERT INTO app_subdomainbrute(domain_name, \
                    sub_domain, sub_ip, fuzz_time, fuzzall_id) \
                    VALUES ('%s', '%s', '%s', '%s', '%d' )" % \
                    (domain_name, dic_domain['sub_domain_name'], dic_domain['ip'],
                     dic_domain['find_time'], dic_domain['id'])
                #print sql
                cursor.execute(sql)
                db.commit()
    return True
def _to_db(html, db, cursor, domain_name):
    html = lxml.html.fromstring(html)
    rows = html.xpath('//table')
    list_dns = []
    elements = rows[0].xpath('.//tr')
    for k in elements:
        c = k.xpath('.//td//text()')
        #print c
        list_dns.append(c[0])
    #print list_dns
    dic_domain = {}
    elements = rows[1].xpath('.//tr')
    if len(elements) < 1:
        return 3
    for k in elements:
        c = k.xpath('.//td//text()')
        if len(c) > 1:
            for ip in ast.literal_eval(str(c[4])):
                #print c[0]
                dic_domain['id'] = int(c[0])
                dic_domain['sub_domain_name'] = c[2]
                dic_domain['ip'] = ip
                dic_domain['find_time'] = c[5]
                #print dic_domain
                sql = "INSERT INTO app_subdomainbrute(domain_name, \
                    sub_domain, sub_ip, fuzz_time, fuzzall_id) \
                    VALUES ('%s', '%s', '%s', '%s', '%d' )" % \
                    (domain_name, dic_domain['sub_domain_name'], dic_domain['ip'],
                     dic_domain['find_time'], dic_domain['id'])
                #print sql
                cursor.execute(sql)
                db.commit()
def guba_sina(show_content=False):
    """
    Fetch the headline posts from the Sina Finance guba (stock forum) front page.

    Parameter
    --------
    show_content: whether to include the post content, default False

    Return
    --------
    DataFrame
        title, post title
        content, post content (only when show_content=True)
        ptime, publish time
        rcounts, read count
    """
    from pandas.io.common import urlopen
    try:
        with urlopen(nv.GUBA_SINA_URL % (ct.P_TYPE['http'],
                                         ct.DOMAINS['sina'])) as resp:
            lines = resp.read()
        html = lxml.html.document_fromstring(lines)
        res = html.xpath('//ul[@class=\"list_05\"]/li')
        heads = html.xpath('//div[@class=\"tit_04\"]')
        data = []
        for head in heads[:1]:
            title = head.xpath('a/text()')[0]
            url = head.xpath('a/@href')[0]
            ds = [title]
            ds.extend(_guba_content(url))
            data.append(ds)
        for row in res:
            title = row.xpath('a[2]/text()')[0]
            url = row.xpath('a[2]/@href')[0]
            ds = [title]
            ds.extend(_guba_content(url))
            data.append(ds)
        df = pd.DataFrame(data, columns=nv.GUBA_SINA_COLS)
        df['rcounts'] = df['rcounts'].astype(float)
        return df if show_content is True else df.drop('content', axis=1)
    except Exception as er:
        print(str(er))
def get_ranking(html_source, date, category):
    html = lxml.html.fromstring(html_source)
    ranking_obj = []
    for i, rank_type in [(2, 'free'), (3, 'paid'), (4, 'grossing')]:
        app_names = []
        app_urls = []
        seller_names = []
        seller_ids = []
        store_app_ids = []
        ranks = range(1, len(html.xpath("//tr/td[%d]//*[contains(@class, 'app-name')]/span" % i)) + 1)
        for app_name in html.xpath("//tr/td[%d]//*[contains(@class, 'app-name')]/span" % i):
            try:
                app_names.append(app_name.text[:150])
            except:
                app_names.append(0)
        """ We can use these urls to get the missing app_ids later (if needed)"""
        for app_url in html.xpath("//tr/td[%d]//*[contains(@class, 'app-name')]" % i):
            try:
                app_urls.append(app_url.attrib['href'])
            except:
                app_urls.append(0)
        for img in html.xpath('//tr/td[%d]/div/div/a/img' % i):
            store_app_id = img.attrib['src']
            try:
                store_app_id = re.search('ios/(.+)/', store_app_id).group(1)
                store_app_ids.append(store_app_id)
            except:
                store_app_ids.append(0)
        for seller_name in html.xpath("//tr/td[%d]//*[contains(@class, 'publisher-name')]/span" % i):
            try:
                seller_names.append(seller_name.text[:150])
            except:
                seller_names.append(0)
        for seller in html.xpath("//tr/td[%d]//*[contains(@class, 'publisher-name')]" % i):
            seller_id = seller.attrib['href']
            try:
                seller_id = re.search('(company|publisher)/(.+)/', seller_id).group(2)
                seller_ids.append(seller_id)
            except:
                seller_ids.append(0)
        for rank, store_app_id, app_name, seller_id, seller_name, app_url in zip(
                ranks, store_app_ids, app_names, seller_ids, seller_names, app_urls):
            ranking_obj.append(
                AppAnnieRankings(store_app_id=store_app_id, app_name=app_name[:200],
                                 rank_type=rank_type, category=category[:200],
                                 seller_id=seller_id, seller_name=seller_name[:200],
                                 app_url=app_url, rank=rank, date=date)
            )
    AppAnnieRankings.objects.bulk_create(ranking_obj, batch_size=10000)
def _to_db(html, db, cursor, domain_name):
    html = lxml.html.fromstring(html)
    rows = html.xpath('//table')
    '''
    list_dns_record = []
    list_mx_record = []
    elements = rows[0].xpath('.//tr')
    for k in elements:
        c = k.xpath('.//td//text()')
        #print c
        list_dns_record.append(c[0])
    #print list_dns
    elements = rows[1].xpath('.//tr')
    if len(elements) < 1:
        return 3
    for k in elements:
        c = k.xpath('.//td//text()')
        list_mx_record.append(c[0])
    print list_dns_record
    print list_mx_record'''
    dic_domain = {}
    elements = rows[3].xpath('.//tr')
    if len(elements) < 2:
        return False
    for k in elements:
        c = k.xpath('.//td//text()')
        if len(c) > 0:
            dic_domain['domain'] = domain_name  # was `domain`, which is undefined here
            dic_domain['fuzzall_id'] = int(c[0])
            dic_domain['ip_range'] = c[1]
            #dic_domain['data_tag'] = c[2]
            dic_domain['fuzz_time'] = c[3]
            sql = "INSERT INTO app_domainiprange(domain_name, \
                ip_range, fuzz_time, fuzzall_id) \
                VALUES ('%s', '%s', '%s', '%d' )" % \
                (domain_name, dic_domain['ip_range'], dic_domain['fuzz_time'],
                 dic_domain['fuzzall_id'])
            cursor.execute(sql)
            db.commit()
    return True