我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 `__builtin__.open()`。
def patch_open_read(files):
    """
    Create a patch that swaps open() for a mock serving canned read() data.

    The keys of ``files`` are normalised with ``os.path.abspath``. The path
    handed to the patched open() is looked up unchanged, and an assertion
    fails when it is not one of the mocked paths.

    :type files: dict
    :return: the result of ``mock.patch`` for the resolved open() reference
    """
    mocked_files = {os.path.abspath(name): data for name, data in files.items()}

    def mock_open_wrapper(path, *args, **kwargs):
        __tracebackhide__ = True  # pylint: disable=unused-variable

        message = 'try to open a non-mocked path\n desired={desired!r}\n mocked={mocked!r}'.format(
            desired=path, mocked=mocked_files.keys())
        assert path in mocked_files, message

        # A fresh mock per call, pre-loaded with the content for this path.
        return mock.mock_open(read_data=mocked_files[path])(path, *args, **kwargs)

    return mock.patch(__get_open_ref(), mock_open_wrapper)
def __init__(self, stream, errors='strict'):
    """ Set up a StreamWriter on top of `stream`.

        `stream` has to be a file-like object opened for writing
        (binary) data.

        `errors` selects the error handling scheme. The predefined
        values are:

         'strict' - raise a ValueError (or a subclass)
         'ignore' - skip the offending character and carry on
         'replace' - substitute a suitable replacement character
         'xmlcharrefreplace' - substitute the matching XML character
                               reference
         'backslashreplace' - substitute backslashed escape sequences
                              (encoding only)

        Further values can be made available through register_error.
    """
    self.stream, self.errors = stream, errors
def __init__(self, stream, errors='strict'):
    """ Set up a StreamReader on top of `stream`.

        `stream` has to be a file-like object opened for reading
        (binary) data.

        `errors` selects the error handling scheme. The predefined
        values are:

         'strict' - raise a ValueError (or a subclass)
         'ignore' - skip the offending character and carry on
         'replace' - substitute a suitable replacement character

        Further values can be made available through register_error.
    """
    self.stream, self.errors = stream, errors
    self.linebuffer = None
    self.bytebuffer = ""
    # Stays a str for str->str decoding; for str->unicode decoding the
    # first read promotes it to unicode.
    self.charbuffer = ""
def __get_open_ref():
    """
    Return the dotted name of the builtin open(), caching it in the
    module-level ``__OPEN_REF``.

    :rtype str
    """
    # pylint: disable=import-error,redefined-builtin,unused-import,unused-variable
    global __OPEN_REF
    if not __OPEN_REF:
        # The imports are only probes: whichever module is importable
        # decides which dotted name gets cached.
        try:
            from builtins import open
        except ImportError:
            from __builtin__ import open
            __OPEN_REF = '__builtin__.open'
        else:
            __OPEN_REF = 'builtins.open'
    return __OPEN_REF
def origin(self):
    """
    Locate where this constituent is defined in its suite file.

    Returns a 2-tuple (file, line), or None when the suite file does not
    exist or no quoted occurrence of ``self.name`` immediately followed by
    a ':' token is found in it.
    """
    suitepy = self.suite.suite_py()
    if not exists(suitepy):
        return None

    import tokenize
    quoted_names = ('"' + self.name + '"', "'" + self.name + "'")
    with open(suitepy) as fp:
        pending = None
        for token in tokenize.generate_tokens(fp.readline):
            text = token[1]
            row = token[2][0]
            if pending is None:
                # Looking for the name written as a string literal.
                if text in quoted_names:
                    pending = row
            elif text == ':':
                # The literal was directly followed by ':'.
                return (suitepy, row)
            else:
                pending = None
    return None
def needsBuild(self, newestInput):
    """
    Decide whether the subject must be rebuilt.

    The command line from ``self._build_run_args()`` is run with ``-q``
    appended; a non-zero exit code is reported as "rebuild needed".
    ``newestInput`` is not consulted.

    Returns a 2-tuple (needs_build, reason).
    """
    logv('Checking whether to build {} with GNU Make'.format(self.subject.name))
    cmdline, cwd, env = self._build_run_args()
    cmdline += ['-q']

    if not _opts.verbose:
        # Suppress out/err by redirecting both to the null device.
        with open(os.devnull, 'w') as devnull:
            status = run(cmdline, cwd=cwd, env=env, nonZeroIsFatal=False, out=devnull, err=devnull)
    else:
        # Verbose: leave the default out/err streams in place.
        status = run(cmdline, cwd=cwd, env=env, nonZeroIsFatal=False)

    if status == 0:
        return (False, "up to date according to GNU Make")
    return (True, "rebuild needed by GNU Make")
def _load_env_file(e, env=None):
    """
    Read "key=value" lines from the file ``e``, if it exists.

    Blank lines and lines whose first non-blank character is '#' are
    skipped. Values pass through ``expandvars_in_property``. Each pair is
    stored in ``env`` when one is given, otherwise in ``os.environ``. A
    line without '=' is reported through ``abort``.
    """
    if not exists(e):
        return

    with open(e) as envFile:
        for lineNum, rawLine in enumerate(envFile, 1):
            line = rawLine.strip()
            if not line or line[0] == '#':
                continue
            if '=' not in line:
                abort(e + ':' + str(lineNum) + ': line does not match pattern "key=value"')
            name, _, rest = line.partition('=')
            key = name.strip()
            value = expandvars_in_property(rest.strip())
            if env is not None:
                env[key] = value
                logv('Read variable %s=%s from %s' % (key, value, e))
            else:
                os.environ[key] = value
                logv('Setting environment variable %s=%s from %s' % (key, value, e))
def _scheck_imports(importing_suite, imported_suite, suite_import, bookmark_imports, ignore_uncommitted):
    """
    Compare the version recorded in ``suite_import`` with the version
    reported by ``imported_suite`` and offer to update the importing
    suite's suite file when they differ.

    Aborts when ``imported_suite`` is dirty, unless ``ignore_uncommitted``
    is set. When the file is updated and ``bookmark_imports`` is true,
    ``_sbookmark_visitor`` is called for the import.
    """
    importedVersion = imported_suite.version()
    if imported_suite.isDirty() and not ignore_uncommitted:
        msg = 'uncommitted changes in {}, please commit them and re-run scheckimports'.format(imported_suite.name)
        # For hg-backed source suites, add the exact commit command as a hint.
        if isinstance(imported_suite, SourceSuite) and imported_suite.vc and imported_suite.vc.kind == 'hg':
            msg = '{}\nIf the only uncommitted change is an updated imported suite version, then you can run:\n\nhg -R {} commit -m "updated imported suite version"'.format(msg, imported_suite.vc_dir)
        abort(msg)
    # An import without a recorded version (None) is never reported as a mismatch.
    if importedVersion != suite_import.version and suite_import.version is not None:
        print 'imported version of {} in {} ({}) does not match parent ({})'.format(imported_suite.name, importing_suite.name, suite_import.version, importedVersion)
        if exists(importing_suite.suite_py()) and ask_yes_no('Update ' + importing_suite.suite_py()):
            with open(importing_suite.suite_py()) as fp:
                contents = fp.read()
            # The update is a plain textual substitution, so it is only done
            # when the old version string occurs exactly once in the file.
            if contents.count(str(suite_import.version)) == 1:
                newContents = contents.replace(suite_import.version, str(importedVersion))
                if not update_file(importing_suite.suite_py(), newContents, showDiff=True):
                    abort("Updating {} failed: update didn't change anything".format(importing_suite.suite_py()))
                # Keep the in-memory import in step with the rewritten file.
                suite_import.version = importedVersion
                if bookmark_imports:
                    _sbookmark_visitor(importing_suite, suite_import)
            else:
                print 'Could not update as the substring {} does not appear exactly once in {}'.format(suite_import.version, importing_suite.suite_py())
def _copy_eclipse_settings(p, files=None):
    """
    Generate the Eclipse ``.settings`` files for project ``p``.

    For every entry of ``p.eclipse_settings_sources()`` the listed source
    files are concatenated under a "GENERATED" header, ``${javaCompliance}``
    is substituted when an Eclipse compliance level is available, annotation
    processing is switched to enabled when the project has annotation
    processors, and the result is written with ``update_file``.

    :param p: the project whose settings are generated
    :param files: optional list that receives the path of every settings
        file handled; an empty list is filled as well
    """
    eclipseJavaCompliance = _convert_to_eclipse_supported_compliance(p.javaCompliance)
    processors = p.annotation_processors()

    settingsDir = join(p.dir, ".settings")
    ensure_dir_exists(settingsDir)

    for name, sources in p.eclipse_settings_sources().iteritems():
        out = StringIO.StringIO()
        print >> out, '# GENERATED -- DO NOT EDIT'
        for source in sources:
            print >> out, '# Source:', source
            with open(source) as f:
                print >> out, f.read()
        content = out.getvalue()
        if eclipseJavaCompliance:
            content = content.replace('${javaCompliance}', str(eclipseJavaCompliance))
        if processors:
            content = content.replace('org.eclipse.jdt.core.compiler.processAnnotations=disabled', 'org.eclipse.jdt.core.compiler.processAnnotations=enabled')
        settingsFile = join(settingsDir, name)
        update_file(settingsFile, content)
        # Test against None, not truthiness: a caller passing an empty list
        # still expects it to be filled.
        if files is not None:
            files.append(settingsFile)
def __init__(self, name, mode):
    """Open `name` with os.open() and keep the descriptor in self.fd.

       `mode` must be "r" (read only) or "w" (write only, create,
       truncate); any other value raises KeyError. O_BINARY is added
       on platforms whose os module defines it.
    """
    flags_by_mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }
    flags = flags_by_mode[mode] | getattr(os, "O_BINARY", 0)
    self.fd = os.open(name, flags, 0o666)
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.
       Appending is not allowed.

       Raises ValueError for any mode other than "r" or "w",
       CompressionError when the bz2 module is missing, and ReadError
       when the data cannot be read as a bzip2 file.
    """
    # An explicit membership test is needed: the substring test
    # `mode not in "rw"` lets the empty string through.
    if mode not in ("r", "w"):
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except (IOError, EOFError):
        fileobj.close()
        raise ReadError("not a bzip2 file")
    t._extfileobj = False
    return t

# All *open() methods are registered here.
def _check(self, mode=None):
    """Raise IOError if the object is closed, or if `mode` is given and
       self.mode is not contained in it.
    """
    if self.closed:
        owner = self.__class__.__name__
        raise IOError("%s is closed" % owner)
    if mode is None or self.mode in mode:
        return
    raise IOError("bad operation for mode %r" % self.mode)
def is_tarfile(name):
    """Tell whether `name` can be opened by open() without a TarError.

       Returns True when opening and closing succeed, False when either
       raises TarError.
    """
    try:
        open(name).close()
    except TarError:
        return False
    return True
def open(filename, mode="rb", compresslevel=9):
    """Convenience wrapper that builds a GzipFile.

    `filename` is mandatory; `mode` falls back to 'rb' and
    `compresslevel` to 9 when they are not supplied.
    """
    return GzipFile(filename=filename, mode=mode, compresslevel=compresslevel)
def _test():
    """Command-line driver: act like gzip; with -d, act like gunzip.

    The input file is not deleted, however, nor are any other gzip
    options or features supported. "-" (the default when no file is
    named) stands for stdin/stdout.
    """
    args = sys.argv[1:]
    decompress = args and args[0] == "-d"
    if decompress:
        args = args[1:]
    if not args:
        args = ["-"]
    for arg in args:
        if decompress:
            if arg == "-":
                f = GzipFile(filename="", mode="rb", fileobj=sys.stdin)
                g = sys.stdout
            else:
                if arg[-3:] != ".gz":
                    # Single-argument form prints the same text on
                    # Python 2 and Python 3.
                    print("filename doesn't end in .gz: " + repr(arg))
                    continue
                f = open(arg, "rb")
                g = __builtin__.open(arg[:-3], "wb")
        else:
            if arg == "-":
                f = sys.stdin
                g = GzipFile(filename="", mode="wb", fileobj=sys.stdout)
            else:
                f = __builtin__.open(arg, "rb")
                g = open(arg + ".gz", "wb")
        # Close both ends even when reading or writing fails, so a broken
        # input does not leak file objects. The standard streams are never
        # closed.
        try:
            while True:
                chunk = f.read(1024)
                if not chunk:
                    break
                g.write(chunk)
        finally:
            try:
                if g is not sys.stdout:
                    g.close()
            finally:
                if f is not sys.stdin:
                    f.close()
def open(file, flag=None, mode=0o666):
    """Open the database file, filename, and return corresponding object.

    The flag argument, used to control how the database is opened in the
    other DBM implementations, is ignored in the dumbdbm module; the
    database is always opened for update, and will be created if it does
    not exist.

    The optional mode argument is the UNIX mode of the file, used only when
    the database has to be created.  It defaults to octal code 0666 (and
    will be modified by the prevailing umask).

    """
    # flag argument is currently ignored

    # Modify mode depending on the umask. The umask can only be read by
    # setting it, so it is set to 0 and restored straight away.
    try:
        um = _os.umask(0)
        _os.umask(um)
    except AttributeError:
        # No umask on this platform: use the mode unchanged.
        pass
    else:
        # Turn off any bits that are set in the umask
        mode = mode & (~um)

    return _Database(file, mode)
def __init__(self, f):
    """Initialise the reader from a file name or an open file object.

    A string is opened in 'rb' mode and remembered in
    self._i_opened_the_file; anything else is assumed to be an open file
    object already. If initfp() fails, a file opened here is closed before
    the exception is re-raised.
    """
    # Set first, so the attribute exists even when opening fails.
    self._i_opened_the_file = None
    given_a_path = isinstance(f, basestring)
    if given_a_path:
        f = __builtin__.open(f, 'rb')
        self._i_opened_the_file = f
    try:
        self.initfp(f)
    except:
        # Only clean up what this constructor opened itself.
        if self._i_opened_the_file:
            f.close()
        raise
def __init__(self, f):
    """Initialise the writer from a file name or an open file object.

    A string is opened in 'wb' mode and remembered in
    self._i_opened_the_file; anything else is handed to initfp() as is.
    If initfp() fails, a file opened here is closed before the exception
    is re-raised.
    """
    # Set first, so the attribute exists even when opening fails.
    self._i_opened_the_file = None
    given_a_path = isinstance(f, basestring)
    if given_a_path:
        f = __builtin__.open(f, 'wb')
        self._i_opened_the_file = f
    try:
        self.initfp(f)
    except:
        # Only clean up what this constructor opened itself.
        if self._i_opened_the_file:
            f.close()
        raise
def __init__(self, f):
    """Initialise the reader from a file name or an open file object.

    Any string (str or unicode, including subclasses) is treated as a
    path and opened in 'rb' mode; anything else is assumed to be an open
    file object already. If initfp() raises, a file opened here is closed
    before the exception propagates; a caller-supplied file object is
    left untouched.
    """
    opened_here = isinstance(f, basestring)
    if opened_here:
        import __builtin__
        f = __builtin__.open(f, 'rb')
    try:
        self.initfp(f)
    except:
        # Bare except is deliberate: clean up on any failure, then
        # re-raise the original exception unchanged.
        if opened_here:
            f.close()
        raise
def __init__(self, f):
    """Initialise the writer from a file name or an open file object.

    Any string (str or unicode, including subclasses) is treated as a
    path and opened in 'wb' mode; anything else is handed to initfp() as
    is. If initfp() raises, a file opened here is closed before the
    exception propagates; a caller-supplied file object is left untouched.
    """
    opened_here = isinstance(f, basestring)
    if opened_here:
        import __builtin__
        f = __builtin__.open(f, 'wb')
    try:
        self.initfp(f)
    except:
        # Bare except is deliberate: clean up on any failure, then
        # re-raise the original exception unchanged.
        if opened_here:
            f.close()
        raise
def open(f, mode=None):
    """Return an Au_read or Au_write object for `f`.

    When `mode` is None it is taken from ``f.mode`` if `f` has such an
    attribute, and defaults to 'rb' otherwise. 'r'/'rb' select Au_read,
    'w'/'wb' select Au_write; any other mode raises Error.
    """
    if mode is None:
        if hasattr(f, 'mode'):
            mode = f.mode
        else:
            mode = 'rb'
    if mode in ('r', 'rb'):
        return Au_read(f)
    if mode in ('w', 'wb'):
        return Au_write(f)
    # Call syntax: the "raise Error, msg" form is a SyntaxError on Python 3.
    raise Error("mode must be 'r', 'rb', 'w', or 'wb'")
def open(self, name, mode='r', bufsize=-1):
    """Open `name` with the builtin open() and pass the resulting file
    object to self.fileopen(), returning whatever that returns."""
    import __builtin__
    builtin_file = __builtin__.open(name, mode, bufsize)
    return self.fileopen(builtin_file)
def open(name, mode='r', bufsize=-1):
    """Public entry point: create a fresh _posixfile_ and open `name`
    through it, returning the result of its open() method."""
    posix_file = _posixfile_()
    return posix_file.open(name, mode, bufsize)
def __init__(self, f):
    """Initialise the reader from a file name or an open file object.

    Any string (str or unicode, including subclasses) is treated as a
    path and opened in 'rb' mode; anything else is assumed to be an open
    file object already. If initfp() raises, a file opened here is closed
    before the exception propagates; a caller-supplied file object is
    left untouched.
    """
    opened_here = isinstance(f, basestring)
    if opened_here:
        f = __builtin__.open(f, 'rb')
    try:
        self.initfp(f)
    except:
        # Bare except is deliberate: clean up on any failure, then
        # re-raise the original exception unchanged.
        if opened_here:
            f.close()
        raise

#
# User visible methods.
#
def __init__(self, f):
    """Initialise the writer from a file name or an open file object.

    Any string (str or unicode, including subclasses) is treated as a
    path and opened in 'wb' mode; anything else is assumed to be an open
    file object already, and its name is recorded as '???'.

    self._aifc is set to 0 when the file name ends in '.aiff' and to 1
    otherwise. If initfp() raises, a file opened here is closed before
    the exception propagates; a caller-supplied file object is left
    untouched.
    """
    opened_here = isinstance(f, basestring)
    if opened_here:
        filename = f
        f = __builtin__.open(f, 'wb')
    else:
        # else, assume it is an open file object already
        filename = '???'
    try:
        self.initfp(f)
    except:
        # Bare except is deliberate: clean up on any failure, then
        # re-raise the original exception unchanged.
        if opened_here:
            f.close()
        raise
    if filename[-5:] == '.aiff':
        self._aifc = 0
    else:
        self._aifc = 1