The following 50 code examples, extracted from open source Python projects, illustrate how to use html.unescape().
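Before the project examples, here is a minimal standard-library sketch of what html.unescape() does; the sample string is made up purely for illustration:

import html

# html.unescape() turns named and numeric character references back into characters.
escaped = "Fish &amp; Chips &lt;b&gt;cost&lt;/b&gt; &pound;5 &#38; up"  # hypothetical sample input
print(html.unescape(escaped))
# -> Fish & Chips <b>cost</b> £5 & up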
def unquote(data: (str, dict, list)):
    """Recursively HTML-unescape every string contained in the given value.

    Strings are unescaped twice to handle double-encoded entities; dicts and
    lists are processed in place. (Docstring reconstructed from the code; the
    original was garbled.)

    :param data: the string, dict or list to process
    :return: the value with all contained strings unescaped
    """
    temp = data
    if issubclass(temp.__class__, str):
        return html.unescape(html.unescape(temp))
    if issubclass(temp.__class__, dict):
        for k, v in temp.items():
            temp[k] = unquote(v)
    if issubclass(temp.__class__, list):
        for i in range(len(temp)):
            temp[i] = unquote(temp[i])
    return temp
def _woxikon_de_url_handler(target):
    ''' Query woxikon for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content
def __folha_get_script_content(line, is_title=False):
    """
    Processes the Folha de São Paulo script lines to get the Title and Link of the most read news
    :param line: a line from the script
    :return: title or link
    """
    start_index = line.index('"') + 1
    last_index = line.rindex('"')
    content = line[start_index:last_index]

    # We have to unescape html entities for the Title content
    if is_title:
        content = html.unescape(content)
        content = content.replace("\;", "")  # Unescape still leaves some garbage we have to clean...

    return content
async def image(self, ctx, *, query: str):
    """Grab an image off the Internet using Qwant.

    * query - A string to be used in the search criteria.
    """
    params = urllib.parse.urlencode({"count": "100", "offset": "1", "q": query})
    url = BASE_URL_QWANT_API.format(params)
    async with ctx.bot.session.request("GET", url, headers=self.headers) as response:
        if response.status == 200:
            data = await response.json()
            if not data["data"]["result"]["items"]:
                await ctx.send("No results found. :<")
                return
            item = systemrandom.choice(data["data"]["result"]["items"])
            embed = discord.Embed(title=html.unescape(item["title"]))
            embed.description = f"{item['url']}\n{item['media']}"
            embed.set_image(url=item["media"])
            embed.set_footer(text="Powered by Qwant")
            await ctx.send(embed=embed)
        else:
            message = "Couldn't reach Qwant. x.x"
            await ctx.send(message)
def unescape_html(html_):
    """
    Replace HTML entities (e.g. `&pound;`) in a string.

    :param html_: The escaped HTML.
    :return: The input string with entities replaced.
    """
    # http://stackoverflow.com/a/2360639

    if sys.version_info.major == 2:  # 2.7
        # noinspection PyUnresolvedReferences,PyCompatibility
        from HTMLParser import HTMLParser
        return HTMLParser().unescape(html_)

    if sys.version_info.minor == 3:  # 3.3
        # noinspection PyCompatibility
        from html.parser import HTMLParser
        # noinspection PyDeprecation
        return HTMLParser().unescape(html_)

    # 3.4+
    # noinspection PyCompatibility
    import html
    return html.unescape(html_)
def get_autopost_form(self, request, obj):
    initial_text = self.get_autopost_text(obj)
    initial_text = unescape(strip_tags(initial_text)).strip()
    initial_text = re_newlines.sub('\n', initial_text)
    initial_text = initial_text[:conf.TEXT_MAX_LENGTH]
    if request.method == 'POST':
        return AutpostForm(
            request.POST,
            request.FILES,
            initial={
                'networks': conf.ALLOWED_NETWORK_NAMES,
                'text': initial_text,
            },
            prefix=AUTOPOST_FORM_PREFIX
        )
    else:
        return AutpostForm(
            initial={
                'networks': conf.ALLOWED_NETWORK_NAMES,
                'text': initial_text,
            },
            prefix=AUTOPOST_FORM_PREFIX
        )
def convert_to_embed(entry):
    first_ligne = first_ligne_regex.search(entry.summary).group(0)
    fl_without_tags = tag_regex.subn('', first_ligne)[0]
    description = html.unescape(fl_without_tags)
    colour = discord.Colour.magenta()
    timestamp = datetime.fromtimestamp(mktime(entry.published_parsed))
    embed = discord.Embed(title=entry.title, description=description,
                          url=entry.link, colour=colour, timestamp=timestamp)
    image = first_img_regex.search(entry.summary)
    if image:
        image_url = img_src_url_regex.search(image.group(0)).group(0)
        embed.set_image(url=image_url)
    embed.set_author(name=_(krosfeed["name"]), url=_(krosfeed["url"]),
                     icon_url=krosfeed["icon"])
    return embed

# Get the new entries that have not yet been parsed
# Parameters:
#   - feed: dict, the feed containing the entries
#   - entry_id: str, the id of the last entry that have been parsed
# Return:
#   - last_entries: list, discord embeds presenting the data of each not parsed entry
def write_unitn(cls, out_path, unitn_path, download_path, is_train):
    with open(unitn_path) as unitn_sr, open(download_path) as download_sr, open(out_path, 'a+') as out_sr:
        for unitn_line, download_line in zip(unitn_sr, download_sr):
            doc_id_unitn, label_unitn, text_unitn = \
                re.match(r'\d+\t(\d+)\t(negative|neutral|positive)\t(.+)', unitn_line).groups()
            doc_id_download, label_download, text_download = \
                re.match(r'\d+\t(\d+)\t(negative|neutral|positive)\t(.+)', download_line).groups()
            text_unitn = text_unitn.encode().decode('unicode-escape')
            text_unitn = text_unitn.replace(r'’', '\'')
            if is_train:
                text_unitn = html.unescape(text_unitn)
                text_unitn = text_unitn.replace('""', '"')
            text_download = html.unescape(html.unescape(text_download))
            assert doc_id_unitn == doc_id_download
            assert label_unitn == label_download
            text = text_unitn
            if text_download != 'Not Available':
                # some differences are impossible to reconcile, some unitn data have the wrong order
                # if re.sub(r'\s+', ' ', text_unitn) != re.sub(r'\s+', ' ', text_download):
                #     logging.error(out_path)
                #     logging.error(text_unitn)
                #     logging.error(text_download)
                # assert re.sub(r'\s+', ' ', text_unitn) == re.sub(r'\s+', ' ', text_download)
                text = text_download
            out_sr.write(json.dumps({'id': doc_id_unitn, 'text': text, 'label': cls.class_map[label_unitn]}) + '\n')
def parse_chat_message(self, message):
    """Parse a game chat message, returning a tuple with the sender's
    username and the chat text. HTML entities in the text are decoded.
    """
    if self.protocol_version <= 1:
        # Remove html formatting
        msg_pattern = r'<span[^>]+>([^<]+)</span>: <span[^>]+>([^<]+)</span>'
        match = re.match(msg_pattern, message["content"])
        if not match:
            raise WebTilesError("Unable to parse chat message: %s",
                                message["content"])
        sender = match.group(1)
        chat_text = match.group(2)
    else:
        sender = message["sender"]
        chat_text = message["text"]
    return (sender, html.unescape(chat_text))
def from_data(cls, question):
    return cls(
        category=question['category'],
        type=question['type'],
        question=unescape(question['question']),
        answer=unescape(question['correct_answer']),
        incorrect=tuple(map(unescape, question['incorrect_answers'])),
    )

# How many times should the cache be used before making an API request
# to get more questions, the lower this number, the more likely it will
# make an HTTP request. Set to 0 to always use the API
#
# Note that the toggler is only called when the trivia session doesn't
# have any questions in the queue, so be careful when making this really
# high. Otherwise the question cache might never be filled.
async def _cookie(self):
    """Retrieves a random fortune cookie fortune."""
    regex = ["class=\"cookie-link\">([^`]*?)<\/a>", "<p>([^`]*?)<\/p>",
             "(?:\\\\['])", "<strong>([^`]*?)<\/strong>",
             "<\/strong><\/a>([^`]*?)<br>", "3\)<\/strong><\/a>([^`]*?)<\/div>"]
    url = "http://www.fortunecookiemessage.com"
    await self.file_check()
    async with aiohttp.request("GET", url, headers={"encoding": "utf-8"}) as resp:
        test = str(await resp.text())
    fortune = re.findall(regex[0], test)
    fortest = re.match("<p>", fortune[0])
    if fortest is not None:
        fortune = re.findall(regex[1], fortune[0])
    title = re.findall(regex[3], test)
    info = re.findall(regex[4], test)
    info[0] = html.unescape(info[0])
    dailynum = re.findall(regex[5], test)
    self.fortune_process(fortune[0])
    await self.bot.say("Your fortune is:")
    await self.bot.upload("data/horoscope/cookie-edit.png")
    await self.bot.say("\n" + title[1] + info[1] + "\n" + title[2] + dailynum[0])
    os.remove("data/horoscope/cookie-edit.png")
def parse_node(self, response, node):
    il = FeedEntryItemLoader(response=response,
                             base_url='http://{}'.format(self.name),
                             dayfirst=True)
    il.add_value('updated', node.xpath('//pubDate/text()').extract_first())
    il.add_value('author_name',
                 html.unescape(node.xpath('//dc:creator/text()').extract_first()))
    categories = node.xpath('//category/text()').extract()
    for category in categories:
        il.add_value('category', html.unescape(category))
    title = node.xpath('(//title)[2]/text()').extract()
    if not title and categories:
        # Fallback to the first category if no title is provided
        # (e.g. comic).
        title = categories[0]
    il.add_value('title', html.unescape(title))
    link = node.xpath('(//link)[2]/text()').extract_first()
    il.add_value('link', link)
    return scrapy.Request(link, self._parse_article, meta={'il': il})
def clean_filename(string: str) -> str:
    """
    Sanitize a string to be used as a filename.

    If minimal_change is set to true, then we only strip the bare minimum of
    characters that are problematic for filesystems (namely, ':', '/' and
    '\x00', '\n').
    """
    string = unescape(string)
    string = unquote(string)
    string = re.sub(r'<(?P<tag>.+?)>(?P<in>.+?)<(/(?P=tag))>', "\g<in>", string)
    string = string.replace(':', '_').replace('/', '_').replace('\x00', '_')
    string = re.sub('[\n\\\*><?\"|\t]', '', string)
    string = string.strip()
    return string
def _html(self, definition):
    """Generate documentation string in HTML format
    """
    if sys.version_info >= (3, 4):
        escaped_doc = html.escape(
            html.unescape(definition.doc), quote=False)
    else:
        try:
            escaped_doc = cgi.escape(
                HTMLParser.unescape.__func__(
                    HTMLParser, definition.doc.encode('utf8')
                )
            )
        except AttributeError:
            # Python 3.x < 3.4
            escaped_doc = cgi.escape(
                HTMLParser.unescape(HTMLParser, definition.doc)
            )
    escaped_doc = escaped_doc.replace('\n', '<br>')
    return '{0}\n{1}'.format(definition.full_name, escaped_doc)
def load_data(self, ws, result, payload):
    data = json.loads(result['result']['result']['value'])
    if data is None:
        raise ChromeEmptyException('data is null')
    charset = data['charset']
    data['body'] = self.beautify(html.unescape(data['body']), charset)
    data['head'] = self.beautify(data['head'], charset)
    data['text'] = self.beautify(data['text'], charset)
    effect = self.effect_url(data)
    hostname = urlparse(effect).hostname if effect else None
    data['ip'] = socket.gethostbyname(hostname) if hostname else None
    if len(data['body']) <= len('<body></body>'):
        raise ChromeShortException('too short in retry')
    if payload.get('need_screenshot', True):
        screen = self.screenshot(ws, payload.get('shot_quality', 40),
                                 payload.get('shot_format', 'jpeg'))
    else:
        screen = None
    data['screenshot'] = screen
    current_cookies = self.get_cookies(ws)
    data['cookies'] = current_cookies
    data['state'] = 'normal'
    return data
def extract_url(self, text):
    """
    Extract Hackpad Archive URL from a text
    :param text:
    :return: the located URL as a string
    """
    archive_url = None
    regexes = [
        re.compile('https:\/\/[A-Za-z0-9\.-]*hackpad-export\.s3[^"]*(?=")'),
        re.compile('https:\/\/[A-Za-z0-9\.-]*hackpad-export\.s3[^>]*(?=>)')
    ]
    for r in regexes:
        matches = r.findall(text)
        if matches:
            archive_url = html.unescape(matches[0])
            self._logger.info("Located download URL: %s" % archive_url)
            break
    return archive_url
def get_ocr_from_hocr(hocr_file, out_dir):
    """Extract OCR from the Hocr data

    Keyword arguments
    hocr_file -- The HOCR file
    out_dir -- Directory to write OCR file to.
    """
    output_file = os.path.join(out_dir, 'OCR.txt')
    if os.path.exists(output_file) and os.path.isfile(output_file) and options.overwrite:
        os.remove(output_file)
        logger.debug("{} exists and we are deleting it.".format(output_file))
    if not os.path.exists(output_file):
        logger.debug("Generating OCR.")
        data = ''
        with open(hocr_file, 'r') as fpr:
            data += fpr.read()
        data = html.unescape(blanklines.sub('', htmlmatch.sub('\1', data)))
        with open(output_file, 'w') as fpw:
            fpw.write(data)
def unescape_html(chatbot, statement):
    """
    Convert escaped html characters into unescaped html characters.
    For example: "&lt;b&gt;" becomes "<b>".
    """
    import sys

    # Replace HTML escape characters
    if sys.version_info[0] < 3:
        from HTMLParser import HTMLParser
        html = HTMLParser()
    else:
        import html

    statement.text = html.unescape(statement.text)

    return statement
def tokenize(self, text):
    escaped = html.unescape(text)
    tokenized = self.tok.findall(escaped)
    if self.verbose:
        self.verbose_text(text, tokenized)
    if self.lowercase:
        tokenized = [t.lower() for t in tokenized]
    return tokenized

# sentences = []
# [print(s) for s in sentences]
# tokenizer = SocialTokenizer(debug=True, verbose=True)
#
# for s in sentences:
#     tokenizer.tokenize(s)
def react_ratings_render_for_props(rf, user, question):
    request = rf.get('/')
    request.user = user
    template = '{% load react_ratings %}{% react_ratings question %}'
    context = {'request': request, "question": question}

    # normally annotated by queryset
    question.negative_rating_count = 0
    question.positive_rating_count = 0

    content_type = ContentType.objects.get_for_model(question)
    expected = (
        r'^<div data-a4-widget=\"ratings\" data-attributes='
        r'\"(?P<props>{.+})\"><\/div>$'
    )

    match = re.match(expected, helpers.render_template(template, context))
    assert match
    assert match.group('props')
    props = json.loads(html.unescape(match.group('props')))

    assert props['contentType'] == content_type.id
    assert props['objectId'] == question.id
    del props['contentType']
    del props['objectId']
    return props
def test_map_display_point(area_settings):
    point = {'test': [1, 2]}
    template = '{% load maps_tags %}{% map_display_point point polygon %}'
    context = {'point': point, 'polygon': area_settings.polygon}

    expected = (
        r'^<div'
        r' style="height: 300px"'
        r' data-map="display_point"'
        r' data-baseurl="{baseurl}"'
        r' data-attribution="{attribution}"'
        r' data-point="(?P<point>{{.+}})"'
        r' data-polygon="(?P<polygon>{{.+}})"'
        r'></div>$'
    ).format(baseurl=escape(settings.A4_MAP_BASEURL),
             attribution=escape(settings.A4_MAP_ATTRIBUTION))

    match = re.match(expected, helpers.render_template(template, context))
    assert match
    _point = match.group('point')
    assert json.loads(unescape(_point)) == point
    _polygon = match.group('polygon')
    assert json.loads(unescape(_polygon)) == area_settings.polygon
def react_comment_render_for_props(rf, user, question):
    request = rf.get('/')
    request.user = user
    template = '{% load react_comments %}{% react_comments question %}'
    context = {'request': request, "question": question}

    content_type = ContentType.objects.get_for_model(question)
    expected = (
        r'^<div data-a4-widget=\"comment\" data-attributes='
        r'\"(?P<props>{.+})\"><\/div>$'
    )

    match = re.match(expected, helpers.render_template(template, context))
    assert match
    assert match.group('props')
    props = json.loads(html.unescape(match.group('props')))

    assert props['subjectType'] == content_type.id
    assert props['subjectId'] == question.id
    del props['subjectType']
    del props['subjectId']
    return props
def search(self, word):
    """ Search for word. """
    if len(word.split()) > 1:
        return None

    _word = Utils.remove_accents(word).strip().lower()
    try:
        with self.get(BASE_URL.format(_word)) as request:
            page = html.unescape(request.read().decode(CHARSET))
    except:
        return None

    found = Word(word)
    found.meaning = self.scrape_meaning(page)
    found.synonyms = self.scrape_synonyms(page)
    found.extra = self.scrape_extra(page)
    return found
def sanitize_for_unicode(string: str):
    # Remove html entities
    string = html.unescape(string)
    string = string.replace('\u0091', '‘')
    string = string.replace('\u0092', '’')
    string = string.replace('\u0093', '“')
    string = string.replace('\u0094', '”')
    string = string.replace('\u0096', '–')
    string = string.replace('\u0097', '—')
    string = string.replace('\u00ad', '-')
    string = string.replace('\u00ae', '®')
    return string
def materialize_attr_values(a: np.ndarray) -> np.ndarray:
    scalar = False
    if np.isscalar(a):
        scalar = True
        a = np.array([a])
    result: np.ndarray = None
    if np.issubdtype(a.dtype, np.string_):
        # First ensure that what we load is valid ascii (i.e. ignore anything outside 7-bit range)
        temp = np.array([x.decode('ascii', 'ignore') for x in a])
        # Then unescape XML entities and convert to unicode
        result = np.array([html.unescape(x) for x in temp.astype(str)], dtype=np.str_)
    elif np.issubdtype(a.dtype, np.str_) or np.issubdtype(a.dtype, np.unicode_):
        result = np.array(a.astype(str), dtype=np.str_)
    else:
        result = a
    if scalar:
        return result[0]
    else:
        return result
def process_uba_report(self):
    thresholds = {
        'PM1': THRESHOLD_PM10,
        'NO2': THRESHOLD_NO2,
    }
    count = 0
    for station in list(
            csv.DictReader(self.data.splitlines(), delimiter=';')
    ):
        val = int(station['Messwert (in µg/m³)'])
        try:
            station = Station.objects.get(id=station['Stationscode'])
        except ObjectDoesNotExist:
            station = Station.objects.create(
                id=station['Stationscode'],
                name=html.unescape(station['Stationsname']),
            )
        if val >= thresholds[self.kind]:
            Alert.objects.get_or_create(
                report=self,
                station=station,
                value=val,
            )
            count += 1
    print('%s alerts created' % count)
def get_message_text(self, truncate=False):
    text = self.get_parameter("layer:SOREM:1.0:Broadcast_Text")
    if not text:
        text = self.description if self.description else self.headline
    if truncate:
        parts = text.split('\n\n', 1)
        text = parts[0]
    text = text.replace('\n', ' ').replace('\r', '')
    if sys.version.startswith('3'):
        import html
        text = html.unescape(text)
    else:
        text = text.replace('&apos;', "\'").replace('&quot;', '\"').replace('&amp;', '&').replace('&gt;', '>').replace('&lt;', '<')
    return text
def _command_details(self, output, link_only=False):
    response = ""
    command = output.get('@mention').split()
    if len(command) != 2:
        response += "command must be in the form `details <meme_url>`\n"
    else:
        meme_url = html.unescape(command[1][1:-1])
        meme_data = scrape_reddit.update_reddit_meme(
            self.cursor,
            self.conn,
            meme_url,
            self.lock
        )
        if meme_data is None:
            response += "I couldn't find any data for this url: `{}`, sorry\n".format(meme_url)
        else:
            if link_only:
                for meme in meme_data:
                    response += meme.get('link') + '\n'
            else:
                for meme in meme_data:
                    for key, val in sorted(meme.items()):
                        response += "`{key}`: {data}\n".format(key=key, data=val)
                    response += '\n'
    return response
def functions_that_return(self, channel: str, text: str) -> ChannelMessages:
    """ give a type, return functions that return things of that type """
    func_names = []
    text = text.strip()
    text = html.unescape(text)
    for (name, func) in self.known_functions().items():
        if str(func.__annotations__.get('return', None)) == text:
            func_names.append((name, func.__annotations__))
    message = f"The following functions return `{text}`:\n"
    message += '```\n'
    message += '\n'.join(name for (name, type) in func_names)
    message += '\n```'
    return ChannelMessage(channel, message)
def get_return_values(resp: str) -> Optional[str]:
    """
    Attempts to extract the return values from the response body. If this
    is longer than around 250 characters, chances are high that it's
    garbage, meaning that no return values were found.
    """
    start = resp.find(RETURN_VALUE_HEADER)
    if start == -1:  # str.find returns -1 when the header is not found
        return None
    start += len(RETURN_VALUE_HEADER)
    end = resp.find("<h3>", start)
    ret_vals = unescape(remove_tags(resp[start:end]))
    return ret_vals if len(ret_vals) < 250 else None
def get_list(items):
    try:
        items = eval(items)
    except:
        return jsonify(status='error', data={'message': 'items error'}), 400
    # items = items.split(',')
    print(items)
    data = []
    for item in items:
        try:
            r = requests.get('https://hacker-news.firebaseio.com/v0/item/' + str(item) + '.json')
        except Exception as e:
            print(e)
            return jsonify(status='error', data={'message': 'request error'}), 400
        else:
            result = r.json()
            if result.get('text', None):
                result['text'] = html.unescape(result['text'])
            data.append(result)
    return jsonify(status='success', data=data)
async def jeopardy_wait_for_answer(self):
    if self.jeopardy_question_active:
        message = await self.bot.wait_for_message(
            timeout=clients.wait_time,
            check=lambda m:
                self.jeopardy_answer.lower() in
                [s + m.content.lower() for s in ["", "a ", "an ", "the "]] or
                m.content.lower() == BeautifulSoup(html.unescape(self.jeopardy_answer.lower()),
                                                   "html.parser").get_text().lower())
        if message and not message.content.startswith('>'):
            self.jeopardy_answered = message.author

# jeopardy stats
async def _trivia_countdown(self, answer_message, embed):
    while self.trivia_countdown:
        await asyncio.sleep(1)
        self.trivia_countdown -= 1
        embed.set_footer(text="You have {} seconds left to answer".format(self.trivia_countdown))
        await self.bot.edit_message(answer_message, embed=embed)

# url = "http://api.futuretraxex.com/v1/getRandomQuestion
# await self.bot.say(BeautifulSoup(html.unescape(data["q_text"]), "html.parser").get_text() + "\n1. " + data["q_options_1"] + "\n2. " + data["q_options_2"] + "\n3. " + data["q_options_3"] + "\n4. " + data["q_options_4"])
# if answer == data["q_correct_option"]:
#     await self.bot.say("The answer was " + str(data["q_correct_option"]) + ". " + data["q_options_" + str(data["q_correct_option"])] + "\n" + correct_players_output)
def cleanJson(self, json):
    json = html.unescape(json)
    # Clean out html formatting
    json = json.replace('_', '[blank]')
    json = json.replace('<br>', '\n')
    json = json.replace('<br/>', '\n')
    json = json.replace('<i>', '*')
    json = json.replace('</i>', '*')
    return json
def advisory_fetch_from_mailman(url):
    try:
        response = get(url)
        if 200 != response.status_code:
            return None
        asa = unescape(sub('</?A[^<]*?>', '', response.text))
        start = '<PRE>'
        start_marker = '{}Arch Linux Security Advisory'.format(start)
        end = '\n-------------- next part --------------'
        asa = asa[asa.index(start_marker) + len(start):asa.index(end)]
        return asa.strip()
    except Exception:
        return None
def on_status(self, status):
    try:
        text = html.unescape(status.text)
        if is_reply(status):
            return
        if not str(status.user.id) in self.id:
            return
        self.statuses.append(status)
    except Exception as e:
        print(e)
def archive(userid, filename='saved.txt'):
    with open(filename, 'a') as save:
        for status in tweepy.Cursor(api_twitter.user_timeline, id=userid).items(200):
            save.write((html.unescape(encode_tweet(status))))
def encode_info(info_text, data):
    info = ''
    for label in info_text:
        try:
            line = label.format(**data) + '\n'
            info += html.unescape(line)
        except AttributeError:
            pass
    return info
def get_text(status):
    status = get_status(status)
    print(dir(status))
    try:
        status = status.extended_tweet
        print(dir(status))
        text = status['full_text']  # full_text
        print('tweet is extended (01)')
    except AttributeError:
        try:
            text = status.full_text
            print('tweet is extended (02)')
        except AttributeError:
            text = status.text
            print('tweet is not extended')
    return html.unescape(text)
async def search_all_anime(self, search_query: str) -> List[Anime]:
    """
    A function to get data for all search results from a query

    :param str search_query: is what'll be queried for the search results
    :return: List of anime objects
    :rtype: List
    """
    with aiohttp.ClientSession(auth=self._auth, headers={"User-Agent": self.user_agent}) as session:
        async with session.get(ANIME_SEARCH_URL, params={"q": search_query}) as response:
            # Raise an error if we get the wrong response code
            if response.status != 200:
                raise ResponseError(response.status)
            response_data = await response.read()
            entries = etree.fromstring(response_data)
            animes = []
            for entry in entries:
                try:
                    animes.append(
                        Anime(
                            id=entry.find("id").text,
                            titles=Titles(
                                jp=entry.find("title").text,
                                english=entry.find("english").text,
                                synonyms=entry.find("synonyms").text.split(";")
                            ),
                            episode_count=entry.find("episodes").text,
                            dates=Dates(
                                start=entry.find("start_date").text,
                                end=entry.find("end_date").text
                            ),
                            type=entry.find("type").text,
                            status=entry.find("status").text,
                            synopsis=html.unescape(
                                entry.find("synopsis").text.replace("<br />", "").replace("[i]", "").replace("[/i]", "")),
                            cover=entry.find("image").text
                        )
                    )
                except AttributeError:
                    continue
            return animes
async def search_all_manga(self, search_query: str) -> List[Manga]:
    """
    A function to get data for all search results from a query

    :param str search_query: is what'll be queried for the search results
    :return: List of manga objects
    :rtype: List
    """
    with aiohttp.ClientSession(auth=self._auth, headers={"User-Agent": self.user_agent}) as session:
        async with session.get(MANGA_SEARCH_URL, params={"q": search_query}) as response:
            # Raise an error if we get the wrong response code
            if response.status != 200:
                raise ResponseError(response.status)
            response_data = await response.read()
            entries = etree.fromstring(response_data)
            mangas = []
            for entry in entries:
                try:
                    mangas.append(
                        Manga(
                            id=entry.find("id").text,
                            titles=Titles(
                                jp=entry.find("title").text,
                                english=entry.find("english").text,
                                synonyms=entry.find("synonyms").text.split(";")
                            ),
                            volumes=entry.find("volumes").text,
                            chapters=entry.find("chapters").text,
                            type=entry.find("type").text,
                            status=entry.find("status").text,
                            dates=Dates(
                                start=entry.find("start_date").text,
                                end=entry.find("end_date").text
                            ),
                            synopsis=html.unescape(
                                entry.find("synopsis").text.replace("<br />", "").replace("[i]", "").replace("[/i]", "")),
                            cover=entry.find("image").text
                        )
                    )
                except AttributeError:
                    continue
            return mangas
def __init__(self, item):
    meta = self._get_meta(item)
    url, type, text = meta['url'], meta['type'], meta['text']

    self.id = item['id']
    self.subreddit = item['subreddit']
    self.title = unescape(item['title'])
    self.score = int(item['score'])
    self.url = url
    self.comments = 'https://redd.it/' + item['id']
    self.created_at = int(item['created_utc'])
    self.type = type
    self.nsfw = item['over_18']
    self.text = text
def _process_name(name):
    """Fix issues with Jochem names."""
    # Unescape HTML entities
    name = unescape(name)
    # Remove bracketed stuff on the end
    name = NG_RE.sub('', name).strip()  # Nomenclature groups
    name = END_RE.sub('', name).strip(', ')  # Words
    name = RATIO_RE.sub('', name).strip(', ')  # Ratios
    # Remove stuff off start
    name = START_RE.sub('', name).strip()
    # Remove balanced start and end brackets if none in between
    name = BRACKET_RE.sub('\g<1>', name)
    # Un-invert CAS style names
    comps = name.split(', ')
    if len(comps) == 2:
        if comps[1].endswith('-'):
            name = comps[0]
            name = '%s%s' % (comps[1], name)
    elif len(comps) > 2:
        name = comps[0]
        for i in range(1, len(comps)):
            if comps[i].endswith('-'):
                name = '%s%s' % (comps[i], name)
            else:
                name = '%s %s' % (name, comps[i])
    return name
def unescape_html(content):
    if unescape is not None:
        return unescape(content)
    else:
        return HTMLParser().unescape(content)
def list_comments(self, topic_id, start=0):
    """
    List the comments of a group topic. (Docstring reconstructed from the
    code; the original was garbled.)

    :param topic_id: topic ID
    :param start: offset for pagination
    :return: parsed list of comment dicts
    """
    xml = self.api.xml(API_GROUP_GET_TOPIC % topic_id, params={'start': start})
    xml_results = xml.xpath('//ul[@id="comments"]/li')
    results = []
    for item in xml_results:
        try:
            author_avatar = item.xpath('.//img/@src')[0]
            author_url = item.xpath('.//div[@class="user-face"]/a/@href')[0]
            author_alias = slash_right(author_url)
            author_signature = item.xpath('.//h4/text()')[1].strip()
            author_nickname = item.xpath('.//h4/a/text()')[0].strip()
            created_at = item.xpath('.//h4/span/text()')[0].strip()
            content = etree.tostring(item.xpath('.//div[@class="reply-doc content"]/p')[0]).decode('utf8').strip()
            cid = item.get('id')
            results.append({
                'id': cid,
                'author_avatar': author_avatar,
                'author_url': author_url,
                'author_alias': author_alias,
                'author_signature': author_signature,
                'author_nickname': author_nickname,
                'created_at': created_at,
                'content': unescape(content),
            })
        except Exception as e:
            self.api.logger.exception('parse comment exception: %s' % e)
    return build_list_result(results, xml)
def print_rt(self, tweet):
    text = html.unescape(tweet.retweeted_status.text)
    fmt = "@{user.screen_name} RT @{rt.user.screen_name}: {text}"
    return fmt.format(user=tweet.user, rt=tweet.retweeted_status, text=text)
def print_tweet(self, tweet):
    text = html.unescape(tweet.text)
    fmt = "@{user.screen_name}: {text}"
    return fmt.format(user=tweet.user, text=text)
def direct_message(self, data):
    dm = data.direct_message
    text = html.unescape(dm.text)
    fmt = "@{sender} ? @{recipient}: {text}\n" + "-" * 10
    print(fmt.format(sender=dm.sender.screen_name,
                     recipient=dm.recipient.screen_name,
                     text=text))
def favorited(self, data):
    print(data.source.screen_name, "favorited:",
          html.unescape(data.target_object.text) + "\n" + "-" * 10)
async def get_home(**params):
    req = client.api.statuses.home_timeline.get(count=200, **params)
    responses = req.iterator.with_since_id()
    home = []
    async for tweets in responses:
        for tweet in reversed(tweets):
            text = html.unescape(tweet.text)
            print("@{user.screen_name}: {text}".format(user=tweet.user, text=text))
            print("-" * 10)
        await asyncio.sleep(180)
    return home