def mkstemp(suffix=None, prefix=None, dir=None, text=False): """ Args: suffix (`pathlike` or `None`): suffix or `None` to use the default prefix (`pathlike` or `None`): prefix or `None` to use the default dir (`pathlike` or `None`): temp dir or `None` to use the default text (bool): if the file should be opened in text mode Returns: Tuple[`int`, `fsnative`]: A tuple containing the file descriptor and the file path Raises: EnvironmentError Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path. """ suffix = fsnative() if suffix is None else path2fsn(suffix) prefix = gettempprefix() if prefix is None else path2fsn(prefix) dir = gettempdir() if dir is None else path2fsn(dir) return tempfile.mkstemp(suffix, prefix, dir, text)
def mkdtemp(suffix=None, prefix=None, dir=None): """ Args: suffix (`pathlike` or `None`): suffix or `None` to use the default prefix (`pathlike` or `None`): prefix or `None` to use the default dir (`pathlike` or `None`): temp dir or `None` to use the default Returns: `fsnative`: A path to a directory Raises: EnvironmentError Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path. """ suffix = fsnative() if suffix is None else path2fsn(suffix) prefix = gettempprefix() if prefix is None else path2fsn(prefix) dir = gettempdir() if dir is None else path2fsn(dir) return tempfile.mkdtemp(suffix, prefix, dir)
def runTest(self): """The actual test goes here.""" if os.system( "ruby --help > %s" % os.path.join( tempfile.gettempdir(), tempfile.gettempprefix() ) ): self.stop() raise RuntimeError("""Can't find the "ruby" command.""") self.reportProgres() if not os.path.exists("py2rb/builtins/module.rb"): self.stop() raise RuntimeError("""Can't find the "py2rb/builtins/module.rb" command.""") if not os.path.exists("py2rb/builtins/using.rb"): self.stop() raise RuntimeError("""Can't find the "py2rb/builtins/using.rb" command.""") if not os.path.exists("py2rb/builtins/require.rb"): self.stop() raise RuntimeError("""Can't find the "py2rb/builtins/require.rb" command.""") self.reportProgres()
def serve(content): """Write content to a temp file and serve it in browser""" temp_folder = tempfile.gettempdir() temp_file_name = tempfile.gettempprefix() + str(uuid.uuid4()) + ".html" # Generate a file path with a random name in temporary dir temp_file_path = os.path.join(temp_folder, temp_file_name) # save content to temp file save(temp_file_path, content) # Open templfile in a browser webbrowser.open("file://{}".format(temp_file_path)) # Block the thread while content is served try: while True: time.sleep(1) except KeyboardInterrupt: # cleanup the temp file os.remove(temp_file_path)
def test_exports(self): # There are no surprising symbols in the tempfile module dict = tempfile.__dict__ expected = { "NamedTemporaryFile" : 1, "TemporaryFile" : 1, "mkstemp" : 1, "mkdtemp" : 1, "mktemp" : 1, "TMP_MAX" : 1, "gettempprefix" : 1, "gettempdir" : 1, "tempdir" : 1, "template" : 1, "SpooledTemporaryFile" : 1, "TemporaryDirectory" : 1, } unexp = [] for key in dict: if key[0] != '_' and key not in expected: unexp.append(key) self.assertTrue(len(unexp) == 0, "unexpected keys: %s" % unexp)
def test_usable_template(self): # gettempprefix returns a usable prefix string # Create a temp directory, avoiding use of the prefix. # Then attempt to create a file whose name is # prefix + 'xxxxxx.xxx' in that directory. p = tempfile.gettempprefix() + "xxxxxx.xxx" d = tempfile.mkdtemp(prefix="") try: p = os.path.join(d, p) try: fd = os.open(p, os.O_RDWR | os.O_CREAT) except: self.failOnException("os.open") os.close(fd) os.unlink(p) finally: os.rmdir(d)
def test_exports(self): # There are no surprising symbols in the tempfile module dict = tempfile.__dict__ expected = { "NamedTemporaryFile" : 1, "TemporaryFile" : 1, "mkstemp" : 1, "mkdtemp" : 1, "mktemp" : 1, "TMP_MAX" : 1, "gettempprefix" : 1, "gettempdir" : 1, "tempdir" : 1, "template" : 1, "SpooledTemporaryFile" : 1 } unexp = [] for key in dict: if key[0] != '_' and key not in expected: unexp.append(key) self.assertTrue(len(unexp) == 0, "unexpected keys: %s" % unexp)
def mkstemp(suffix="", prefix=_tempfile.gettempprefix(), dir=None, text=False): """ Create secure temp file """ # pylint: disable = W0622 if dir is None: dir = _tempfile.gettempdir() if text: flags = _text_openflags else: flags = _bin_openflags count = 100 while count > 0: j = _tempfile._counter.get_next() # pylint: disable = E1101, W0212 fname = _os.path.join(dir, prefix + str(j) + suffix) try: fd = _os.open(fname, flags, 0600) except OSError, e: if e.errno == _errno.EEXIST: count -= 1 continue raise _set_cloexec(fd) return fd, _os.path.abspath(fname) raise IOError, (_errno.EEXIST, "No usable temporary file name found")
def test_fetch(self): # Create temp file to copy file = open(os.path.join(self._copyFile.copy_from_prefix, "%sbigFileXYZA" % tempfile.gettempprefix()), 'w') file.write("Lots of interesting stuff") file.close() # Perform copy interpolate_dict = {"project_identifier": tempfile.gettempprefix(), "product_identifier": "bigFile", "run_identifier": "XYZ"} result_paths = self._copyFile.fetch(interpolate_dict=interpolate_dict) # Test copied file exists and DataFile matches self.assertEqual(len(result_paths), 1) df = result_paths[0] self.assertEqual(df.file_name, "%sbigFileXYZB" % tempfile.gettempprefix()) self.assertEqual(df.location, os.path.join(tempfile.gettempdir(), "%sbigFileXYZB" % tempfile.gettempprefix())) self.assertEqual(df.equipment, self._equipmentSequencer) self.assertIs(filecmp.cmp( os.path.join(tempfile.gettempdir(), "%sbigFileXYZA" % tempfile.gettempprefix()), os.path.join(tempfile.gettempdir(), "%sbigFileXYZB" % tempfile.gettempprefix())), True) # Clean up os.remove(os.path.join(tempfile.gettempdir(), "%sbigFileXYZA" % tempfile.gettempprefix())) os.remove(os.path.join(tempfile.gettempdir(), "%sbigFileXYZB" % tempfile.gettempprefix()))
def _run_scan(arguments, scan_target, allow_file=False): tmp_path = arguments.tmp_path if tmp_path is None: tmp_path = os.path.join(tempfile.gettempdir(), tempfile.gettempprefix() + smoke_zephyr.utilities.random_string_alphanumeric(8)) fetch.smart_fetch(scan_target, tmp_path, allow_file=allow_file) scanner = runner.SubprocessRunner( tmp_path, shutil.which('python') ) print('[*] scanning: ' + tmp_path) scanner.run() scanner.wait() if not arguments.save_path: shutil.rmtree(tmp_path) return scanner
def gettempprefix(): """ Returns: `fsnative` Like :func:`python3:tempfile.gettempprefix`, but always returns a `fsnative` path """ return path2fsn(tempfile.gettempprefix())
def transform_command_with_value(command, value, notification_timestamp): python_download_script = 'zk_download_data.py' if len(value) > _LONG_VALUE_THRESHOLD: # If the value is too long (serverset is too large), OSError may be thrown. # Instead of passing it in command line, write to a temp file and # let zk_download_data read from it. value = value.replace("\n", "").replace("\r", "") md5digest = zk_util.get_md5_digest(value) tmp_filename = 'zk_update_largefile_' + md5digest + '_' + str(notification_timestamp) tmp_dir = tempfile.gettempprefix() tmp_filepath = os.path.join('/', tmp_dir, tmp_filename) log.info("This is a long value, write it to temp file %s", tmp_filepath) try: with open(tmp_filepath, 'w') as f: f.write(value + '\n' + md5digest) except Exception as e: log.exception( "%s: Failed to generate temp file %s for storing large size values" % (e.message, tmp_filepath)) return (None, None) finally: f.close() transformed_command = command.replace( python_download_script, "%s -l %s" % ( python_download_script, tmp_filepath)) return transformed_command, tmp_filepath else: transformed_command = command.replace( python_download_script, "%s -v '%s'" % ( python_download_script, value)) return transformed_command, None
def test_sane_template(self): # gettempprefix returns a nonempty prefix string p = tempfile.gettempprefix() self.assertIsInstance(p, str) self.assertTrue(len(p) > 0)
def test_sane_template(self): # gettempprefix returns a nonempty prefix string p = tempfile.gettempprefix() self.assertIsInstance(p, basestring) self.assertTrue(len(p) > 0)
def test_usable_template(self): # gettempprefix returns a usable prefix string # Create a temp directory, avoiding use of the prefix. # Then attempt to create a file whose name is # prefix + 'xxxxxx.xxx' in that directory. p = tempfile.gettempprefix() + "xxxxxx.xxx" d = tempfile.mkdtemp(prefix="") try: p = os.path.join(d, p) fd = os.open(p, os.O_RDWR | os.O_CREAT) os.close(fd) os.unlink(p) finally: os.rmdir(d)
def resolver_dir(config): resolvers = [] config.resolver_dir = os.path.join( tempfile.gettempdir(), "{0}-{1}".format(tempfile.gettempprefix(), "resolver") ) resolvers.append(config.resolver_dir) yield config.resolver_dir resolvers.append(config.resolver_dir) for resolver in filter(None, set(resolvers)): if os.path.isdir(resolver): os.rmdir(resolver)
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="tmp", prefix=gettempprefix(), dir=None, delete=True): """ Variation on tempfile.NamedTemporaryFile(…), such that suffixes are passed WITHOUT specifying the period in front (versus the standard library version which makes you pass suffixes WITH the fucking period, ugh). """ from tempfile import _bin_openflags, _text_openflags, \ _mkstemp_inner, _os, \ _TemporaryFileWrapper, \ gettempdir if dir is None: dir = gettempdir() if 'b' in mode: flags = _bin_openflags else: flags = _text_openflags if _os.name == 'nt' and delete: flags |= _os.O_TEMPORARY (fd, name) = _mkstemp_inner(dir, prefix, ".%s" % suffix, flags) try: file = _os.fdopen(fd, mode, bufsize) return _TemporaryFileWrapper(file, name, delete) except BaseException as baseexc: _os.unlink(name) _os.close(fd) raise FilesystemError(str(baseexc))
def test_get_as_csv(self): temp_file = os.path.join(tempfile.gettempdir(), '%s%s.tmp' % (tempfile.gettempprefix(), datetime.now().strftime('%f'))) with open(temp_file, 'wb') as f: self.test_client.datasets.get_csv(self.ds_name, f, page_size=1000) with open(temp_file, 'r') as f: # the +1 is for the headers written to the file self.assertEqual(len(self.data) + 1, len(f.readlines())) os.remove(temp_file)