The following 50 code examples, extracted from open-source Python projects, illustrate how to use `re.error`.
def _unsafe_writes(self, src, dest, exception):
    """Best-effort, non-atomic copy of ``src`` over ``dest``.

    Used as a fallback when an atomic rename failed with EBUSY and the
    user explicitly allows unsafe writes; any other errno is reported
    as a fatal error via ``fail_json``.
    """
    # sadly there are some situations where we cannot ensure atomicity, but only if
    # the user insists and we get the appropriate error we update the file unsafely
    if exception.errno == errno.EBUSY:
        # TODO: issue warning that this is an unsafe operation, but doing it cause user insists
        try:
            # BUG fix: pre-bind both handles so the finally block cannot hit a
            # NameError (masking the real error) when the first open() fails.
            out_dest = None
            in_src = None
            try:
                out_dest = open(dest, 'wb')
                in_src = open(src, 'rb')
                shutil.copyfileobj(in_src, out_dest)
            finally:
                # assuring closed files in 2.4 compatible way
                if out_dest:
                    out_dest.close()
                if in_src:
                    in_src.close()
        except (shutil.Error, OSError, IOError):
            e = get_exception()
            self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))
    else:
        self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
def eval_print_amount(self, sel, list, msg):
    """Narrow *list* of profile entries according to the selector *sel*.

    *sel* may be a regex string (keep matching entries), a float in
    [0, 1) (keep that fraction of the list), or a non-negative int
    smaller than the list length (keep that many entries).  Returns the
    (possibly reduced) list and the log message, extended with a note
    when a reduction happened.  NOTE: the parameter name ``list``
    shadows the builtin but is kept — it is part of the interface.
    """
    new_list = list
    if isinstance(sel, basestring):
        # string selector: regular expression matched against each entry
        try:
            matcher = re.compile(sel)
        except re.error:
            msg += " <Invalid regular expression %r>\n" % sel
            return new_list, msg
        new_list = [entry for entry in list
                    if matcher.search(func_std_string(entry))]
    else:
        count = len(list)
        if isinstance(sel, float) and 0.0 <= sel < 1.0:
            # fraction: keep the leading portion, rounded to nearest
            count = int(count * sel + .5)
            new_list = list[:count]
        elif isinstance(sel, (int, long)) and 0 <= sel < count:
            # integer: keep exactly that many leading entries
            count = sel
            new_list = list[:count]
    if len(list) != len(new_list):
        msg += " List reduced from %r to %r due to restriction <%r>\n" % (
            len(list), len(new_list), sel)
    return new_list, msg
def __call__(self, value):
    """Open *value* as a file object, or pass ``None`` through unchanged.

    Relative paths are resolved against ``self.directory``; failures to
    open are re-raised as ``ValueError`` with the open parameters.
    """
    if value is None:
        return value
    path = unicode(value)
    if not os.path.isabs(path):
        path = os.path.join(self.directory, path)
    try:
        if self.buffering is None:
            value = open(path, self.mode)
        else:
            value = open(path, self.mode, self.buffering)
    except IOError as error:
        raise ValueError('Cannot open {0} with mode={1} and buffering={2}: {3}'.format(
            value, self.mode, self.buffering, error))
    return value
def connect(self):
    """Connect to the JACK server, retrying until success or until
    ``self.connect_maxattempts`` is exhausted (then RuntimeError)."""
    tries = 0
    while True:
        log.debug("Attempting to connect to JACK server...")
        status = jacklib.jack_status_t()
        # JackNoStartServer: do not spawn a server if none is running
        self.client = jacklib.client_open("jack-matchmaker", jacklib.JackNoStartServer, status)
        err = get_jack_status_error_string(status)
        if not err:
            break
        tries += 1
        # connect_maxattempts == 0 presumably means "retry forever" — TODO confirm
        if self.connect_maxattempts and tries >= self.connect_maxattempts:
            log.error("Maximum number (%i) of connection attempts reached. Aborting.",
                      self.connect_maxattempts)
            raise RuntimeError(err)
        log.debug("Waiting %.2f seconds to connect again...", self.connect_interval)
        time.sleep(self.connect_interval)
    # register shutdown callback; 'blah' is an opaque user-data argument
    jacklib.on_shutdown(self.client, self.shutdown_callback, 'blah')
    log.debug("Client connected, UUID: %s", jacklib.client_get_uuid(self.client))
def __init__(self): self.lexre = None # Master regular expression. This is a list of # tuples (re,findex) where re is a compiled # regular expression and findex is a list # mapping regex group numbers to rules self.lexretext = None # Current regular expression strings self.lexstatere = {} # Dictionary mapping lexer states to master regexs self.lexstateretext = {} # Dictionary mapping lexer states to regex strings self.lexstaterenames = {} # Dictionary mapping lexer states to symbol names self.lexstate = "INITIAL" # Current lexer state self.lexstatestack = [] # Stack of lexer states self.lexstateinfo = None # State information self.lexstateignore = {} # Dictionary of ignored characters for each state self.lexstateerrorf = {} # Dictionary of error functions for each state self.lexreflags = 0 # Optional re compile flags self.lexdata = None # Actual input data (as a string) self.lexpos = 0 # Current position in input text self.lexlen = 0 # Length of the input text self.lexerrorf = None # Error rule (if any) self.lextokens = None # List of valid tokens self.lexignore = "" # Ignored characters self.lexliterals = "" # Literal characters that can be passed through self.lexmodule = None # Module self.lineno = 1 # Current line number self.lexoptimize = 0 # Optimized mode
def get_tokens(self):
    """Fetch the ``tokens`` list from ``self.ldict`` and validate its type.

    On failure, logs an error and sets ``self.error``; on success, stores
    the list on ``self.tokens``.
    """
    tokens = self.ldict.get("tokens", None)
    if not tokens:
        # covers both a missing name and an empty sequence
        self.log.error("No token list is defined")
        self.error = 1
        return
    if not isinstance(tokens, (list, tuple)):
        self.log.error("tokens must be a list or tuple")
        self.error = 1
        return
    # NOTE: the original re-checked `if not tokens` ("tokens is empty")
    # here, but that branch was unreachable — an empty sequence is already
    # caught by the first check above.  The dead code has been removed.
    self.tokens = tokens
def edit(self, argv):
    """Interactively edit a task's JSON data in the user's editor.

    Resolves the (group, subgroup, task) selector from *argv*, opens the
    pretty-printed task data for editing, and saves it back if the edited
    JSON passes validation.
    """
    (group, subgroup, task) = self._backward_parser(argv)
    maybe_raise_unrecognized_argument(argv)
    data_pretty = json.dumps(self._load_task_data(group, subgroup, task), indent=4)
    # `_` presumably renders the selector as a human-readable string — TODO confirm
    selector = _(group, subgroup, task)
    new_data_raw = prompt("Editing: %s" % selector, data_pretty, "json")
    if new_data_raw:
        new_data = json.loads(new_data_raw)
        if is_valid_task_data(new_data):
            msg.normal("Manually edited: %s" % selector)
            self._save_task(group, subgroup, task, new_data)
        else:
            msg.error("Invalid data.")
    else:
        # empty editor result means the user aborted
        msg.normal("Operation cancelled.")
def interpret(argv):
    """CLI entry point: run the Dit interpreter and translate expected
    failures into user-facing error messages instead of tracebacks."""
    try:
        dit = Dit()
        dit.interpret(argv)
    except DitError as err:
        msg.error(err)
    except SubprocessError as err:
        msg.error("`%s` returned with non-zero code, aborting." % err)
    except IndexError as err:
        # this was probably caused by a pop on an empty argument list
        msg.error("Missing argument.")
    except json.decoder.JSONDecodeError:
        msg.error("Invalid JSON.")
    except re.error as err:
        # this was probably caused by a bad regex in the --where filter
        msg.error("Bad regular expression: %s" % err)
def __init__(self, app, conf, public_api_routes=None):
    """Wrap *app* with keystone auth, exempting the given public routes.

    :param app: the WSGI application being wrapped
    :param conf: auth middleware configuration
    :param public_api_routes: route templates reachable without a token
    :raises exception.ConfigInvalid: if a route template does not compile
    """
    api_routes = [] if public_api_routes is None else public_api_routes
    self._iotronic_app = app
    # TODO(mrda): Remove .xml and ensure that doesn't result in a
    # 401 Authentication Required instead of 404 Not Found
    # FIX: use a raw string — '\.' in a plain literal is an invalid escape
    # sequence (DeprecationWarning/SyntaxWarning on modern Pythons).
    route_pattern_tpl = r'%s(\.json|\.xml)?$'
    try:
        self.public_api_routes = [re.compile(route_pattern_tpl % route_tpl)
                                  for route_tpl in api_routes]
    except re.error as e:
        msg = _('Cannot compile public API routes: %s') % e
        LOG.error(msg)
        raise exception.ConfigInvalid(error_msg=msg)
    super(AuthTokenMiddleware, self).__init__(app, conf)
def process_path_value(cls, val, must_exist, can_have_subdict):
    """
    does the relative path processing for a value from the dictionary,
    which can be a string, a list of strings, or a list of strings and
    "tagged" strings (sub-dictionaries whose values are strings)
    :param val: the value we are processing, for error messages
    :param must_exist: whether there must be a value
    :param can_have_subdict: whether the value can be a tagged string
    """
    if isinstance(val, six.string_types):
        return cls.relative_path(val, must_exist)
    if isinstance(val, list):
        processed = []
        for item in val:
            if can_have_subdict and isinstance(item, dict):
                # a "tagged" string: resolve each sub-value, keeping its tag
                for tag, subval in six.iteritems(item):
                    processed.append({tag: cls.relative_path(subval, must_exist)})
            else:
                processed.append(cls.relative_path(item, must_exist))
        return processed
    # any other type falls through and implicitly returns None (as before)
def regex(self):
    """
    Returns a compiled regular expression, depending upon the activated
    language-code.
    """
    language_code = get_language()
    if language_code not in self._regex_dict:
        # _regex may be a plain pattern string or a lazy translatable object
        if isinstance(self._regex, six.string_types):
            pattern = self._regex
        else:
            pattern = force_text(self._regex)
        try:
            compiled_regex = re.compile(pattern, re.UNICODE)
        except re.error as e:
            raise ImproperlyConfigured(
                '"%s" is not a valid regular expression: %s' %
                (pattern, six.text_type(e)))
        # cache per language so each translation is compiled once
        self._regex_dict[language_code] = compiled_regex
    return self._regex_dict[language_code]
def validate(self, instance, value):
    """Return *value* unchanged; subclasses override to coerce or validate.

    If valid, return the value, possibly coerced from the input value;
    if invalid, a ValueError is raised.

    .. warning::

        Calling :code:`validate` again on a coerced value must not modify
        the value further.

    .. note::

        Must handle :code:`instance=None`, since valid Property values are
        independent of the containing HasProperties class; the instance is
        only passed along to :code:`error` for a more verbose message.
    """
    #pylint: disable=unused-argument,no-self-use
    return value
def error(self, instance, value, error_class=None, extra=''):
    """Raise *error_class* (default ``ValueError``) describing an invalid
    value assignment.

    The instance is the containing HasProperties instance; it may be None
    if the error is raised outside a HasProperties class.
    """
    if error_class is None:
        error_class = ValueError
    # build the subject of the message piecewise
    parts = ['The {} property'.format(self.__class__.__name__)]
    if self.name != '':
        parts.append(" '{}'".format(self.name))
    if instance is not None:
        parts.append(' of a {} instance'.format(instance.__class__.__name__))
    prefix = ''.join(parts)
    raise error_class(
        '{prefix} must be {info}. A value of {val!r} {vtype!r} was '
        'specified. {extra}'.format(
            prefix=prefix,
            info=self.info or 'corrected',
            val=value,
            vtype=type(value),
            extra=extra,
        )
    )
def validate(self, instance, value):
    """Check if value is a string, and strips it and changes case"""
    original_type = type(value)
    if not isinstance(value, string_types):
        self.error(instance, value)
    # optional regex constraint
    if self.regex is not None and self.regex.search(value) is None:  #pylint: disable=no-member
        self.error(instance, value)
    value = value.strip(self.strip)
    if self.change_case == 'upper':
        value = value.upper()
    elif self.change_case == 'lower':
        value = value.lower()
    # coerce to unicode, or back to the caller's original string type
    if self.unicode:
        return text_type(value)
    return original_type(value)
def validate(self, instance, value):
    """Checks that the value is a valid file open in the correct mode

    If value is a string, it attempts to open it with the given mode.
    """
    if isinstance(value, string_types) and self.mode is not None:
        try:
            value = open(value, self.mode)
        except (IOError, TypeError):
            self.error(instance, value, extra='Cannot open file: {}'.format(value))
    # duck-type check: anything readable and seekable counts as file-like
    if not (hasattr(value, 'read') and hasattr(value, 'seek')):
        self.error(instance, value, extra='Not a file-like object')
    # only enforce the mode when both a mode attribute and a whitelist exist
    if hasattr(value, 'mode') and self.valid_modes is not None:
        if value.mode not in self.valid_modes:
            self.error(instance, value, extra='Invalid mode: {}'.format(value.mode))
    if getattr(value, 'closed', False):
        self.error(instance, value, extra='File is closed.')
    return value
def view_source(self, url, line, col):
    """View URL source in editor window.

    Fetches *url*, guesses its content type when the server did not
    provide one, and shows the text positioned at (*line*, *col*).
    """
    self.editor.setWindowTitle(u"View %s" % url)
    self.editor.setUrl(url)
    data, info = urlutil.get_content(url, proxy=self.config["proxy"])
    if data is None:
        # retrieval failed; `info` presumably carries the error text — show it
        msg = u"An error occurred retreiving URL `%s': %s." % (url, info)
        self.editor.setText(msg)
    else:
        content_type = httputil.get_content_type(info)
        if not content_type:
            # read function for content type guessing
            read = lambda: data
            content_type = mimeutil.guess_mimetype(url, read=read)
        self.editor.setContentType(content_type)
        self.editor.setText(data, line=line, col=col)
    self.editor.show()
def open_target(self):
    '''Connects to a NE using telnet protocol with provided login credentials'''
    print_info('telnet Target open')
    host = self.target
    port = self.port
    print_info("OPENING TELNET Connection...\n")
    print_info("HOST: {0} PORT: {1}".format(host, port))
    try:
        self.tnet.open(host, port)
        # session log file; closed elsewhere by the owning object
        self.log = open(self.logfile, 'w')
    # FIX: `except socket.error, err` is Python-2-only syntax (a SyntaxError
    # on Python 3); `as err` works on both Python 2.6+ and 3.
    except socket.error as err:
        print_warning("Login failed {0}".format(str(err)))
        return False
    else:
        return True
def read(self, prompt='', timeout=60):
    '''Reads the output till the prompt and returns the result and reports
    Failure on mismatch of response'''
    if not prompt:
        prompt = self.ne_prompt
    res = self.tnet.expect([prompt], timeout)
    self.cmd_rsp = res[2]
    try:
        if res:
            self.log.write(res[2])
            self.log.flush()
        else:
            # FIX: file.write() takes exactly one string argument; the original
            # passed two ("msg", res) which raised TypeError on this path.
            self.log.write("Expected Prompt Not found. %s" % str(res))
            self.log.flush()
        #re.search(prompt, self.cmd_rsp)
    except re.error:
        print_debug("Expected Response:{0}".format(prompt))
        print_debug("Received Response:{0}".format(self.cmd_rsp))
    return self.cmd_rsp
def requestprepare(self, dbase):
    """
    Request that the prepare method of the rule is executed if needed

    Special: Custom Filters have fixed values, so only one instance needs to
    exist during a search. It is stored in a FilterStore, and initialized
    once.
    As filters can be grouped in a group filter, we request a prepare.
    Only the first time prepare will be called
    """
    if self.nrprepare == 0:
        if self.use_regex:
            # one compiled pattern slot per label
            self.regex = [None] * len(self.labels)
            for i in range(len(self.labels)):
                if self.list[i]:
                    try:
                        self.regex[i] = re.compile(self.list[i], re.I)
                    except re.error:
                        # invalid user pattern: fall back to a match-anything
                        # empty pattern instead of aborting the search
                        self.regex[i] = re.compile('')
            self.match_substring = self.match_regex
        self.prepare(dbase)
    # reference count of prepare requests; released by a matching "done" call
    self.nrprepare += 1
def filedump(self, ext='tmp', lazy=True):
    """Dumps parsed configurations into files.

    :param str ext: The file extension to use for the dumped files. If
        empty, this overrides the existing conf files.
    :param bool lazy: Only write files that have been modified
    """
    # Best-effort atomicity is enforced above us by reverter.py
    for path, tree in self.parsed.items():
        target = path + os.path.extsep + ext if ext else path
        try:
            if lazy and not tree.is_dirty():
                continue
            out = nginxparser.dumps(tree)
            logger.debug('Writing nginx conf tree to %s:\n%s', target, out)
            with open(target, 'w') as _file:
                _file.write(out)
        except IOError:
            logger.error("Could not open file for writing: %s", target)
def _add_directives(block, directives, replace):
    """Adds or replaces directives in a config block.

    When replace=False, it's an error to try and add a directive that
    already exists in the config block with a conflicting value.

    When replace=True and a directive with the same name already exists in
    the config block, the first instance will be replaced. Otherwise, the
    directive will be added to the config block.

    ..todo :: Find directives that are in included files.

    :param list block: The block to replace in
    :param list directives: The new directives.
    """
    for entry in directives:
        _add_directive(block, entry, replace)
    # make sure the block ends with a newline token
    if block and '\n' not in block[-1]:  # could be "   \n  " or ["\n"] !
        block.append(nginxparser.UnspacedList('\n'))
def get_tokens(self):
    """Fetch the ``tokens`` list from ``self.ldict`` and validate its type.

    On failure, logs an error and sets ``self.error``; on success, stores
    the list on ``self.tokens``.
    """
    tokens = self.ldict.get('tokens', None)
    if not tokens:
        # covers both a missing name and an empty sequence
        self.log.error('No token list is defined')
        self.error = True
        return
    if not isinstance(tokens, (list, tuple)):
        self.log.error('tokens must be a list or tuple')
        self.error = True
        return
    # NOTE: the original re-checked `if not tokens` ("tokens is empty")
    # here, but that branch was unreachable — an empty sequence is already
    # caught by the first check above.  The dead code has been removed.
    self.tokens = tokens
def match_file(self, filename):
    """Return True if *filename* can be handled by this linter.

    Often this is just a file-extension check: the configured pattern is
    a whitespace-separated list of shell globs; if none match, the whole
    pattern is retried as a case-insensitive regular expression.
    """
    pattern = self.options.get('pattern') or self.default_pattern
    if not pattern:
        # no pattern configured: accept every file
        return True
    # try each whitespace-separated token as a shell glob first
    if any(fnmatch.fnmatch(filename, glob) for glob in pattern.split()):
        return True
    # fall back to treating the whole pattern as a regex; an invalid
    # regex simply means "no match"
    try:
        return re.match(pattern, filename, re.I) is not None
    except re.error:
        return False
def validate_args(self, args: configargparse.Namespace) -> None:
    """Validate regex-parser arguments: the regex must compile and must
    contain the named groups <major>, <minor> and <patch>."""
    if args.version_parser != self.versionparser_name:
        return
    if not args.version_regex:
        raise ErrorMessage("%s requires the parameter %s" %
                           (highlight("--version-parser=%s" % self.versionparser_name),
                            highlight("--version-regex")))
    try:
        re.compile(args.version_regex)
    except re.error as e:
        raise ErrorMessage("The regular expression passed to %s (%s) is invalid: %s." %
                           (highlight("--version-regex"), highlight(args.version_regex), str(e)))

    def check_for(string: str) -> None:
        # each version component must be captured by a named group
        if string not in args.version_regex:
            raise ErrorMessage("The regular expression specified in %s must contain a named group %s." %
                               (highlight("--version-regex"), highlight(string)))

    for g in ["<major>", "<minor>", "<patch>"]:
        check_for(g)
def _check_locale(self):
    '''
    Uses the locale module to test the currently set locale
    (per the LANG and LC_CTYPE environment settings)
    '''
    try:
        # setting the locale to '' uses the default locale
        # as it would be returned by locale.getdefaultlocale()
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # fallback to the 'C' locale, which may cause unicode
        # issues but is preferable to simply failing because
        # of an unknown locale
        locale.setlocale(locale.LC_ALL, 'C')
        os.environ['LANG'] = 'C'
        os.environ['LC_ALL'] = 'C'
        os.environ['LC_MESSAGES'] = 'C'
    except Exception:
        # anything other than locale.Error is unexpected — abort the module
        e = get_exception()
        self.fail_json(msg="An unknown error was encountered while attempting to validate the locale: %s" % e)
def _set_cwd(self): try: cwd = os.getcwd() if not os.access(cwd, os.F_OK|os.R_OK): raise return cwd except: # we don't have access to the cwd, probably because of sudo. # Try and move to a neutral location to prevent errors for cwd in [os.path.expandvars('$HOME'), tempfile.gettempdir()]: try: if os.access(cwd, os.F_OK|os.R_OK): os.chdir(cwd) return cwd except: pass # we won't error here, as it may *not* be a problem, # and we don't want to break modules unnecessarily return None
def handlePrefProviders(self):
    """Parse PREFERRED_PROVIDERS from each multiconfig's datastore and
    record the providee -> provider mapping, warning on conflicts."""
    for mc in self.multiconfigs:
        localdata = data.createCopy(self.databuilder.mcdata[mc])
        bb.data.update_data(localdata)
        bb.data.expandKeys(localdata)

        # Handle PREFERRED_PROVIDERS
        for p in (localdata.getVar('PREFERRED_PROVIDERS', True) or "").split():
            try:
                # each entry is expected to look like "providee:provider"
                (providee, provider) = p.split(':')
            except:
                providerlog.critical("Malformed option in PREFERRED_PROVIDERS variable: %s" % p)
                continue
            if providee in self.recipecaches[mc].preferred and self.recipecaches[mc].preferred[providee] != provider:
                providerlog.error("conflicting preferences for %s: both %s and %s specified", providee, provider, self.recipecaches[mc].preferred[providee])
            # last setting wins, even on conflict
            self.recipecaches[mc].preferred[providee] = provider
def matchFile(self, buildfile):
    """
    Find the .bb file which matches the expression in 'buildfile'.
    Raise an error if multiple files
    """
    matches = self.matchFiles(buildfile)
    if len(matches) != 1:
        if matches:
            # FIX: removed a redundant nested `if matches:` check — this
            # branch is only reached when matches is non-empty.
            msg = "Unable to match '%s' to a specific recipe file - %s matches found:" % (buildfile, len(matches))
            for f in matches:
                msg += "\n %s" % f
            parselog.error(msg)
        else:
            parselog.error("Unable to find any recipe file matching '%s'" % buildfile)
        raise NoSpecificMatch
    return matches[0]
def _match_re(self):
    """Lazily compile the parse expression, anchored at both ends, with
    IGNORECASE|DOTALL; cache the compiled pattern on the instance."""
    if self.__match_re is None:
        expression = '^%s$' % self._expression
        try:
            self.__match_re = re.compile(expression, re.IGNORECASE | re.DOTALL)
        except AssertionError:
            # access error through sys to keep py3k and backward compat
            e = str(sys.exc_info()[1])
            if e.endswith('this version only supports 100 named groups'):
                raise TooManyFields('sorry, you are attempting to parse '
                                    'too many complex fields')
            # FIX: previously an unrecognized AssertionError was silently
            # swallowed, leaving the cache as None; re-raise it instead.
            raise
        except re.error:
            raise NotImplementedError("Group names (e.g. (?P<name>) can "
                                      "cause failure, as they are not escaped properly: '%s'" % expression)
    return self.__match_re
def getprog(self):
    """Compile and return the current search pattern, or None on error.

    Reports an empty or invalid pattern through ``self.report_error``.
    """
    pat = self.getpat()
    if not pat:
        self.report_error(pat, "Empty regular expression")
        return None
    pat = self.getcookedpat()
    flags = 0
    if not self.iscase():
        flags = flags | re.IGNORECASE
    try:
        prog = re.compile(pat, flags)
    except re.error as what:
        # FIX: the original tried `msg, col = what` (unpacking the exception,
        # a Python 2 idiom) and fell through a bare `except:` on Python 3.
        # re.error exposes .msg and .colno on modern Pythons; fall back to
        # str() / -1 when those attributes are missing or None.
        msg = getattr(what, 'msg', None) or str(what)
        col = getattr(what, 'colno', None)
        if col is None:
            col = -1
        self.report_error(pat, msg, col)
        return None
    return prog
def remove_emoji(desstr, restr=''):
    """Replace emoji code points in *desstr* with U+25FD (small square).

    NOTE: the *restr* parameter is currently unused; it is kept for
    interface compatibility with existing callers.
    """
    try:
        highpoints = re.compile(u'([\U00002600-\U000027BF])|([\U0001f300-\U0001f64F])|([\U0001f680-\U0001f6FF])')
        # FIX: py2 `print` statements were Python 3 syntax errors
        print('remove emoji now.')
    except re.error:
        print('failed to remove emoji.')
        # narrow (UCS-2) Python builds reject \U ranges; fall back to
        # matching the equivalent UTF-16 surrogate pairs
        highpoints = re.compile(u'([\u2600-\u27BF])|([\uD83C][\uDF00-\uDFFF])|([\uD83D][\uDC00-\uDE4F])|([\uD83D][\uDE80-\uDEFF])')
    # mytext = u'<some string containing 4-byte chars>'
    desstr = highpoints.sub(u'\u25FD', desstr)
    return desstr
def remove_define_emoji(desstr, restr=''):
    """Replace the specific emoji U+1F60C in *desstr* with *restr*.

    Returns None (after printing the error) if the pattern fails to
    compile, preserving the original best-effort behaviour.
    """
    try:
        co = re.compile(u'😌')
        # FIX: py2 `print` statements and `except Exception, e` were
        # Python 3 syntax errors; modernized to py2/py3-compatible forms.
        print('remove define emoji only.')
    except Exception as e:
        print('error ====')
        print(e)
        return
    return co.sub(restr, desstr)
def __init__(self, message, parent=None):
    """
    Args:
        message (str): Error message detailing validation failure.
        parent (str): Adds the parent as the closest reference point for
            the error. Use :meth:`add_parent` to add more.
    """
    super(ValidationError, self).__init__(message)
    self.message = message
    # seed the parent chain with the closest reference point, if given
    self._parents = [parent] if parent else []
def __str__(self):
    """
    Returns:
        str: A descriptive message of the validation error that may also
        include the path to the validator that failed.
    """
    if not self._parents:
        return self.message
    # parents were appended innermost-first; reverse to get root-to-leaf
    path = '.'.join(reversed(self._parents))
    return '{}: {}'.format(path, self.message)
def __repr__(self):
    # Not a perfect repr, but includes the error location information.
    text = six.text_type(self)
    return 'ValidationError({!r})'.format(text)
def __init__(self, min_length=None, max_length=None, pattern=None):
    """Store string-constraint parameters after sanity-checking them.

    *min_length*/*max_length* must be integral and sensible; *pattern*,
    if given, is compiled anchored at both ends into ``pattern_re``.
    """
    if min_length is not None:
        assert isinstance(min_length, numbers.Integral), \
            'min_length must be an integral number'
        assert min_length >= 0, 'min_length must be >= 0'
    if max_length is not None:
        assert isinstance(max_length, numbers.Integral), \
            'max_length must be an integral number'
        assert max_length > 0, 'max_length must be > 0'
    if min_length and max_length:
        assert max_length >= min_length, 'max_length must be >= min_length'
    if pattern is not None:
        assert isinstance(pattern, six.string_types), \
            'pattern must be a string'
    self.min_length = min_length
    self.max_length = max_length
    self.pattern = pattern
    self.pattern_re = None
    if pattern:
        # anchor the pattern so it must match the entire string
        anchored = r"\A(?:" + pattern + r")\Z"
        try:
            self.pattern_re = re.compile(anchored)
        except re.error as e:
            raise AssertionError('Regex {!r} failed: {}'.format(
                pattern, e.args[0]))
def try_compile(self):
    """Compile this :class:`Regex` as a Python regular expression.

    .. warning::
       Python regular expressions use a different syntax and different
       set of flags than MongoDB, which uses `PCRE`_. A regular
       expression retrieved from the server may not compile in
       Python, or may match a different set of strings in Python than
       when used in a MongoDB query. :meth:`try_compile()` may raise
       :exc:`re.error`.

    .. _PCRE: http://www.pcre.org/
    """
    compiled = re.compile(self.pattern, self.flags)
    return compiled