Python tempfile 模块,_get_default_tempdir() 实例源码
我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用tempfile._get_default_tempdir()。
def upx_unpack(self, seed=None):
# dump bytez to a temporary file
tmpfilename = os.path.join(
tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
with open(tmpfilename, 'wb') as outfile:
outfile.write(self.bytez)
with open(os.devnull, 'w') as DEVNULL:
retcode = subprocess.call(
['upx', tmpfilename, '-d', '-o', tmpfilename + '_unpacked'], stdout=DEVNULL, stderr=DEVNULL)
os.unlink(tmpfilename)
if retcode == 0: # sucessfully unpacked
with open(tmpfilename + '_unpacked', 'rb') as result:
self.bytez = result.read()
os.unlink(tmpfilename + '_unpacked')
return self.bytez
def test_file_loading1(self):
data = self.data[:1000]
directory = tempfile._get_default_tempdir()
filename = next(tempfile._get_candidate_names())
filename = directory + os.sep + filename + ".txt"
np.savetxt(filename, data)
consumer = ChainConsumer()
consumer.add_chain(filename)
summary = consumer.analysis.get_summary()
actual = np.array(list(summary.values())[0])
assert np.abs(actual[1] - 5.0) < 0.5
def test_file_loading2(self):
data = self.data[:1000]
directory = tempfile._get_default_tempdir()
filename = next(tempfile._get_candidate_names())
filename = directory + os.sep + filename + ".npy"
np.save(filename, data)
consumer = ChainConsumer()
consumer.add_chain(filename)
summary = consumer.analysis.get_summary()
actual = np.array(list(summary.values())[0])
assert np.abs(actual[1] - 5.0) < 0.5
def get_temp_file_name(tmp_dir=None, extension=''):
"""Return an availiable name for temporary file."""
tmp_name = next(tempfile._get_candidate_names())
if not tmp_dir:
tmp_dir = tempfile._get_default_tempdir()
if extension is not None:
tmp_name = tmp_name + '.' + extension
return os.path.join(tmp_dir, tmp_name)
def get_temp_file_name(tmp_dir=None, extension=''):
"""Return an availiable name for temporary file."""
if tmp_dir is None:
tmp_dir = iCount.TMP_ROOT
# pylint: disable=protected-access
tmp_name = next(tempfile._get_candidate_names())
if not tmp_dir:
# pylint: disable=protected-access
tmp_dir = tempfile._get_default_tempdir()
if extension is not None:
tmp_name = tmp_name + '.' + extension
return os.path.join(tmp_dir, tmp_name)
def process(self, fuzzresult):
temp_name = next(tempfile._get_candidate_names())
defult_tmp_dir = tempfile._get_default_tempdir()
filename = os.path.join(defult_tmp_dir, temp_name + ".png")
subprocess.call(['cutycapt', '--url=%s' % pipes.quote(fuzzresult.url), '--out=%s' % filename])
self.add_result("Screnshot taken, output at %s" % filename)
def upx_pack(self, seed=None):
# tested with UPX 3.91
random.seed(seed)
tmpfilename = os.path.join(
tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
# dump bytez to a temporary file
with open(tmpfilename, 'wb') as outfile:
outfile.write(self.bytez)
options = ['--force', '--overlay=copy']
compression_level = random.randint(1, 9)
options += ['-{}'.format(compression_level)]
# --exact
# compression levels -1 to -9
# --overlay=copy [default]
# optional things:
# --compress-exports=0/1
# --compress-icons=0/1/2/3
# --compress-resources=0/1
# --strip-relocs=0/1
options += ['--compress-exports={}'.format(random.randint(0, 1))]
options += ['--compress-icons={}'.format(random.randint(0, 3))]
options += ['--compress-resources={}'.format(random.randint(0, 1))]
options += ['--strip-relocs={}'.format(random.randint(0, 1))]
with open(os.devnull, 'w') as DEVNULL:
retcode = subprocess.call(
['upx'] + options + [tmpfilename, '-o', tmpfilename + '_packed'], stdout=DEVNULL, stderr=DEVNULL)
os.unlink(tmpfilename)
if retcode == 0: # successfully packed
with open(tmpfilename + '_packed', 'rb') as infile:
self.bytez = infile.read()
os.unlink(tmpfilename + '_packed')
return self.bytez
def test_no_files_left_behind(self):
# use a private empty directory
our_temp_directory = tempfile.mkdtemp()
try:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError(-1)
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
finally:
shutil.rmtree(our_temp_directory)
def test_no_files_left_behind(self):
# use a private empty directory
our_temp_directory = tempfile.mkdtemp()
try:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError(-1)
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
finally:
shutil.rmtree(our_temp_directory)
def test_wanted_dirs(self):
# _candidate_tempdir_list contains the expected directories
# Make sure the interesting environment variables are all set.
with support.EnvironmentVarGuard() as env:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
env[envname] = os.path.abspath(envname)
cand = tempfile._candidate_tempdir_list()
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname: raise ValueError
self.assertIn(dirname, cand)
try:
dirname = os.getcwd()
except (AttributeError, os.error):
dirname = os.curdir
self.assertIn(dirname, cand)
# Not practical to try to verify the presence of OS-specific
# paths in this list.
# We test _get_default_tempdir some more by testing gettempdir.
def test_no_files_left_behind(self):
# use a private empty directory
with tempfile.TemporaryDirectory() as our_temp_directory:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError()
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(FileNotFoundError):
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(FileNotFoundError):
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def __dump_file(self, temp = False):
# Clear out program data.......
pref_loc = self.__preferences
if temp:
try:
fd = tempfile.NamedTemporaryFile(dir=tempfile._get_default_tempdir(), delete=True)
except Exception as e:
logger.warn('[%s] Unable to create preferences temporary file. (%s)' % (self.chat_id, e))
raise Exception("Unable to create preferences temporary file. (%s)" % e)
else:
# try to open preferences file
try:
fd = open(self.fullpath, 'w', encoding='utf-8')
except Exception as e:
logger.warn('[%s] Unable to open preference file for writing. (%s)' % (self.chat_id, e))
raise Exception("Unable to open preferences file '%s." % self.fullpath)
try:
with fd:
json.dump(pref_loc, fd, indent=4, sort_keys=True, separators=(',', ':'))
except Exception as e:
logger.error('[%s] Unable to write to preferences file. (%s)' % (self.chat_id, e))
raise Exception("Unable to write preferences to file '%s."%self.fullpath)
# close file
fd.close()
def _find_attachment_ids_email(self):
atts = super(InvoiceEletronic, self)._find_attachment_ids_email()
attachment_obj = self.env['ir.attachment']
if self.model not in ('009'):
return atts
tmp = tempfile._get_default_tempdir()
temp_name = os.path.join(tmp, next(tempfile._get_candidate_names()))
command_args = ["--dpi", "84", str(self.url_danfe), temp_name]
wkhtmltopdf = [_get_wkhtmltopdf_bin()] + command_args
process = subprocess.Popen(wkhtmltopdf, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = process.communicate()
if process.returncode not in [0, 1]:
raise UserError(_('Wkhtmltopdf failed (error code: %s). '
'Message: %s') % (str(process.returncode), err))
tmpDanfe = None
with open(temp_name, 'r') as f:
tmpDanfe = f.read()
try:
os.unlink(temp_name)
except (OSError, IOError):
_logger.error('Error when trying to remove file %s' % temp_name)
if tmpDanfe:
danfe_id = attachment_obj.create(dict(
name="Danfe-%08d.pdf" % self.numero,
datas_fname="Danfe-%08d.pdf" % self.numero,
datas=base64.b64encode(tmpDanfe),
mimetype='application/pdf',
res_model='account.invoice',
res_id=self.invoice_id.id,
))
atts.append(danfe_id.id)
return atts
def process(self, fuzzresult):
temp_name = next(tempfile._get_candidate_names())
defult_tmp_dir = tempfile._get_default_tempdir()
filename = os.path.join(defult_tmp_dir, temp_name + ".png")
subprocess.call(['cutycapt', '--url=%s' % pipes.quote(fuzzresult.url), '--out=%s' % filename])
self.add_result("Screnshot taken, output at %s" % filename)
def generate_tmp_filename(extension):
return tempfile._get_default_tempdir() + "/" + next(tempfile._get_candidate_names()) + "." + extension
def test_no_files_left_behind(self):
# use a private empty directory
our_temp_directory = tempfile.mkdtemp()
try:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError(-1)
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
finally:
shutil.rmtree(our_temp_directory)
def test_wanted_dirs(self):
# _candidate_tempdir_list contains the expected directories
# Make sure the interesting environment variables are all set.
with support.EnvironmentVarGuard() as env:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
env[envname] = os.path.abspath(envname)
cand = tempfile._candidate_tempdir_list()
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname: raise ValueError
self.assertIn(dirname, cand)
try:
dirname = os.getcwd()
except (AttributeError, OSError):
dirname = os.curdir
self.assertIn(dirname, cand)
# Not practical to try to verify the presence of OS-specific
# paths in this list.
# We test _get_default_tempdir some more by testing gettempdir.
def test_no_files_left_behind(self):
# use a private empty directory
with tempfile.TemporaryDirectory() as our_temp_directory:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError()
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(FileNotFoundError):
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(FileNotFoundError):
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def test_no_files_left_behind(self):
# use a private empty directory
our_temp_directory = tempfile.mkdtemp()
try:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError(-1)
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
finally:
shutil.rmtree(our_temp_directory)
def test_wanted_dirs(self):
# _candidate_tempdir_list contains the expected directories
# Make sure the interesting environment variables are all set.
with support.EnvironmentVarGuard() as env:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
env[envname] = os.path.abspath(envname)
cand = tempfile._candidate_tempdir_list()
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname: raise ValueError
self.assertIn(dirname, cand)
try:
dirname = os.getcwd()
except (AttributeError, OSError):
dirname = os.curdir
self.assertIn(dirname, cand)
# Not practical to try to verify the presence of OS-specific
# paths in this list.
# We test _get_default_tempdir some more by testing gettempdir.
def test_no_files_left_behind(self):
# use a private empty directory
with tempfile.TemporaryDirectory() as our_temp_directory:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError()
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(FileNotFoundError):
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(FileNotFoundError):
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])