The following code examples were extracted from open-source Python projects and illustrate how to use tempfile.SpooledTemporaryFile().
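Before the project examples, here is a minimal sketch (not drawn from any project below) of the behavior they all rely on: writes are buffered in memory until the file size exceeds max_size, at which point the buffer transparently rolls over to a real temporary file on disk.

import tempfile

with tempfile.SpooledTemporaryFile(max_size=1024, mode='w+b') as f:
    f.write(b'x' * 512)
    print(f._rolled)   # False: still an in-memory buffer (_rolled is a private attribute)
    f.write(b'x' * 1024)
    print(f._rolled)   # True: 1536 bytes exceeds max_size, now backed by a disk file
    f.seek(0)
    data = f.read()    # reads back all 1536 bytes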
def encrypt_file(file, keys=secretKeys()):
    '''Encrypt file data with the same method as the Send browser/js client'''
    key = keys.encryptKey
    iv = keys.encryptIV
    encData = tempfile.SpooledTemporaryFile(max_size=SPOOL_SIZE, mode='w+b')
    cipher = Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, iv)
    pbar = progbar(fileSize(file))
    for chunk in iter(lambda: file.read(CHUNK_SIZE), b''):
        encData.write(cipher.encrypt(chunk))
        pbar.update(len(chunk))
    pbar.close()
    encData.write(cipher.digest())
    file.close()
    encData.seek(0)
    return encData
def test_exports(self):
    # There are no surprising symbols in the tempfile module
    dict = tempfile.__dict__
    expected = {
        "NamedTemporaryFile" : 1,
        "TemporaryFile" : 1,
        "mkstemp" : 1,
        "mkdtemp" : 1,
        "mktemp" : 1,
        "TMP_MAX" : 1,
        "gettempprefix" : 1,
        "gettempdir" : 1,
        "tempdir" : 1,
        "template" : 1,
        "SpooledTemporaryFile" : 1,
        "TemporaryDirectory" : 1,
    }

    unexp = []
    for key in dict:
        if key[0] != '_' and key not in expected:
            unexp.append(key)
    self.assertTrue(len(unexp) == 0,
                    "unexpected keys: %s" % unexp)
def test_exports(self):
    # There are no surprising symbols in the tempfile module
    dict = tempfile.__dict__
    expected = {
        "NamedTemporaryFile" : 1,
        "TemporaryFile" : 1,
        "mkstemp" : 1,
        "mkdtemp" : 1,
        "mktemp" : 1,
        "TMP_MAX" : 1,
        "gettempprefix" : 1,
        "gettempdir" : 1,
        "tempdir" : 1,
        "template" : 1,
        "SpooledTemporaryFile" : 1
    }

    unexp = []
    for key in dict:
        if key[0] != '_' and key not in expected:
            unexp.append(key)
    self.assertTrue(len(unexp) == 0,
                    "unexpected keys: %s" % unexp)
def test_text_mode(self):
    # Creating a SpooledTemporaryFile with a text mode should produce
    # a file object reading and writing (Unicode) text strings.
    f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
    f.write("abc\n")
    f.seek(0)
    self.assertEqual(f.read(), "abc\n")
    f.write("def\n")
    f.seek(0)
    self.assertEqual(f.read(), "abc\ndef\n")
    f.write("xyzzy\n")
    f.seek(0)
    self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
    # Check that Ctrl+Z doesn't truncate the file
    f.write("foo\x1abar\n")
    f.seek(0)
    self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
def api_download(service, fileId, authorisation):
    '''Given a Send url, download and return the encrypted data and metadata'''
    data = tempfile.SpooledTemporaryFile(max_size=SPOOL_SIZE, mode='w+b')
    headers = {'Authorization': 'send-v1 ' + unpadded_urlsafe_b64encode(authorisation)}
    url = service + 'api/download/' + fileId
    r = requests.get(url, headers=headers, stream=True)
    r.raise_for_status()
    content_length = int(r.headers['Content-length'])
    pbar = progbar(content_length)
    for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
        data.write(chunk)
        pbar.update(len(chunk))
    pbar.close()
    data.seek(0)
    return data
def test_properties(self):
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'x' * 10)
    self.assertFalse(f._rolled)
    self.assertEqual(f.mode, 'w+b')
    self.assertIsNone(f.name)
    with self.assertRaises(AttributeError):
        f.newlines
    with self.assertRaises(AttributeError):
        f.encoding

    f.write(b'x')
    self.assertTrue(f._rolled)
    self.assertEqual(f.mode, 'w+b')
    self.assertIsNotNone(f.name)
    with self.assertRaises(AttributeError):
        f.newlines
    with self.assertRaises(AttributeError):
        f.encoding
def initCase(switches, count):
    _failures.failedItems = []
    _failures.failedParseOn = None
    _failures.failedTraceBack = None

    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix="%s%d-" % (MKSTEMP_PREFIX.TESTING, count))
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    LOGGER_HANDLER.stream = sys.stdout = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")

    cmdLineOptions = cmdLineParser()

    if switches:
        for key, value in switches.items():
            if key in cmdLineOptions.__dict__:
                cmdLineOptions.__dict__[key] = value

    initOptions(cmdLineOptions, True)
    init()
def is_available(cls):
    if (super(cls, cls).is_available() and
            diagnose.check_executable('text2wave') and
            diagnose.check_executable('festival')):
        logger = logging.getLogger(__name__)
        cmd = ['festival', '--pipe']
        with tempfile.SpooledTemporaryFile() as out_f:
            with tempfile.SpooledTemporaryFile() as in_f:
                logger.debug('Executing %s',
                             ' '.join([pipes.quote(arg) for arg in cmd]))
                subprocess.call(cmd, stdin=in_f, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read().strip()
                if output:
                    logger.debug("Output was: '%s'", output)
                return ('No default voice found' not in output)
    return False
def say(self, phrase, *args):
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    cmd = ['text2wave']
    with tempfile.NamedTemporaryFile(suffix='.wav') as out_f:
        with tempfile.SpooledTemporaryFile() as in_f:
            in_f.write(phrase)
            in_f.seek(0)
            with tempfile.SpooledTemporaryFile() as err_f:
                self._logger.debug('Executing %s',
                                   ' '.join([pipes.quote(arg) for arg in cmd]))
                subprocess.call(cmd, stdin=in_f, stdout=out_f, stderr=err_f)
                err_f.seek(0)
                output = err_f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
        self.play(out_f.name)
def say(self, phrase, *args):
    self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
    cmd = ['flite']
    if self.voice:
        cmd.extend(['-voice', self.voice])
    cmd.extend(['-t', phrase])
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    cmd.append(fname)
    with tempfile.SpooledTemporaryFile() as out_f:
        self._logger.debug('Executing %s',
                           ' '.join([pipes.quote(arg) for arg in cmd]))
        subprocess.call(cmd, stdout=out_f, stderr=out_f)
        out_f.seek(0)
        output = out_f.read().strip()
        if output:
            self._logger.debug("Output was: '%s'", output)
    self.play(fname)
    os.remove(fname)
def transcribe(self, fp, mode=None):
    cmd = ['julius',
           '-quiet',
           '-nolog',
           '-input', 'stdin',
           '-dfa', self._vocabulary.dfa_file,
           '-v', self._vocabulary.dict_file,
           '-h', self._hmmdefs,
           '-hlist', self._tiedlist,
           '-forcedict']
    cmd = [str(x) for x in cmd]
    self._logger.debug('Executing: %r', cmd)
    with tempfile.SpooledTemporaryFile() as out_f:
        with tempfile.SpooledTemporaryFile() as err_f:
            subprocess.call(cmd, stdin=fp, stdout=out_f, stderr=err_f)
        out_f.seek(0)
        results = [(int(i), text) for i, text in
                   self._pattern.findall(out_f.read())]
    transcribed = [text for i, text in
                   sorted(results, key=lambda x: x[0]) if text]
    if not transcribed:
        transcribed.append('')
    self._logger.info('Transcribed: %r', transcribed)
    return transcribed
def test_properties(self):
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'x' * 10)
    self.assertFalse(f._rolled)
    self.assertEqual(f.mode, 'w+b')
    self.assertIsNone(f.name)
    with self.assertRaises(AttributeError):
        f.newlines
    with self.assertRaises(AttributeError):
        f.encoding

    f.write(b'x')
    self.assertTrue(f._rolled)
    self.assertEqual(f.mode, 'rb+')
    self.assertIsNotNone(f.name)
    with self.assertRaises(AttributeError):
        f.newlines
    with self.assertRaises(AttributeError):
        f.encoding
def test_text_newline_and_encoding(self):
    f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                      newline='', encoding='utf-8')
    f.write("\u039B\r\n")
    f.seek(0)
    self.assertEqual(f.read(), "\u039B\r\n")
    self.assertFalse(f._rolled)
    self.assertEqual(f.mode, 'w+')
    self.assertIsNone(f.name)
    self.assertIsNone(f.newlines)
    self.assertIsNone(f.encoding)

    f.write("\u039B" * 20 + "\r\n")
    f.seek(0)
    self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
    self.assertTrue(f._rolled)
    self.assertEqual(f.mode, 'w+')
    self.assertIsNotNone(f.name)
    self.assertIsNotNone(f.newlines)
    self.assertEqual(f.encoding, 'utf-8')
def test_truncate_with_size_parameter(self):
    # A SpooledTemporaryFile can be truncated to zero size
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'abcdefg\n')
    f.seek(0)
    f.truncate()
    self.assertFalse(f._rolled)
    self.assertEqual(f._file.getvalue(), b'')

    # A SpooledTemporaryFile can be truncated to a specific size
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'abcdefg\n')
    f.truncate(4)
    self.assertFalse(f._rolled)
    self.assertEqual(f._file.getvalue(), b'abcd')

    # A SpooledTemporaryFile rolls over if truncated to large size
    f = tempfile.SpooledTemporaryFile(max_size=10)
    f.write(b'abcdefg\n')
    f.truncate(20)
    self.assertTrue(f._rolled)
    if has_stat:
        self.assertEqual(os.fstat(f.fileno()).st_size, 20)
def csv_masks(request, hashfile_id):
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # didn't find the right way to do this in pure Django...
    res = Cracked.objects.raw(
        "SELECT id, password_mask, COUNT(*) AS count "
        "FROM Hashcat_cracked USE INDEX (hashfileid_id_index) "
        "WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC",
        [hashfile.id])

    fp = tempfile.SpooledTemporaryFile(mode='w')
    csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        csvfile.writerow([item.count, item.password_mask])
    fp.seek(0)  # rewind the file handle
    csvfile_data = fp.read()

    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    # mimetype was replaced by content_type in Django 1.7
    response = HttpResponse(csvfile_data, content_type='application/force-download')
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
def download():
    trackrels = request.query.tracks.split('|')

    # write the archive into a temporary in-memory file-like object
    temp = tempfile.SpooledTemporaryFile()
    with zipfile.ZipFile(temp, 'w', zipfile.ZIP_DEFLATED) as archive:
        for trackrel in trackrels:
            base_wildcard = trackrel.replace("-track.csv", "*")
            paths = config.TRACKDIR.glob(base_wildcard)
            for path in paths:
                archive.write(str(path),
                              str(path.relative_to(config.TRACKDIR)))
    temp.seek(0)

    # force a download; give it a filename and mime type
    response.set_header('Content-Disposition', 'attachment; filename="data.zip"')
    response.set_header('Content-Type', 'application/zip')

    # relying on garbage collector to delete tempfile object
    # (and hence the file itself) when done sending
    return temp
def initCase(switches, count):
    Failures.failedItems = []
    Failures.failedParseOn = None
    Failures.failedTraceBack = None

    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix="sqlmaptest-%d-" % count)
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    LOGGER_HANDLER.stream = sys.stdout = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")

    cmdLineOptions = cmdLineParser()

    if switches:
        for key, value in switches.items():
            if key in cmdLineOptions.__dict__:
                cmdLineOptions.__dict__[key] = value

    initOptions(cmdLineOptions, True)
    init()
def say(self, phrase):
    cmd = ['text2wave', '-eval', '(voice_%s)' % self.voice]
    with tempfile.SpooledTemporaryFile() as out_f:
        with tempfile.SpooledTemporaryFile() as in_f:
            in_f.write(phrase)
            in_f.seek(0)
            with tempfile.SpooledTemporaryFile() as err_f:
                self._logger.debug('Executing %s',
                                   ' '.join([pipes.quote(arg) for arg in cmd]))
                subprocess.call(cmd, stdin=in_f, stdout=out_f, stderr=err_f)
                err_f.seek(0)
                output = err_f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
        out_f.seek(0)
        return out_f.read()
def say(self, phrase):
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        fname = f.name
    cmd = [EXECUTABLE, '-o', fname,
           '--file-format=WAVE',
           str(phrase)]
    self._logger.debug('Executing %s',
                       ' '.join([pipes.quote(arg) for arg in cmd]))
    with tempfile.SpooledTemporaryFile() as f:
        subprocess.call(cmd, stdout=f, stderr=f)
        f.seek(0)
        output = f.read()
        if output:
            self._logger.debug("Output was: '%s'", output)
    with open(fname, 'rb') as f:
        data = f.read()
    os.remove(fname)
    return data
def mp3_to_wave(self, filename):
    mf = mad.MadFile(filename)
    with tempfile.SpooledTemporaryFile() as f:
        wav = wave.open(f, mode='wb')
        wav.setframerate(mf.samplerate())
        wav.setnchannels(1 if mf.mode() == mad.MODE_SINGLE_CHANNEL else 2)
        # 4 bytes is the sample width of 32 bit audio
        wav.setsampwidth(4)
        frame = mf.read()
        while frame is not None:
            wav.writeframes(frame)
            frame = mf.read()
        wav.close()
        f.seek(0)
        data = f.read()
    return data
def decrypt(self):
    decrypted_chunk = SpooledTemporaryFile(
        max_size=self.POOL_SIZE,
        mode='wb+'
    )
    cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
    next_chunk = ''
    finished = False
    while not finished:
        chunk, next_chunk = next_chunk, \
            self.chunk_stream.read(1024 * AES.block_size)
        chunk = cipher.decrypt(chunk)
        if len(next_chunk) == 0:
            chunk = self.pkcs7_reverse_padded_chunk(chunk)
            finished = True
        if chunk:
            decrypted_chunk.write(chunk)
    decrypted_chunk.seek(0)
    return decrypted_chunk
def __setstate__(self, state):
    disable_cuda = False
    for key in self.cuda_dependent_attributes_:
        if key not in state:
            continue
        dump = state.pop(key)
        with tempfile.SpooledTemporaryFile() as f:
            f.write(dump)
            f.seek(0)
            if state['use_cuda'] and not torch.cuda.is_available():
                disable_cuda = True
                val = torch.load(
                    f, map_location=lambda storage, loc: storage)
            else:
                val = torch.load(f)
        state[key] = val
    if disable_cuda:
        warnings.warn(
            "Model configured to use CUDA but no CUDA devices "
            "available. Loading on CPU instead.",
            DeviceWarning)
        state['use_cuda'] = False

    self.__dict__.update(state)
def __call__(self, environ, start_response):
    if environ.get('HTTP_CONTENT_ENCODING', '') == 'gzip':
        try:
            environ['wsgi.input'].tell()
            wsgi_input = environ['wsgi.input']
        except (AttributeError, IOError, NotImplementedError):
            # The gzip implementation in the standard library of Python 2.x
            # requires working '.seek()' and '.tell()' methods on the input
            # stream. Read the data into a temporary file to work around
            # this limitation.
            wsgi_input = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
            shutil.copyfileobj(environ['wsgi.input'], wsgi_input)
            wsgi_input.seek(0)

        environ['wsgi.input'] = gzip.GzipFile(filename=None, fileobj=wsgi_input, mode='r')
        del environ['HTTP_CONTENT_ENCODING']
        if 'CONTENT_LENGTH' in environ:
            del environ['CONTENT_LENGTH']

    return self.app(environ, start_response)
def buffer_iter(cls, orig_iter, buff_size=65536):
    out = SpooledTemporaryFile(buff_size)
    size = 0

    for buff in orig_iter:
        size += len(buff)
        out.write(buff)

    content_length_str = str(size)
    out.seek(0)

    def read_iter():
        while True:
            buff = out.read(buff_size)
            if not buff:
                break
            yield buff

    return content_length_str, read_iter()

# ============================================================================
def getFile(self, site, inner_path):
    # Use streamFile if client supports it
    if config.stream_downloads and self.connection and self.connection.handshake and self.connection.handshake["rev"] > 310:
        return self.streamFile(site, inner_path)

    location = 0
    if config.use_tempfiles:
        buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
    else:
        buff = StringIO()

    s = time.time()
    while True:  # Read in 512k parts
        res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})

        if not res or "body" not in res:  # Error
            return False

        buff.write(res["body"])
        res["body"] = None  # Save memory
        if res["location"] == res["size"]:  # End of file
            break
        else:
            location = res["location"]

    self.download_bytes += res["location"]
    self.download_time += (time.time() - s)
    if self.site:
        self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
    buff.seek(0)
    return buff

# Download file out of msgpack context to save memory and cpu
def streamFile(self, site, inner_path):
    location = 0
    if config.use_tempfiles:
        buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
    else:
        buff = StringIO()

    s = time.time()
    while True:  # Read in 512k parts
        res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)

        if not res:  # Error
            self.log("Invalid response: %s" % res)
            return False

        if res["location"] == res["size"]:  # End of file
            break
        else:
            location = res["location"]

    self.download_bytes += res["location"]
    self.download_time += (time.time() - s)
    self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
    buff.seek(0)
    return buff

# Send a ping request
def spooled(self,
            max_size: int = 0,
            mode: str = 'w+b',
            buffering: int = -1,
            encoding: typing.Optional[str] = None,
            newline: typing.Optional[str] = None,
            suffix: typing.Optional[str] = DEFAULT_SUFFIX,
            prefix: typing.Optional[str] = DEFAULT_PREFIX,
            dir: typing.Optional[str] = None) -> typing.IO:
    """
    Create a new spooled temporary file within the scratch dir.

    This returns a :class:`~tempfile.SpooledTemporaryFile`, which is a
    specialized object that wraps a :class:`StringIO`/:class:`BytesIO`
    instance and transparently overflows into a file on disk once it
    reaches a certain size.

    By default, a spooled file will never roll over to disk.

    :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
    :type max_size: :class:`~int`
    :param mode: (Optional) mode to open the file with
    :type mode: :class:`~str`
    :param buffering: (Optional) size of the file buffer
    :type buffering: :class:`~int`
    :param encoding: (Optional) encoding to open the file with
    :type encoding: :class:`~str`
    :param newline: (Optional) newline argument to open the file with
    :type newline: :class:`~str` or :class:`~NoneType`
    :param suffix: (Optional) filename suffix
    :type suffix: :class:`~str` or :class:`~NoneType`
    :param prefix: (Optional) filename prefix
    :type prefix: :class:`~str` or :class:`~NoneType`
    :param dir: (Optional) relative path to directory within the scratch dir
        where the file should exist
    :type dir: :class:`~str` or :class:`~NoneType`
    :return: SpooledTemporaryFile instance
    :rtype: :class:`~tempfile.SpooledTemporaryFile`
    """
    return tempfile.SpooledTemporaryFile(max_size, mode, buffering, encoding,
                                         newline, suffix, prefix, self.join(dir))
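The docstring above states the rollover contract; as a usage illustration, here is a hedged sketch that assumes this method comes from the scratchdir package's ScratchDir class (the instance name and payload are illustrative, not taken from the project):

import scratchdir

# Assumption: ScratchDir is a context manager that cleans up its
# directory, and spooled() proxies tempfile.SpooledTemporaryFile.
with scratchdir.ScratchDir() as scratch:
    with scratch.spooled(max_size=1024 * 1024) as tmp:
        tmp.write(b'payload')  # stays in memory while under max_size
        tmp.seek(0)
        data = tmp.read()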
def _create_temp_file(cls):
    return tempfile.SpooledTemporaryFile(max_size=512*1024)

# ============================================================================
def readFromBytes(self, b, headerEndianess='>'):
    with tempfile.SpooledTemporaryFile(mode='w+b') as f:
        f.write(b)
        f.seek(0)
        self.readFromFile(f, headerEndianess)
def __enter__(self):
    """
    Create the temporary file in memory first; when it uses too much
    memory, it is automatically relocated to the filesystem.
    """
    self.file = tempfile.SpooledTemporaryFile(max_size=self.max_size)
    return self.file
def generate_hash(self):
    """Requests the image as found in `url` and generates a perceptual_hash from it"""
    # This is slow: it has to get the image and spool it to a tempfile, then compute
    # the hash
    return None
    # req = requests.get(self.url)
    # if req.status_code == 200:
    #     buff = tempfile.SpooledTemporaryFile(max_size=1e9)
    #     downloaded = 0
    #     filesize = int(req.headers.get('content-length', 1000))  # Set a default length for the test client
    #     for chunk in req.iter_content():
    #         downloaded += len(chunk)
    #         buff.write(chunk)
    #     buff.seek(0)
    #     im = PillowImage.open(io.BytesIO(buff.read()))
    #     return str(imagehash.average_hash(im))
def do_create(self, max_size=0, dir=None, pre="", suf=""):
    if dir is None:
        dir = tempfile.gettempdir()
    try:
        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir,
                                             prefix=pre, suffix=suf)
    except:
        self.failOnException("SpooledTemporaryFile")

    return file
def test_basic(self):
    # SpooledTemporaryFile can create files
    f = self.do_create()
    self.assertFalse(f._rolled)
    f = self.do_create(max_size=100, pre="a", suf=".txt")
    self.assertFalse(f._rolled)