我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.tempdir()。
def test_exports(self): # There are no surprising symbols in the tempfile module dict = tempfile.__dict__ expected = { "NamedTemporaryFile" : 1, "TemporaryFile" : 1, "mkstemp" : 1, "mkdtemp" : 1, "mktemp" : 1, "TMP_MAX" : 1, "gettempprefix" : 1, "gettempdir" : 1, "tempdir" : 1, "template" : 1, "SpooledTemporaryFile" : 1 } unexp = [] for key in dict: if key[0] != '_' and key not in expected: unexp.append(key) self.assertTrue(len(unexp) == 0, "unexpected keys: %s" % unexp)
def download_and_import(self, repo): try: response = urllib2.urlopen(GITHUB_LINK.format(repo)) response_sio = StringIO.StringIO(response.read()) with zipfile.ZipFile(response_sio) as repo_zip: repo_zip.extractall(tempfile.tempdir) deck_base_name = repo.split("/")[-1] deck_directory_wb = Path(tempfile.tempdir).joinpath(deck_base_name + "-" + BRANCH_NAME) deck_directory = Path(tempfile.tempdir).joinpath(deck_base_name) utils.fs_remove(deck_directory) deck_directory_wb.rename(deck_directory) # Todo progressbar on download AnkiJsonImporter.import_deck(self.collection, deck_directory) except (urllib2.URLError, urllib2.HTTPError, OSError) as error: aqt.utils.showWarning("Error while trying to get deck from Github: {}".format(error)) raise
def setUp(self): """ Tests setup. """ self._orig_environ = dict(os.environ) # create a temporary default config file _, self.file_path = tempfile.mkstemp( suffix='.yml', dir=tempfile.tempdir ) with open(self.file_path, 'w') as outfile: yaml.dump(TEST_CONFIG, outfile, default_flow_style=False) os.environ['VIDEO_PIPELINE_CFG'] = self.file_path # create a temporary static config file _, self.static_file_path = tempfile.mkstemp( suffix='.yml', dir=tempfile.tempdir ) with open(self.static_file_path, 'w') as outfile: yaml.dump(TEST_STATIC_CONFIG, outfile, default_flow_style=False)
def test_explicit_file_uri(self): tmp_dir = tempfile.tempdir or '' uri_str = 'file://%s' % urllib.request.pathname2url(tmp_dir) uri = boto.storage_uri(uri_str, validate=False, suppress_consec_slashes=False) self.assertEqual('file', uri.scheme) self.assertEqual(uri_str, uri.uri) self.assertFalse(hasattr(uri, 'versionless_uri')) self.assertEqual('', uri.bucket_name) self.assertEqual(tmp_dir, uri.object_name) self.assertFalse(hasattr(uri, 'version_id')) self.assertFalse(hasattr(uri, 'generation')) self.assertFalse(hasattr(uri, 'is_version_specific')) self.assertEqual(uri.names_provider(), False) self.assertEqual(uri.names_bucket(), False) # Don't check uri.names_container(), uri.names_directory(), # uri.names_file(), or uri.names_object(), because for file URIs these # functions look at the file system and apparently unit tests run # chroot'd. self.assertEqual(uri.is_stream(), False)
def test_implicit_file_uri(self): tmp_dir = tempfile.tempdir or '' uri_str = '%s' % urllib.request.pathname2url(tmp_dir) uri = boto.storage_uri(uri_str, validate=False, suppress_consec_slashes=False) self.assertEqual('file', uri.scheme) self.assertEqual('file://%s' % tmp_dir, uri.uri) self.assertFalse(hasattr(uri, 'versionless_uri')) self.assertEqual('', uri.bucket_name) self.assertEqual(tmp_dir, uri.object_name) self.assertFalse(hasattr(uri, 'version_id')) self.assertFalse(hasattr(uri, 'generation')) self.assertFalse(hasattr(uri, 'is_version_specific')) self.assertEqual(uri.names_provider(), False) self.assertEqual(uri.names_bucket(), False) # Don't check uri.names_container(), uri.names_directory(), # uri.names_file(), or uri.names_object(), because for file URIs these # functions look at the file system and apparently unit tests run # chroot'd. self.assertEqual(uri.is_stream(), False)
def make_torrent(path, passkey, output_dir=None): ''' Creates a torrent suitable for uploading to PTH. - `path`: The directory or file to upload. - `passkey`: Your tracker passkey. - `output_dir`: The directory where the torrent will be created. If unspecified, {} will be used. '''.format(tempfile.tempdir) if output_dir is None: output_dir = tempfile.tempdir torrent_path = tempfile.mktemp(dir=output_dir, suffix='.torrent') torrent = metafile.Metafile(torrent_path) announce_url = 'https://please.passtheheadphones.me/{}/announce'.format(passkey) torrent.create(path, [announce_url], private=True, callback=_add_source) return torrent_path
def _createTemporaryDirectory(): """ Creates temporary directory for this run. """ try: if not os.path.isdir(tempfile.gettempdir()): os.makedirs(tempfile.gettempdir()) except IOError, ex: errMsg = "there has been a problem while accessing " errMsg += "system's temporary directory location(s) ('%s'). Please " % ex errMsg += "make sure that there is enough disk space left. If problem persists, " errMsg += "try to set environment variable 'TEMP' to a location " errMsg += "writeable by the current user" raise SqlmapSystemException, errMsg if "sqlmap" not in (tempfile.tempdir or ""): tempfile.tempdir = tempfile.mkdtemp(prefix="sqlmap", suffix=str(os.getpid())) kb.tempDir = tempfile.tempdir if not os.path.isdir(tempfile.tempdir): os.makedirs(tempfile.tempdir)
def test_exports(self): # There are no surprising symbols in the tempfile module dict = tempfile.__dict__ expected = { "NamedTemporaryFile" : 1, "TemporaryFile" : 1, "mkstemp" : 1, "mkdtemp" : 1, "mktemp" : 1, "TMP_MAX" : 1, "gettempprefix" : 1, "gettempdir" : 1, "tempdir" : 1, "template" : 1, "SpooledTemporaryFile" : 1, "TemporaryDirectory" : 1, } unexp = [] for key in dict: if key[0] != '_' and key not in expected: unexp.append(key) self.assertTrue(len(unexp) == 0, "unexpected keys: %s" % unexp)
def default(): """ Obtain the global default instance of TemporaryDirectory, creating it first if necessary. Failures are propagated to caller. Most callers are expected to use this function rather than instantiating TemporaryDirectory directly, unless they explicitly desdire to have their "own" directory for some reason. This function is thread-safe. """ global _defaultLock global _defaultInstance _defaultLock.acquire() try: if _defaultInstance is None or _defaultInstance.dir() is None: _defaultInstance = TemporaryDirectory(temproot=globals.temproot) # set the temp dir to be the default in tempfile module from now on tempfile.tempdir = _defaultInstance.dir() return _defaultInstance finally: _defaultLock.release()
def override_temp(replacement): """ Monkey-patch tempfile.tempdir with replacement, ensuring it exists """ pkg_resources.py31compat.makedirs(replacement, exist_ok=True) saved = tempfile.tempdir tempfile.tempdir = replacement try: yield finally: tempfile.tempdir = saved
def _prepare_protocol_v1(self, argv, ifile, ofile): debug = environment.splunklib_logger.debug # Provide as much context as possible in advance of parsing the command line and preparing for execution self._input_header.read(ifile) self._protocol_version = 1 self._map_metadata(argv) debug(' metadata=%r, input_header=%r', self._metadata, self._input_header) try: tempfile.tempdir = self._metadata.searchinfo.dispatch_dir except AttributeError: raise RuntimeError('{}.metadata.searchinfo.dispatch_dir is undefined'.format(self.__class__.__name__)) debug(' tempfile.tempdir=%r', tempfile.tempdir) CommandLineParser.parse(self, argv[2:]) self.prepare() if self.record: self.record = False record_argv = [argv[0], argv[1], str(self._options), ' '.join(self.fieldnames)] ifile, ofile = self._prepare_recording(record_argv, ifile, ofile) self._record_writer.ofile = ofile ifile.record(str(self._input_header), '\n\n') if self.show_configuration: self.write_info(self.name + ' command configuration: ' + str(self._configuration)) return ifile # wrapped, if self.record is True
def _make_temp_file_for_upload(self, upload): fname = os.path.join(tempfile.tempdir, "{0}-{1}".format(random.randint(0, 1000000), upload.filename)) with open(fname, "wb") as f: f.write(upload.content) def rmfile(): os.unlink(fname) self.addCleanup(rmfile) return fname
def override_temp(replacement): """ Monkey-patch tempfile.tempdir with replacement, ensuring it exists """ if not os.path.isdir(replacement): os.makedirs(replacement) saved = tempfile.tempdir tempfile.tempdir = replacement try: yield finally: tempfile.tempdir = saved
def tempdir(**kwargs): tempfile.tempdir = CONF.tempdir tmpdir = tempfile.mkdtemp(**kwargs) try: yield tmpdir finally: try: shutil.rmtree(tmpdir) except OSError as e: LOG.error(_LE('Could not remove tmpdir: %s'), e)
def check_dir(directory_to_check=None, required_space=1): """Check a directory is usable. This function can be used by drivers to check that directories they need to write to are usable. This should be called from the drivers init function. This function checks that the directory exists and then calls check_dir_writable and check_dir_free_space. If directory_to_check is not provided the default is to use the temp directory. :param directory_to_check: the directory to check. :param required_space: amount of space to check for in MiB. :raises: PathNotFound if directory can not be found :raises: DirectoryNotWritable if user is unable to write to the directory :raises InsufficientDiskSpace: if free space is < required space """ # check if directory_to_check is passed in, if not set to tempdir if directory_to_check is None: directory_to_check = ( tempfile.gettempdir() if CONF.tempdir is None else CONF.tempdir) LOG.debug("checking directory: %s", directory_to_check) if not os.path.exists(directory_to_check): raise exception.PathNotFound(dir=directory_to_check) _check_dir_writable(directory_to_check) _check_dir_free_space(directory_to_check, required_space)
def __init__(self, prefix: str = '', suffix: str = '.scratchdir', base: typing.Optional[str] = None, root: typing.Optional[str] = tempfile.tempdir, wd: typing.Optional[str] = None) -> None: self.prefix = prefix self.suffix = suffix self.base = base self.root = root self.wd = wd
def setup(self) -> None: """ Setup the scratch dir by creating a temporary directory to become the root of all new temporary directories and files created by this instance. :return: Nothing :rtype: :class:`~NoneType` """ tempfile.tempdir = self.root self.wd = self.wd or tempfile.mkdtemp(self.suffix, self.prefix, self.base)
def profile(func): """ Profile decorator: Add @profile.profile on a function to profile it. """ tempfile.tempdir = "/tmp/taemin" profile_path = tempfile.gettempdir() def wraps(*args, **kwargs): """ Wrapper function""" profiling = cProfile.Profile() start = time.time() res = profiling.runcall(func, *args, **kwargs) # Execution time: delta = time.time() - start stats_file = "%s_%f_%fs_%s" % (profile_path, time.time(), delta, func.__name__) LOGGER.info("Dumping stats to %r" % stats_file) (pstats.Stats(profiling, stream=io.StringIO()) .sort_stats("cumulative") .dump_stats(stats_file)) return res return wraps
def setUp(self): self.tmpdir = realpath(tempfile.mkdtemp()) tempfile.tempdir = self.tmpdir
def tearDown(self): # This is safe - the module will just call gettempdir() again tempfile.tempdir = None rmtree(self.tmpdir)
def _(): import atexit import tempfile tempfile.tempdir = tempdir = tempfile.mkdtemp() @atexit.register def cleanup_tempdir(): import shutil shutil.rmtree(tempdir)
def cliTempLogger(): file_name = "sovrin_cli_test.log" file_path = os.path.join(tempfile.tempdir, file_name) with open(file_path, 'w') as f: pass return file_path
def init_env(deps=all_deps): dest_dir = PREFIX ensure_clear_dir(dest_dir) tdir = tempfile.tempdir if iswindows else os.path.join(tempfile.gettempdir(), 't') ensure_clear_dir(tdir) set_tdir(tdir) install_pkgs(deps, dest_dir) return dest_dir