我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.TemporaryFile()。
def speak(text, lang='es'):
    """Translate *text* into *lang* and play it as speech.

    Uses googletrans for translation, gTTS for synthesis and pygame's
    mixer for playback; audio is buffered in an anonymous temporary
    file so nothing is left on disk.

    The original wrapped everything in ``try/except Exception: raise``,
    which is a no-op (it re-raises unchanged), so it was removed.
    """
    from googletrans import Translator
    from gtts import gTTS
    from pygame import mixer
    from tempfile import TemporaryFile

    translator = Translator()
    tts = gTTS(text=translator.translate(text, dest=lang).text, lang=lang)
    mixer.init()
    sf = TemporaryFile()
    tts.write_to_fp(sf)
    sf.seek(0)  # rewind so the mixer reads from the start
    mixer.music.load(sf)
    mixer.music.play()
def test_exports(self):
    """The tempfile module exposes no surprising public symbols.

    Fix: the original bound ``dict = tempfile.__dict__``, shadowing the
    ``dict`` builtin; the binding was unnecessary and has been removed.
    """
    expected = {
        "NamedTemporaryFile" : 1,
        "TemporaryFile" : 1,
        "mkstemp" : 1,
        "mkdtemp" : 1,
        "mktemp" : 1,
        "TMP_MAX" : 1,
        "gettempprefix" : 1,
        "gettempdir" : 1,
        "tempdir" : 1,
        "template" : 1,
        "SpooledTemporaryFile" : 1,
        "TemporaryDirectory" : 1,
    }
    unexp = [key for key in tempfile.__dict__
             if key[0] != '_' and key not in expected]
    self.assertTrue(len(unexp) == 0, "unexpected keys: %s" % unexp)
def _body(self):
    """Read the request body, spooling to a temporary file once it
    grows beyond ``MEMFILE_MAX`` bytes; the rewound stream replaces
    ``wsgi.input`` and is returned."""
    try:
        read_func = self.environ['wsgi.input'].read
    except KeyError:
        self.environ['wsgi.input'] = BytesIO()
        return self.environ['wsgi.input']
    chunk_iter = self._iter_chunked if self.chunked else self._iter_body
    body = BytesIO()
    body_size = 0
    spooled = False
    for chunk in chunk_iter(read_func, self.MEMFILE_MAX):
        body.write(chunk)
        body_size += len(chunk)
        if not spooled and body_size > self.MEMFILE_MAX:
            # Body exceeded the in-memory limit: move it to disk.
            mem_buffer = body
            body = TemporaryFile(mode='w+b')
            body.write(mem_buffer.getvalue())
            del mem_buffer
            spooled = True
    self.environ['wsgi.input'] = body
    body.seek(0)
    return body
def run_coala_with_specific_file(working_dir, file):
    """Run coala in a specified directory.

    Returns the decoded JSON output when coala reports results
    (exit code 1), otherwise ``None``.
    """
    command = ["coala", "--json", "--find-config", "--files", file]
    output_str = None
    # Context manager guarantees the temp file is closed even if
    # Popen/wait raises; the original leaked it in that case.
    with tempfile.TemporaryFile() as stdout_file:
        process = subprocess.Popen(command, stdout=stdout_file, cwd=working_dir)
        retval = process.wait()
        if retval == 1:
            stdout_file.seek(0)
            output_str = stdout_file.read().decode("utf-8", "ignore")
            if output_str:
                log("Output =", output_str)
            else:
                log("No results for the file")
        elif retval == 0:
            log("No issues found")
        else:
            log("Exited with:", retval)
    return output_str
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES
    """
    if hasattr(bytes, "read"):
        # File-like source: spool the compressed stream to a temp file
        # and hand back a file-backed string wrapper.
        spool = TemporaryFile()
        gz = gzip.GzipFile(fileobj=spool, mode='w')
        for chunk in bytes:
            gz.write(chunk)
        gz.close()
        spool.seek(0)
        from pyLibrary.env.big_data import FileString, safe_size
        return FileString(spool)
    # Plain bytes: compress in memory and return the raw result.
    sink = BytesIO()
    gz = gzip.GzipFile(fileobj=sink, mode='w')
    gz.write(bytes)
    gz.close()
    return sink.getvalue()
def __init__(self, stream, length, _shared=None):
    """
    :param stream: THE STREAM WE WILL GET THE BYTES FROM
    :param length: THE MAX NUMBER OF BYTES WE ARE EXPECTING
    :param _shared: FOR INTERNAL USE TO SHARE THE BUFFER
    :return:
    """
    self.position = 0
    if not _shared:
        # Only allocate the backing file when this instance owns the
        # buffer; the original created a TemporaryFile unconditionally
        # and leaked its descriptor whenever _shared was passed in.
        file_ = TemporaryFile()
        self.shared = Data(
            length=length,
            locker=Lock(),
            stream=stream,
            done_read=0,
            file=file_,
            # mmap over the temp file backs the shared read buffer
            buffer=mmap(file_.fileno(), length)
        )
    else:
        self.shared = _shared
    self.shared.ref_count += 1
def assert_exception_writes_error_message(self, exception, message):
    """Verify that cli.run exits with status 1 and ends its stderr
    output with *message* when ``htsget.get`` raises *exception*."""
    parser = cli.get_htsget_parser()
    args = parser.parse_args(["https://some.url"])
    old_stderr = sys.stderr
    try:
        with tempfile.TemporaryFile("w+") as fake_stderr:
            sys.stderr = fake_stderr
            with mock.patch("htsget.get") as mocked_get, \
                    mock.patch("sys.exit") as mocked_exit, \
                    mock.patch("logging.basicConfig"):
                mocked_get.side_effect = exception
                cli.run(args)
                fake_stderr.seek(0)
                captured = fake_stderr.read().strip()
                mocked_exit.assert_called_once_with(1)
    finally:
        # Always restore the real stderr, even on failure.
        sys.stderr = old_stderr
    self.assertTrue(captured.endswith(message))
def capture(self, data, term_instance=None):
    """
    Stores *data* as a temporary file and returns that file's object.
    *term_instance* can be used by overrides of this function to make
    adjustments to the terminal emulator after the *data* is captured
    e.g. to make room for an image.
    """
    # Normalize the CRLF pairs the terminal inserts into plain newlines.
    cleaned = data.replace(b'\r\n', b'\n')
    logging.debug("capture() len(data): %s" % len(cleaned))
    # Persist to an anonymous temp file; intentionally left open so the
    # caller can read it back.
    self.file_obj = tempfile.TemporaryFile()
    self.file_obj.write(cleaned)
    self.file_obj.flush()
    return self.file_obj
def setUp(self):
    """Load API defaults, run one custom-search query, and create
    throwaway temp-file / temp-dir path strings for the tests."""
    file_path = resource_filename(Requirement.parse('search_google'),
                                  'search_google/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    buildargs = {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey'],
    }
    cseargs = {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx'],
    }
    self.results = search_google.api.results(buildargs, cseargs)
    # Renamed from `tempfile`: the original shadowed the stdlib module.
    tmp_file = TemporaryFile()
    self.tempfile = str(tmp_file.name)
    tmp_file.close()
    # NOTE(review): the TemporaryDirectory object is discarded at once,
    # so the directory is removed immediately; only the path string is
    # kept — presumably the tests want a path that does not yet exist.
    self.tempdir = str(TemporaryDirectory().name)
def __call__(self, test_case):
    # Build a fresh module from the stored constructor, run one forward
    # pass, and check it against the reference implementation and
    # against a torch.save/torch.load round trip of the module.
    module = self.constructor(*self.constructor_args)
    input = self._get_input()
    if self.reference_fn is not None:
        out = test_case._forward(module, input)
        if isinstance(out, Variable):
            # compare raw tensors, not Variable wrappers
            out = out.data
        ref_input = self._unpack_input(deepcopy(input))
        expected_out = self.reference_fn(ref_input, test_case._get_parameters(module)[0])
        test_case.assertEqual(out, expected_out)
    # TODO: do this with in-memory files as soon as torch.save will support it
    with TemporaryFile() as f:
        test_case._forward(module, input)
        torch.save(module, f)
        f.seek(0)
        module_copy = torch.load(f)
        # the reloaded module must produce identical outputs
        test_case.assertEqual(test_case._forward(module, input),
                              test_case._forward(module_copy, input))
    self._do_test(test_case, module, input)
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
    """ save targetfd descriptor, and open a new
        temporary file there.  If no tmpfile is
        specified a tempfile.TemporaryFile() will be opened
        in text mode.
    """
    self.targetfd = targetfd
    if tmpfile is None and targetfd != 0:
        # dupfile re-wraps the descriptor as a UTF-8 text-mode file;
        # the raw binary file can then be closed.
        f = tempfile.TemporaryFile('wb+')
        tmpfile = dupfile(f, encoding="UTF-8")
        f.close()
    self.tmpfile = tmpfile
    # keep a duplicate of the original fd so it can be restored later
    self._savefd = os.dup(self.targetfd)
    if patchsys:
        # remember the sys stream mapped to this fd — presumably
        # stdin/stdout/stderr via patchsysdict; confirmed elsewhere
        self._oldsys = getattr(sys, patchsysdict[targetfd])
    if now:
        self.start()
def writeorg(self, data):
    """ write a string to the original file descriptor
    """
    tempfp = tempfile.TemporaryFile()
    try:
        # dup2 makes the temp file's fd an alias of the saved original
        # descriptor, so writing through tempfp reaches the real target;
        # closing tempfp afterwards only drops the alias.
        os.dup2(self._savefd, tempfp.fileno())
        tempfp.write(data)
    finally:
        tempfp.close()
def __init__(self, msg, buffer=None, scheduler=None):
    """Produce this message.

    @param msg: The message I am to produce.
    @type msg: L{IMessage}

    @param buffer: A buffer to hold the message in.  If None, I will
        use a L{tempfile.TemporaryFile}.
    @type buffer: file-like
    """
    self.msg = msg
    self.buffer = tempfile.TemporaryFile() if buffer is None else buffer
    self.scheduler = iterateInReactor if scheduler is None else scheduler
    # expose the buffer's write method directly
    self.write = self.buffer.write
def pytest_configure(config):
    """With --pastebin=all, tee terminal-reporter output into a utf-8
    encoded binary temp file for later pasting."""
    import py
    if config.option.pastebin != "all":
        return
    tr = config.pluginmanager.getplugin('terminalreporter')
    # if no terminal reporter plugin is present, nothing we can do here;
    # this can happen when this function executes in a slave node
    # when using pytest-xdist, for example
    if tr is None:
        return
    # pastebin file will be utf-8 encoded binary file
    config._pastebinfile = tempfile.TemporaryFile('w+b')
    oldwrite = tr._tw.write

    def tee_write(s, **kwargs):
        oldwrite(s, **kwargs)
        if py.builtin._istext(s):
            s = s.encode('utf-8')
        config._pastebinfile.write(s)

    tr._tw.write = tee_write
def __init__(self, targetfd, tmpfile=None):
    # Capture helper for a single OS-level file descriptor.
    self.targetfd = targetfd
    try:
        self.targetfd_save = os.dup(self.targetfd)
    except OSError:
        # descriptor is invalid (e.g. already closed):
        # make start/done no-ops instead of failing later
        self.start = lambda: None
        self.done = lambda: None
    else:
        if targetfd == 0:
            # stdin: feed from /dev/null, never from a tmpfile
            assert not tmpfile, "cannot set tmpfile with stdin"
            tmpfile = open(os.devnull, "r")
            self.syscapture = SysCapture(targetfd)
        else:
            if tmpfile is None:
                f = TemporaryFile()
                with f:
                    # re-open as a text-safe duplicate; the raw binary
                    # file is closed by the with-block
                    tmpfile = safe_text_dupfile(f, mode="wb+")
            if targetfd in patchsysdict:
                self.syscapture = SysCapture(targetfd, tmpfile)
            else:
                self.syscapture = NoCapture()
        self.tmpfile = tmpfile
        self.tmpfile_fd = tmpfile.fileno()
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database
    using a COPY command.

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a
            valid table
    """
    db_objects = list(db_objects)
    # Guard: the original raised IndexError on an empty collection
    # (db_objects[0] below); an empty COPY is simply a no-op.
    if not db_objects:
        return
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
def get_qr_image(session: CashdeskSession) -> TemporaryFile:
    """Render the session's closing receipt line as a QR code and return
    it as an open temporary file containing the image."""
    # TODO: check qr code
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_H,
        box_size=10,
        border=4,
    )
    tz = timezone.get_current_timezone()
    end_local = session.end.astimezone(tz)
    # German decimal formatting: swap thousands/decimal separators
    total = '{0:,.2f}'.format(session.get_cash_transaction_total())
    total = total.translate(str.maketrans(',.', '.,'))
    data = '{end}\tEinnahme\t{total}\tKassensession\t#{pk}\t{supervisor}\t{user}'.format(
        end=end_local.strftime('%d.%m.%Y\t%H:%M:%S'),
        total=total,
        pk=session.pk,
        supervisor=session.backoffice_user_after.get_full_name(),
        user=session.user.get_full_name(),
    )
    qr.add_data(data)
    qr.make()
    outfile = TemporaryFile()
    qr.make_image().save(outfile)
    return outfile
def test_exec_command_stdout():
    # Regression test for gh-2999 and gh-2915.
    # There are several packages (nose, scipy.weave.inline, Sage inline
    # Fortran) that replace stdout, in which case it doesn't have a fileno
    # method. This is tested here, with a do-nothing command that fails if the
    # presence of fileno() is assumed in exec_command.

    # The code has a special case for posix systems, so if we are on posix test
    # both that the special case works and that the generic code works.

    # Test posix version:
    with redirect_stdout(StringIO()):
        with redirect_stderr(TemporaryFile()):
            exec_command.exec_command("cd '.'")

    if os.name == 'posix':
        # Test general (non-posix) version:
        with emulate_nonposix():
            with redirect_stdout(StringIO()):
                with redirect_stderr(TemporaryFile()):
                    exec_command.exec_command("cd '.'")
def test_dump_as_file(self):
    """Round trip: parse nginx.conf, append an ssl server block, dump to
    a temp file and confirm re-parsing yields an equal tree."""
    with open(util.get_data_filename('nginx.conf')) as handle:
        parsed = load(handle)
    ssl_block = UnspacedList([['server'], [
        ['listen', ' ', '443', ' ', 'ssl'],
        ['server_name', ' ', 'localhost'],
        ['ssl_certificate', ' ', 'cert.pem'],
        ['ssl_certificate_key', ' ', 'cert.key'],
        ['ssl_session_cache', ' ', 'shared:SSL:1m'],
        ['ssl_session_timeout', ' ', '5m'],
        ['ssl_ciphers', ' ', 'HIGH:!aNULL:!MD5'],
        [['location', ' ', '/'], [
            ['root', ' ', 'html'],
            ['index', ' ', 'index.html', ' ', 'index.htm']]]]])
    parsed[-1][-1].append(ssl_block)
    with tempfile.TemporaryFile(mode='w+t') as f:
        dump(parsed, f)
        f.seek(0)
        parsed_new = load(f)
    self.assertEqual(parsed, parsed_new)
def test_comments(self):
    """Comments survive a dump/load round trip unchanged."""
    with open(util.get_data_filename('minimalistic_comments.conf')) as handle:
        parsed = load(handle)
    with tempfile.TemporaryFile(mode='w+t') as f:
        dump(parsed, f)
        f.seek(0)
        parsed_new = load(f)
    self.assertEqual(parsed, parsed_new)
    expected = [
        ['#', " Use bar.conf when it's a full moon!"],
        ['include', 'foo.conf'],
        ['#', ' Kilroy was here'],
        ['check_status'],
        [['server'], [
            ['#', ''],
            ['#', " Don't forget to open up your firewall!"],
            ['#', ''],
            ['listen', '1234'],
            ['#', ' listen 80;']]],
    ]
    self.assertEqual(parsed_new, expected)
def setUp(self):
    """Load API key defaults, fetch one streetview result, and create
    throwaway temp-file / temp-dir path strings for the tests."""
    file_path = resource_filename(Requirement.parse('google_streetview'),
                                  'google_streetview/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    params = [{
        'size': '600x300',  # max 640x640 pixels
        'location': '46.414382,10.013988',
        'heading': '151.78',
        'pitch': '-0.76',
        'key': defaults['key'],
    }]
    self.results = google_streetview.api.results(params)
    # Renamed from `tempfile`: the original shadowed the stdlib module.
    tmp_file = TemporaryFile()
    self.tempfile = str(tmp_file.name)
    tmp_file.close()
    # NOTE(review): the TemporaryDirectory object is discarded at once,
    # so the directory is removed immediately; only its path is kept.
    self.tempdir = str(TemporaryDirectory().name)
def remove_line(fname, line):
    '''Remove line from file by creating a temporary file containing all
    lines from original file except those matching the given line, then
    copying the temporary file back into the original file, overwriting
    its contents.
    '''
    with lockfile.FileLock(fname):
        # Fixes: open() mode 'rw+' is invalid (ValueError on Python 3) —
        # 'r+' is the read/update mode; the temp file must be text mode
        # ('w+') to accept the str lines written into it; context
        # managers now close both files even on error.
        with tempfile.TemporaryFile('w+') as tmp, open(fname, 'r+') as fp:
            # write all lines from orig file, except if matches given line
            for l in fp:
                if l.strip() != line:
                    tmp.write(l)
            # reset file pointers so entire files are copied
            fp.seek(0)
            tmp.seek(0)
            # copy tmp into fp, then truncate to remove trailing line(s)
            shutil.copyfileobj(tmp, fp)
            fp.truncate()
def get_read_stream(self, dag_id, task_id, execution_date):
    """Download the S3 object for this task instance into a rewound
    temporary file stream; raise StorageDriverError if it is missing."""
    key_name = self.get_key_name(dag_id, task_id, execution_date)
    key = self.bucket.get_key(key_name)
    if key is None:
        message = \
            'S3 key named {key_name} in bucket {bucket_name} does not exist.'.format(key_name=key_name,
                                                                                     bucket_name=self.bucket_name)
        raise StorageDriverError(message)
    import tempfile
    stream = tempfile.TemporaryFile(mode='w+b')
    key.get_file(stream)
    # Stream has been read in and is now at the end
    # So reset it to the start
    stream.seek(0)
    return stream
def add_here_document(self, interp, name, content, io_number=None):
    """Register a here-document's body as a readable descriptor backed
    by a temporary file (defaults to descriptor 0)."""
    if io_number is None:
        io_number = 0
    if name == pyshlex.unquote_wordtree(name):
        # Unquoted delimiter: the body is subject to expansion.
        content = interp.expand_here_document(('TOKEN', content))
    # Write document content in a temporary file
    tmp = tempfile.TemporaryFile()
    try:
        tmp.write(content)
        tmp.flush()
        tmp.seek(0)
        self._add_descriptor(io_number, FileWrapper('r', tmp))
    except:
        # close the temp file before propagating any failure
        tmp.close()
        raise
def rman(self, finalscript):
    """Feed *finalscript* to the Oracle RMAN binary, appending RMAN's
    log output to the backup log; raise on a non-zero exit code."""
    self._setenv()
    debug("RMAN execution starts")
    BackupLogger.close()
    starttime = datetime.now()
    with TemporaryFile() as capture:
        rman_binary = os.path.join(self.oraclehome, 'bin', 'rman')
        p = Popen([rman_binary, "log", BackupLogger.logfile, "append"],
                  stdout=capture, stderr=capture, stdin=PIPE)
        # Send the script to RMAN
        p.communicate(input=finalscript)
    endtime = datetime.now()
    BackupLogger.init()
    debug("RMAN execution time %s" % (endtime - starttime))
    # Any exit code other than 0 means RMAN reported an error
    if p.returncode != 0:
        error("RMAN execution failed with code %d" % p.returncode)
        raise Exception('rman', "RMAN exited with code %d" % p.returncode)
    else:
        debug("RMAN execution successful")
def sqlplus(self, finalscript, silent=False):
    """Run *finalscript* through SQL*Plus.  With silent=True the banner
    is suppressed and the captured output is returned."""
    self._setenv()
    with TemporaryFile() as capture:
        args = [os.path.join(self.oraclehome, 'bin', 'sqlplus')]
        if silent:
            args.append('-S')
        args.append('/nolog')
        debug("SQL*Plus execution starts")
        BackupLogger.close()
        p = Popen(args, stdout=capture, stderr=capture, stdin=PIPE)
        p.communicate(input=finalscript)
        BackupLogger.init()
        if p.returncode != 0:
            error("SQL*Plus exited with code %d" % p.returncode)
            raise Exception('sqlplus', "sqlplus exited with code %d" % p.returncode)
        debug("SQL*Plus execution successful")
        if silent:
            # hand back everything SQL*Plus wrote
            capture.seek(0, 0)
            return capture.read()
def sqlldr(self, login, finalscript):
    """Run SQL*Loader with *finalscript* as the control file content,
    cleaning up the control and log files on success."""
    self._setenv()
    debug("SQLLDR execution starts")
    # Write the control file from the supplied script.
    ctl_fd, ctl_path = mkstemp(suffix=".ctl")
    ctl_file = os.fdopen(ctl_fd, "w")
    ctl_file.write(finalscript)
    ctl_file.close()
    # Pre-create an empty log file for sqlldr to fill in.
    log_fd, log_path = mkstemp(suffix=".log")
    os.close(log_fd)
    with TemporaryFile() as capture:
        p = Popen([os.path.join(self.oraclehome, 'bin', 'sqlldr'), login,
                   "control=%s" % ctl_path, "log=%s" % log_path,
                   "errors=0", "silent=all"],
                  stdout=capture, stderr=None, stdin=None)
        p.communicate()
        if p.returncode != 0:
            error("SQLLDR exited with code %d" % p.returncode)
            raise Exception('sqlldr', "sqlldr exited with code %d" % p.returncode)
        debug("SQLLDR execution successful")
    os.unlink(ctl_path)
    os.unlink(log_path)
def setUp(self):
    """Build an ArrayTree, serialize it into a temp file, and reopen it
    through FileArrayTreeDict for the tests."""
    tree = ArrayTree(10000, 10)  # max value of 10000, each block has 10 numbers
    for i in range(5000):
        tree[i] = i
    # Insert extra copies to test frequency
    for i in range(3000):
        tree[i] = i
    tree.set_range(5000, 9001, 100)
    tree.root.build_summary()
    f = tempfile.TemporaryFile()
    FileArrayTreeDict.dict_to_file({'test': tree}, f)
    f.seek(0)
    self.filearraytreedict = FileArrayTreeDict(f)
    self.filearraytree = self.filearraytreedict['test']
def test_has_no_name(self): # TemporaryFile creates files with no names (on this system) dir = tempfile.mkdtemp() f = tempfile.TemporaryFile(dir=dir) f.write(b'blat') # Sneaky: because this file has no name, it should not prevent # us from removing the directory it was created in. try: os.rmdir(dir) except: ei = sys.exc_info() # cleanup f.close() os.rmdir(dir) self.failOnException("rmdir", ei)
def download_glove(glove):
    """Download and extract the GloVe 42B.300d vectors next to *glove*,
    unless that file already exists."""
    if os.path.exists(glove):
        return
    print('Downloading glove...')
    url = 'http://nlp.stanford.edu/data/glove.42B.300d.zip'
    with tempfile.TemporaryFile() as tmp:
        # Buffer the zip on disk, then extract just the vectors file.
        with urllib.request.urlopen(url) as res:
            shutil.copyfileobj(res, tmp)
        with zipfile.ZipFile(tmp, 'r') as glove_zip:
            glove_zip.extract('glove.42B.300d.txt', path=os.path.dirname(glove))
    print('Done')
def test_FileExistsIfTrue(self):
    """ test the method that checks if the file exists """
    with tempfile.TemporaryDirectory() as tmpdir:
        with tempfile.TemporaryFile(dir=tmpdir) as fp:
            self.assertTrue(self.helper.FileExists(fp.name))
def make_file(self, binary=None):
    """Overridable: return a readable & writable file.

    The file will be used as follows:
    - data is written to it
    - seek(0)
    - data is read from it

    The 'binary' argument is unused -- the file is always opened
    in binary mode.

    This version opens a temporary file for reading and writing, and
    immediately deletes (unlinks) it.  The trick (on Unix!) is that the
    file can still be used, but it can't be opened by another process,
    and it will automatically be deleted when it is closed or when the
    current process terminates.  Subclasses can override this method to
    provide a more permanent or visible file.
    """
    import tempfile
    return tempfile.TemporaryFile("w+b")


# Backwards Compatibility Classes
# ===============================
def __radd__(self, other):
    """Support ``other + self``: concatenate *other* with this file's
    contents into a fresh FileString."""
    combined = TemporaryFile()
    combined.write(other)
    self.file.seek(0)
    for line in self.file:
        combined.write(line)
    combined.seek(0)
    return FileString(combined)
def safe_size(source):
    """
    READ THE source UP TO SOME LIMIT, THEN COPY TO A FILE IF TOO BIG
    RETURN A str() OR A FileString()
    """
    if source is None:
        return None
    total_bytes = 0
    bytes = []
    b = source.read(MIN_READ_SIZE)
    while b:
        total_bytes += len(b)
        bytes.append(b)
        if total_bytes > MAX_STRING_SIZE:
            # Too big for memory: spill the accumulated chunks to a temp
            # file, then stream the remainder of source straight to disk.
            try:
                data = FileString(TemporaryFile())
                for bb in bytes:
                    data.write(bb)
                del bytes
                del bb
                b = source.read(MIN_READ_SIZE)
                while b:
                    total_bytes += len(b)
                    data.write(b)
                    b = source.read(MIN_READ_SIZE)
                data.seek(0)
                Log.note("Using file of size {{length}} instead of str()", length=total_bytes)
                return data
            except Exception as e:
                Log.error("Could not write file > {{num}} bytes", num=total_bytes, cause=e)
        b = source.read(MIN_READ_SIZE)
    # Fits in memory: return the chunks joined as one bytes value.
    data = b"".join(bytes)
    del bytes
    return data
def __init__(self, archiver, url):
    """Record the owning *archiver* and target *url*, and allocate an
    anonymous temporary file as scratch storage."""
    self.archiver = archiver
    self.url = url
    self.tmp = TemporaryFile()
def test_write_read_string(self):
    """A unicode string survives a write_string/read_string round trip."""
    with tempfile.TemporaryFile() as f:
        write_string(f, u'test')
        f.seek(0)
        self.assertEqual(read_string(f), u'test')
def test_write_read_longstring(self):
    """A unicode string survives a write_longstring/read_longstring
    round trip."""
    with tempfile.TemporaryFile() as f:
        write_longstring(f, u'test')
        f.seek(0)
        self.assertEqual(read_longstring(f), u'test')
def test_write_read_stringmap(self):
    """A dict survives a write_stringmap/read_stringmap round trip."""
    with tempfile.TemporaryFile() as f:
        write_stringmap(f, {'key': 'value'})
        f.seek(0)
        self.assertEqual(read_stringmap(f), {'key': 'value'})
def test_write_read_inet(self):
    """IPv4 and IPv6 (host, port) pairs survive a write_inet/read_inet
    round trip."""
    for value in [('192.168.1.1', 9042), ('2001:db8:0:f101::1', 9042)]:
        with tempfile.TemporaryFile() as f:
            write_inet(f, value)
            f.seek(0)
            self.assertEqual(read_inet(f), value)
def default_stream_factory(total_content_length, filename, content_type,
                           content_length=None):
    """The stream factory that is used per default."""
    # Spool bodies larger than 500 KiB to disk; keep small ones in memory.
    in_memory_limit = 1024 * 500
    if total_content_length > in_memory_limit:
        return TemporaryFile('wb+')
    return BytesIO()
def test_cli_error(self):
    """A corrupted ticket URL makes the CLI exit with status 1, writing
    something to stderr and nothing to stdout."""
    cmd = [sys.executable, "htsget_dev.py",
           TestRequestHandler.ticket_url + "XXX", "-O", self.output_file]
    with tempfile.TemporaryFile("wb+") as stderr, \
            tempfile.TemporaryFile("wb+") as stdout:
        ret = subprocess.call(cmd, stderr=stderr, stdout=stdout)
        self.assertEqual(ret, 1)
        stderr.seek(0)
        stdout.seek(0)
        self.assertGreater(len(stderr.read()), 0)
        self.assertEqual(len(stdout.read()), 0)
def test_bad_scheme(self):
    """Tickets containing an unsupported URL scheme are rejected with
    ValueError by the download manager."""
    bad_schemes = ["htt://as", "file:///home", "ftp://x.y/sdf"]
    with tempfile.TemporaryFile("w+") as temp_file:
        for bad_scheme in bad_schemes:
            ticket = get_ticket(urls=[
                get_http_ticket("http://a.b"),
                get_http_ticket("htp")])
            dm = StoringUrlsDownloadManager(ticket, temp_file)
            self.assertRaises(ValueError, dm.run)
def test_basic_http_parsing(self):
    """The download manager stores the URL/headers pair taken from an
    http ticket."""
    headers = {"a": "b", "b": "c"}
    ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL, headers)])
    with tempfile.TemporaryFile("w+") as temp_file:
        manager = StoringUrlsDownloadManager(ticket, temp_file)
        manager.run()
        self.assertEqual(manager.stored_urls[0], (EXAMPLE_URL, headers))
def test_basic_data_uri_parsing(self):
    """A data: URI ticket is parsed via urlparse and stored."""
    data_uri = "data:application/vnd.ga4gh.bam;base64,SGVsbG8sIFdvcmxkIQ=="
    ticket = get_ticket(urls=[get_data_uri_ticket(data_uri)])
    with tempfile.TemporaryFile("w+") as temp_file:
        manager = StoringUrlsDownloadManager(ticket, temp_file)
        manager.run()
        self.assertEqual(manager.stored_urls[0], urlparse(data_uri))
def test_num_retries(self):
    """For every retry budget the manager attempts max_retries + 1
    downloads, sleeping and warning once per retry, then raises."""
    ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL)])
    with tempfile.TemporaryFile("w+") as temp_file:
        for num_retries in range(10):
            with mock.patch("time.sleep") as mock_sleep, \
                    mock.patch("logging.warning") as mock_warning:
                manager = RetryCountDownloadManager(
                    ticket, temp_file, max_retries=num_retries)
                self.assertEqual(manager.max_retries, num_retries)
                self.assertRaises(exceptions.RetryableError, manager.run)
                self.assertEqual(
                    manager.attempt_counts[EXAMPLE_URL], num_retries + 1)
                self.assertEqual(mock_sleep.call_count, num_retries)
                self.assertEqual(mock_warning.call_count, num_retries)