Python re module: re.I usage examples
The following code examples, extracted from open-source Python projects, illustrate how to use re.I (the case-insensitive matching flag).
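As a quick reference before the longer examples: re.I (an alias for re.IGNORECASE) simply makes a pattern match regardless of letter case. A minimal, self-contained sketch:

import re

# 'CHARSET', 'Charset' and 'charset' all match because of re.I.
pattern = re.compile(r'charset=(\S+)', re.I)
print(pattern.search('Content-Type: text/html; CHARSET=utf-8').group(1))  # prints: utf-8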
def __init__(self, url, timeout=None, num_workers=10, **kwargs):
"""
Initialise an instance.
:param url: The root URL to use for scraping.
:param timeout: The timeout, in seconds, to be applied to requests.
This defaults to ``None`` (no timeout specified).
:param num_workers: The number of worker threads you want to do I/O.
This defaults to 10.
:param kwargs: Passed to the superclass.
"""
super(SimpleScrapingLocator, self).__init__(**kwargs)
self.base_url = ensure_slash(url)
self.timeout = timeout
self._page_cache = {}
self._seen = set()
self._to_fetch = queue.Queue()
self._bad_hosts = set()
self.skip_externals = False
self.num_workers = num_workers
self._lock = threading.RLock()
# See issue #45: we need to be resilient when the locator is used
# in a thread, e.g. with concurrent.futures. We can't use self._lock
# as it is for coordinating our internal threads - the ones created
# in _prepare_threads.
self._gplock = threading.RLock()
def get_encodings_from_content(content):
"""Returns encodings from given content string.
:param content: bytestring to extract encodings from.
"""
warnings.warn((
'In requests 3.0, get_encodings_from_content will be removed. For '
'more information, please see the discussion on issue #2266. (This'
' warning should only appear once.)'),
DeprecationWarning)
charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
return (charset_re.findall(content) +
pragma_re.findall(content) +
xml_re.findall(content))
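A hypothetical usage sketch for the function above (the HTML string is made up; the meta attribute is found even though it is upper-cased, because charset_re and pragma_re are compiled with re.I):

html = '<html><head><meta CHARSET="utf-8"></head><body>ok</body></html>'
print(get_encodings_from_content(html))  # ['utf-8']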
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=False):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "default", "module",
or "once"
'message' -- a regex that the warning message must match
'category' -- a class that the warning must be a subclass of
'module' -- a regex that the module name must match
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
import re
assert action in ("error", "ignore", "always", "default", "module",
"once"), "invalid action: %r" % (action,)
assert isinstance(message, str), "message must be a string"
assert isinstance(category, type), "category must be a class"
assert issubclass(category, Warning), "category must be a Warning subclass"
assert isinstance(module, str), "module must be a string"
assert isinstance(lineno, int) and lineno >= 0, \
"lineno must be an int >= 0"
_add_filter(action, re.compile(message, re.I), category,
re.compile(module), lineno, append=append)
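For context, a typical call to the function above; the message argument is compiled with re.I, so it matches the start of the warning text case-insensitively:

import warnings

# Suppress any DeprecationWarning whose message begins with "in requests 3.0",
# regardless of capitalization.
warnings.filterwarnings('ignore', message='in requests 3.0', category=DeprecationWarning)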
def __init__(self, proxies=None, **x509):
if proxies is None:
proxies = getproxies()
assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
self.proxies = proxies
self.key_file = x509.get('key_file')
self.cert_file = x509.get('cert_file')
self.addheaders = [('User-Agent', self.version)]
self.__tempfiles = []
self.__unlink = os.unlink # See cleanup()
self.tempcache = None
# Undocumented feature: if you assign {} to tempcache,
# it is used to cache files retrieved with
# self.retrieve(). This is not enabled by default
# since it does not work for changing documents (and I
# haven't got the logic to check expiration headers
# yet).
self.ftpcache = ftpcache
# Undocumented feature: you can use a different
# ftp cache by assigning to the .ftpcache member;
# in case you want logically independent URL openers
# XXX This is not threadsafe. Bah.
def find_media_files(media_path):
unconverted = []
for dirname, directories, files in os.walk(media_path):
for file in files:
#skip hidden files
if file.startswith('.'):
continue
if is_video(file) or is_subtitle(file):
file = os.path.join(dirname, file)
#Skip Sample files
if re.search(".sample.",file,re.I):
continue
unconverted.append(file)
sorted_unconverted = sorted(unconverted)
return sorted_unconverted
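Note that in ".sample." the dots are regex wildcards, so the check means 'sample' surrounded by any character, matched case-insensitively. A standalone sketch of just that filter (file names are made up):

import re

names = ['Movie.2016.mkv', 'Movie.2016.SAMPLE.mkv', 'movie-sample.avi']
kept = [n for n in names if not re.search(".sample.", n, re.I)]
print(kept)  # ['Movie.2016.mkv']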
def add_targets(self):
self.target_dir = os.path.dirname(self.input_video)
self.hard_link = ''
if MOVE_FILES:
#If we care about foreign languages execute this part
if FOREIGN:
audiostreams = self.get_audio_streams()
#if we want to create hard links and there are both English and LOCALE audio streams in the file or in the name
if HARD_LINK and ((LOCALE in audiostreams and 'eng' in audiostreams) or (re.search('.{}.'.format(LOCALE),self.input_video,re.I) and re.search('.eng.',self.input_video,re.I))):
self.target_dir = TVSHOW_TARGET if self.is_show else MOVIE_TARGET
self.hard_link = LANG_TVSHOW_TARGET if self.is_show else LANG_MOVIE_TARGET
else:
#If the input matches LOCALE, put it in the lang folders
if re.search(LANG_PATTERN,self.input_video,re.I | re.M):
self.target_dir = LANG_TVSHOW_TARGET if self.is_show else LANG_MOVIE_TARGET
#Else put them in the main folder
else:
self.target_dir = TVSHOW_TARGET if self.is_show else MOVIE_TARGET
#if we don't care about multiple languages, simply determine if it's a tvshow or movie
else:
self.target_dir = TVSHOW_TARGET if self.is_show else MOVIE_TARGET
def append_folder(self):
if (CREATE_TVSHOW_DIRS and self.is_show):
sub_folder=os.path.basename(self.input_video)[:os.path.basename(self.input_video).find('-')-1]
if CREATE_SEASON_DIRS:
match = re.search(TV_SHOW_PATTERNS[2],self.input_video,re.I)
if match:
season = match.group(1)
if 'season' in locals():
if len(season) == 1:
season = ' 0' + season
else:
season = ' ' + season
else:
Logger.info('Failed to match season pattern in {new}'.format(new=self.input_video))
sys.exit(0)
sub_folder = os.path.join(sub_folder,'Season' + season)
elif (CREATE_MOVIE_DIRS and not self.is_show):
sub_folder=os.path.basename(self.input_video)[:-4]
if 'sub_folder' in locals():
self.target_dir = os.path.join(self.target_dir,sub_folder)
if self.hard_link:
self.hard_link = os.path.join(self.hard_link,sub_folder)
def goglib_search_filter(self, search_bar):
self.goglib_search_filter_list = []
filter = search_bar.get_text()
for game_name in self.goglib_games_list:
# Find sequence of characters at the beginning of the string
if bool(re.match(filter, self.goglib_dict_name_to_title[game_name], re.I)):
self.goglib_search_filter_list.append(game_name)
# Find sequence of characters anywhere in the string
if len(filter) > 1:
if filter.lower() in self.goglib_dict_name_to_title[game_name].lower():
self.goglib_search_filter_list.append(game_name)
self.goglib_apply_filters()
def mylib_search_filter(self, search_bar):
self.mylib_search_filter_list = []
filter = search_bar.get_text()
for game_name in self.mylib_games_list:
# Find sequence of characters at the beginning of the string
if bool(re.match(filter, self.mylib_dict_name_to_title[game_name], re.I)):
self.mylib_search_filter_list.append(game_name)
# Find sequence of characters anywhere in the string
if len(filter) > 1:
if filter.lower() in self.mylib_dict_name_to_title[game_name].lower():
self.mylib_search_filter_list.append(game_name)
self.mylib_apply_filters()
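In both filter methods above, re.match with re.I only matches at the start of the title (the user's text is treated as a regular expression), which is why a separate lower-cased substring check handles matches elsewhere. A tiny sketch of the prefix half:

import re

titles = ['The Witcher', 'Witcher Adventure Game']
print([t for t in titles if re.match('wit', t, re.I)])  # ['Witcher Adventure Game']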
def search(self, markup):
#print 'looking for %s in %s' % (self, markup)
found = None
# If given a list of items, scan it for a text element that
# matches.
if isList(markup) and not isinstance(markup, Tag):
for element in markup:
if isinstance(element, NavigableString) \
and self.search(element):
found = element
break
# If it's a Tag, make sure its name or attributes match.
# Don't bother with Tags if we're searching for text.
elif isinstance(markup, Tag):
if not self.text:
found = self.searchTag(markup)
# If it's text, make sure the text matches.
elif isinstance(markup, NavigableString) or \
isString(markup):
if self._matches(markup, self.text):
found = markup
else:
raise Exception("I don't know how to match against a %s"
                % markup.__class__)
return found
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
if not url:
return
data = urlparse.parse_qs(url)
data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
url = self.__search([data['tvshowtitle']] + source_utils.aliases_to_array(eval(data['aliases'])), data['year'], season)
if not url: return
r = client.request(urlparse.urljoin(self.base_link, url))
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'ep_link'})
r = dom_parser.parse_dom(r, 'a', req='href')
r = [(i.attrs['href'], i.content) for i in r if i]
r = [(i[0], re.findall("^(?:episode)\s*(\d+)$", i[1], re.I)) for i in r]
r = [(i[0], i[1][0] if i[1] else '0') for i in r]
r = [i[0] for i in r if int(i[1]) == int(episode)][0]
return source_utils.strip_domain(r)
except:
return
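The key step above is the case-insensitive findall that turns an episode label into its number. A tiny sketch on made-up labels:

import re

labels = ['Episode 5', 'EPISODE 12', 'Trailer']
print([re.findall(r'^(?:episode)\s*(\d+)$', l, re.I) for l in labels])  # [['5'], ['12'], []]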
def __search(self, titles, year, season='0'):
try:
query = urlparse.urljoin(self.base_link, self.search_link)
t = [cleantitle.get(i) for i in set(titles) if i]
y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
r = client.request(query, post={'do': 'search', 'subaction': 'search', 'search_start': 0, 'full_search': 0, 'result_from': 1, 'story': cleantitle.query(titles[0])})
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'fullstream'})
r = [(dom_parser.parse_dom(i, 'h3', attrs={'class': 'mov-title'}), dom_parser.parse_dom(i, 'div', attrs={'class': 'fullmask'})) for i in r]
r = [(dom_parser.parse_dom(i[0], 'a', req='href'), dom_parser.parse_dom(i[1], 'a', attrs={'href': re.compile('.*/year/\d+')})) for i in r]
r = [(i[0][0].attrs['href'], i[0][0].content, i[1][0].content if i[1] else '0') for i in r if i[0]]
r = [(i[0], i[1], i[2], re.findall('(.+?)\s+(?:\s*-\s*saison)\s+(\d+)', i[1], re.I)) for i in r]
r = [(i[0], i[3][0][0] if len(i[3]) > 0 else i[1], i[2], i[3][0][1] if len(i[3]) > 0 else '0') for i in r]
r = [(i[0], i[1], i[2], '1' if int(season) > 0 and i[3] == '0' else i[3]) for i in r]
r = sorted(r, key=lambda i: int(i[2]), reverse=True) # with year > no year
r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y and int(i[3]) == int(season)][0]
return source_utils.strip_domain(r)
except:
return
def __search(self, titles, episode):
try:
query = self.search_link % urllib.quote_plus(cleantitle.query(titles[0]) + ' ' + str(episode))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) + str(episode) for i in set(titles) if i]
r = client.request(query)
r = r.split('</style>')[-1].strip()
r = json.loads(r)
r = [(i.get('title', {}).get('rendered'), i.get('content', {}).get('rendered')) for i in r]
r = [(re.sub('ger (?:sub|dub)', '', i[0], flags=re.I).strip(), i[1]) for i in r if i[0] and i[1]]
r = [(i[0], re.findall('(.+?) (\d*)$', i[0]), i[1]) for i in r]
r = [(i[0] if not i[1] else i[1][0][0] + ' ' + str(int(i[1][0][1])), i[2]) for i in r]
r = [dom_parser.parse_dom(i[1], 'div') for i in r if cleantitle.get(i[0]) in t]
r = [[x.attrs['href'] for x in dom_parser.parse_dom(i, 'a', req='href')] + [x.attrs['src'] for x in dom_parser.parse_dom(i, 'iframe', req='src')] for i in r]
return r[0]
except:
return
def t_join(m_count):
tmp_count = 0
i = 0
if I < m_count:
count = len(ip_list) + 1
else:
count = m_count
while True:
time.sleep(4)
ac_count = threading.activeCount()
#print ac_count,count
if ac_count < count and ac_count == tmp_count:
i+=1
else:
i=0
tmp_count = ac_count
#print ac_count,queue.qsize()
if (queue.empty() and threading.activeCount() <= 1) or i > 5:
break
def setup(self, config):
"""
Determine the maximum size to unpack and which directories to ignore.
:param config: Configuration object.
:type config: ``dict``
"""
self.max_size = config.get(helper.MAX_FILE_SIZE, 128) * 1024 * 1024
self.config = config
ignore = {}
path = os.path.join(
config[helper.CODE_ROOT], 'utils', 'diskimage_ignore.txt')
with open(path) as inp:
for line in inp:
if len(line.strip()) == 0 or line.startswith('#'):
continue
ignore[re.escape(line.strip().lower())] = True
self.ignore = re.compile('|'.join(list(ignore.keys())), re.I)
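The pattern built above is an alternation of re.escape'd literals joined with '|' and compiled with re.I, so any listed path fragment matches case-insensitively. A minimal sketch of the same idea with a hypothetical ignore list:

import re

ignore_terms = ['pagefile.sys', 'system volume information']  # hypothetical entries
ignore = re.compile('|'.join(re.escape(t) for t in ignore_terms), re.I)
print(bool(ignore.search('C:/System Volume Information/found.docx')))  # True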
def setup(self, config):
"""
Load name model (word list) and compile regexes for stop characters.
:param config: Configuration object.
:type config: ``dict``
"""
reference_model = os.path.join(
config[helper.CODE_ROOT], config[helper.NAME_MODEL])
self.stopper = regex.compile(('(%s)' % '|'.join([
'and', 'or', 'og', 'eller', r'\?', '&', '<', '>', '@', ':', ';', '/',
r'\(', r'\)', 'i', 'of', 'from', 'to', r'\n', '!'])),
regex.I | regex.MULTILINE)
self.semistop = regex.compile(
('(%s)' % '|'.join([','])), regex.I | regex.MULTILINE)
self.size_probability = [0.000, 0.000, 0.435, 0.489, 0.472, 0.004, 0.000]
self.threshold = 0.25
self.candidates = defaultdict(int)
with gzip.open(reference_model, 'rb') as inp:
self.model = json.loads(inp.read().decode('utf-8'))
self.tokenizer = regex.compile(r'\w{2,20}')
def setup(self, config):
"""
Compile configured regular expressions.
:param config: Configuration object.
:type config: ``dict``
"""
self.matches = {}
patterns = []
for entity_type, pattern_conf in config.get(helper.ENTITIES, {}).items():
patterns.append(
r'\b(?P<{}>{})\b'.format(entity_type, pattern_conf[helper.PATTERN]))
self.pattern = regex.compile(
'|'.join(patterns),
regex.I | regex.U)
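Each configured entity type becomes one named group in a single combined pattern, so one scan reports which kind of entity matched. A sketch of the same technique using the standard re module and made-up patterns:

import re

patterns = {'email': r'[\w.]+@[\w.]+', 'phone': r'\d{3}-\d{4}'}
combined = re.compile('|'.join(r'\b(?P<{}>{})\b'.format(k, v) for k, v in patterns.items()), re.I)
m = combined.search('Call 555-1234 or write to bob@example.com')
print(m.lastgroup, m.group())  # phone 555-1234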
def clean_downloaded_metadata(self, mi):
docase = (
mi.language == 'zhn'
)
if mi.title and docase:
# Remove series information from title
m = re.search(r'\S+\s+(\(.+?\s+Book\s+\d+\))$', mi.title)
if m is not None:
mi.title = mi.title.replace(m.group(1), '').strip()
mi.title = fixcase(mi.title)
mi.authors = fixauthors(mi.authors)
if mi.tags and docase:
mi.tags = list(map(fixcase, mi.tags))
mi.isbn = check_isbn(mi.isbn)
if mi.series and docase:
mi.series = fixcase(mi.series)
if mi.title and mi.series:
for pat in (r':\s*Book\s+\d+\s+of\s+%s$', r'\(%s\)$', r':\s*%s\s+Book\s+\d+$'):
pat = pat % re.escape(mi.series)
q = re.sub(pat, '', mi.title, flags=re.I).strip()
if q and q != mi.title:
mi.title = q
break
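The series-stripping loop above builds each pattern from the escaped series name and removes it from the title with re.sub(..., flags=re.I). A standalone sketch with hypothetical values:

import re

title, series = 'Dune: Book 1 of DUNE SAGA', 'Dune Saga'
pat = r':\s*Book\s+\d+\s+of\s+%s$' % re.escape(series)
print(re.sub(pat, '', title, flags=re.I).strip())  # Dune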
def _process_message(self, message):
chat = Chat.from_message(self, message)
for mt in MESSAGE_TYPES:
if mt in message:
return self._handlers[mt](chat, message[mt])
if "text" not in message:
return
for patterns, handler in self._commands:
m = re.search(patterns, message["text"], re.I)
if m:
return handler(chat, m)
# No match, run default if it's a 1to1 chat
if not chat.is_group():
return self._default(chat, message)
else:
return self._group_message(chat, message)
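The command table iterated above is a list of (pattern, handler) pairs, and the first pattern that matches the message text case-insensitively wins. A minimal sketch of that dispatch idea (command names are invented):

import re

def start(text): return 'starting'
def help_cmd(text): return 'helping'

commands = [(r'^/start\b', start), (r'^/help\b', help_cmd)]

def dispatch(text):
    for pattern, handler in commands:
        if re.search(pattern, text, re.I):
            return handler(text)

print(dispatch('/START now'))  # starting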
def get_vhdl_setting(cmd_obj, key):
'''
Borrowing an idea from OdatNurd from ST forum, creating a method
that will return the value of a key and also check to see if
it's been overridden in project files. Defaults are handled by
the supplied sublime-settings file.
This will actually work on the regular Preferences as well, I think,
though it might do bad things if the key doesn't exist.
'''
# Load the defaults, or user overridden defaults.
vhdl_settings = sublime.load_settings('vhdl_mode.sublime-settings')
default = vhdl_settings.get(key, None)
# Load the view's settings
view_settings = cmd_obj.view.settings()
return view_settings.get(key, default)
#----------------------------------------------------------------------------
def scan_instantiations(cmd_obj):
'''
Obtaining a list of all regions that contain instantiation labels
and then creating a dictionary of instantiated components and their
associated labels.
'''
instances = {}
selector = 'meta.block.instantiation entity.name.label'
regions = cmd_obj.view.find_by_selector(selector)
for region in regions:
line = cmd_obj.view.substr(cmd_obj.view.full_line(region))
line = re.sub(r'\n', '', line)
row, col = cmd_obj.view.rowcol(region.begin())
pattern = r'^\s*(?P<label>\w+)\s*:\s*(?:entity)?\s*((?P<lib>\w+)\.)?(?P<entity>[\w\.]+)'
s = re.search(pattern, line, re.I)
if s:
if s.group('entity') in instances:
instances[s.group('entity')].append(s.group('label'))
else:
instances[s.group('entity')] = [s.group('label')]
else:
print('vhdl-mode: Could not match instantiation on line {}'.format(row+1))
return instances
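The instantiation pattern relies on named groups and re.I, since VHDL identifiers are case-insensitive. A standalone sketch on a made-up line:

import re

line = '  u_fifo : entity work.fifo_sync'
pattern = r'^\s*(?P<label>\w+)\s*:\s*(?:entity)?\s*((?P<lib>\w+)\.)?(?P<entity>[\w\.]+)'
s = re.search(pattern, line, re.I)
print(s.group('label'), s.group('lib'), s.group('entity'))  # u_fifo work fifo_sync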
def parse_str(self, gen_str):
"""Attempts to extract the information from a generic interface."""
# Right now I'm going to punt. There are so many variations
on these that it's difficult to write an RE for it. Also
# there are few ways to have to rewrite it. We will extract
# a name, and then a type string (which may include defaults)
gen_pattern = r'\s?(?P<name>.*?)\s?(?::)\s?(?P<type>.*)'
gp = re.compile(gen_pattern, re.IGNORECASE)
s = re.search(gp, gen_str)
if s:
self.name = s.group('name')
# Sometimes the type has a trailing space. Eliminating it.
self.type = re.sub(r'\s*$', '', s.group('type'))
self.success = True
else:
print('vhdl-mode: Could not parse generic string.')
self.success = False
def get_xlc_version(conf, cc):
"""Get the compiler version"""
cmd = cc + ['-qversion']
try:
out, err = conf.cmd_and_log(cmd, output=0)
except Errors.WafError:
conf.fatal('Could not find xlc %r' % cmd)
# the intention is to catch the 8.0 in "IBM XL C/C++ Enterprise Edition V8.0 for AIX..."
for v in (r"IBM XL C/C\+\+.* V(?P<major>\d*)\.(?P<minor>\d*)",):
version_re = re.compile(v, re.I).search
match = version_re(out or err)
if match:
k = match.groupdict()
conf.env['CC_VERSION'] = (k['major'], k['minor'])
break
else:
conf.fatal('Could not determine the XLC version.')
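The version regex above pulls the major and minor numbers out of the compiler banner with named groups, case-insensitively. A small sketch on a made-up banner string:

import re

banner = 'IBM XL C/C++ Enterprise Edition V8.0 for AIX'  # illustrative only
version_re = re.compile(r"IBM XL C/C\+\+.* V(?P<major>\d*)\.(?P<minor>\d*)", re.I).search
match = version_re(banner)
print(match.group('major'), match.group('minor'))  # 8 0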
def ifort_modifier_win32(self):
v = self.env
v.IFORT_WIN32 = True
v.FCSTLIB_MARKER = ''
v.FCSHLIB_MARKER = ''
v.FCLIB_ST = v.FCSTLIB_ST = '%s.lib'
v.FCLIBPATH_ST = v.STLIBPATH_ST = '/LIBPATH:%s'
v.FCINCPATH_ST = '/I%s'
v.FCDEFINES_ST = '/D%s'
v.fcprogram_PATTERN = v.fcprogram_test_PATTERN = '%s.exe'
v.fcshlib_PATTERN = '%s.dll'
v.fcstlib_PATTERN = v.implib_PATTERN = '%s.lib'
v.FCLNK_TGT_F = '/out:'
v.FC_TGT_F = ['/c', '/o', '']
v.FCFLAGS_fcshlib = ''
v.LINKFLAGS_fcshlib = '/DLL'
v.AR_TGT_F = '/out:'
v.IMPLIB_ST = '/IMPLIB:%s'
v.append_value('LINKFLAGS', '/subsystem:console')
if v.IFORT_MANIFEST:
v.append_value('LINKFLAGS', ['/MANIFEST'])
def sxc_common_flags(conf):
v=conf.env
v['CC_SRC_F']=[]
v['CC_TGT_F']=['-c','-o']
if not v['LINK_CC']:v['LINK_CC']=v['CC']
v['CCLNK_SRC_F']=[]
v['CCLNK_TGT_F']=['-o']
v['CPPPATH_ST']='-I%s'
v['DEFINES_ST']='-D%s'
v['LIB_ST']='-l%s'
v['LIBPATH_ST']='-L%s'
v['STLIB_ST']='-l%s'
v['STLIBPATH_ST']='-L%s'
v['RPATH_ST']=''
v['SONAME_ST']=[]
v['SHLIB_MARKER']=[]
v['STLIB_MARKER']=[]
v['LINKFLAGS_cprogram']=['']
v['cprogram_PATTERN']='%s'
v['CFLAGS_cshlib']=['-fPIC']
v['LINKFLAGS_cshlib']=['']
v['cshlib_PATTERN']='lib%s.so'
v['LINKFLAGS_cstlib']=[]
v['cstlib_PATTERN']='lib%s.a'
def get_xlf_version(conf, fc):
"""Get the compiler version"""
cmd = fc + ['-qversion']
try:
out, err = conf.cmd_and_log(cmd, output=0)
except Errors.WafError:
conf.fatal('Could not find xlf %r' % cmd)
for v in (r"IBM XL Fortran.* V(?P<major>\d*)\.(?P<minor>\d*)",):
version_re = re.compile(v, re.I).search
match = version_re(out or err)
if match:
k = match.groupdict()
conf.env['FC_VERSION'] = (k['major'], k['minor'])
break
else:
conf.fatal('Could not determine the XLF version.')
def get_suncc_version(conf, cc):
"""Get the compiler version"""
cmd = cc + ['-V']
try:
out, err = conf.cmd_and_log(cmd, output=0)
except Errors.WafError as e:
# Older versions of the compiler exit with non-zero status when reporting their version
if not (hasattr(e, 'returncode') and hasattr(e, 'stdout') and hasattr(e, 'stderr')):
conf.fatal('Could not find suncc %r' % cmd)
out = e.stdout
err = e.stderr
version = (out or err)
version = version.splitlines()[0]
version_re = re.compile(r'cc:\s+sun\s+(c\+\+|c)\s+(?P<major>\d*)\.(?P<minor>\d*)', re.I).search
match = version_re(version)
if match:
k = match.groupdict()
conf.env['CC_VERSION'] = (k['major'], k['minor'])
else:
conf.fatal('Could not determine the suncc version.')
# ============ the --as-needed flag should be added during the configuration, not at runtime =========
def check_for_auto_merge_trigger(text):
"""Checks the text for the phrases that should trigger an automerge."""
# The comment must address @dpebot directly, on the same line
comment = re.search(
r'@{}\s+\b(.+)'.format(github_helper.github_user()), text, re.I)
if not comment:
return False
else:
# Just get the meat of the command
comment = comment.group(1).strip()
satisfaction = r'\b(pass|passes|green|approv(al|es)|happy|satisfied)'
ci_tool = r'\b(travis|tests|statuses)\b'
merge_action = r'\bmerge\b'
triggers = (
r'{}.+({}.+)?{}'.format(merge_action, ci_tool, satisfaction),
r'{}.+{},.+{}'.format(ci_tool, satisfaction, merge_action),
'lgtm',
)
return any(re.search(trigger, comment, re.I) for trigger in triggers)
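The check first requires the comment to address the bot by name and only then tries the looser trigger patterns, all with re.I. A sketch of the first step with a hypothetical bot name (the real one comes from github_helper.github_user()):

import re

bot = 'dpebot'  # hypothetical
text = '@dpebot merge when travis passes'
comment = re.search(r'@{}\s+\b(.+)'.format(bot), text, re.I)
print(comment.group(1))  # merge when travis passes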
def filter_soup(soup, currentpage={}, config={}, **kwargs):
"""
Find patterns that look like callouts, for example **Note:**, and add
callout classes to their parent elements (usually <p>)
"""
# callout classes are defined by page>target>config>default
callout_classes = currentpage.get(CALLOUT_TYPES_FIELD,
config.get(CALLOUT_TYPES_FIELD,
DEFAULT_CALLOUT_TYPES))
callout_intro = re.compile(r"("+"|".join(callout_classes)+"):?$", re.I)
callout_base_class = currentpage.get(CALLOUT_CLASS_FIELD,
config.get(CALLOUT_CLASS_FIELD,
DEFAULT_CALLOUT_CLASS))
callouts = soup.find_all(name=["strong","em"], string=callout_intro)
for c in callouts:
if not c.previous_sibling: #This callout starts a block
callout_type = c.string.replace(":","").lower()
if callout_type in callout_classes:
c.parent["class"] = [callout_base_class, callout_type]
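The callout regex above is just an alternation of the configured class names anchored at the end of the string and compiled with re.I. A sketch with assumed defaults (the real DEFAULT_CALLOUT_TYPES may differ):

import re

callout_classes = ['note', 'warning', 'caution', 'tip']  # assumed defaults
callout_intro = re.compile(r'(' + '|'.join(callout_classes) + r'):?$', re.I)
print(bool(callout_intro.match('Note:')))  # True
print(bool(callout_intro.match('Notes')))  # False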