Python tempfile module: mkstemp() example source code
We extracted the following 50 code examples from open source Python projects to illustrate how tempfile.mkstemp() is used.
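Before the project examples, here is a minimal, self-contained sketch of the basic mkstemp() pattern most of them follow (create, write through the returned descriptor, then clean up); the suffix, prefix and file content below are purely illustrative.

    import os
    import tempfile

    # mkstemp() returns an already-open OS-level file descriptor plus the path;
    # unlike TemporaryFile(), the caller must close the descriptor and delete the file.
    fd, path = tempfile.mkstemp(suffix=".txt", prefix="demo_")
    try:
        with os.fdopen(fd, "w") as f:      # wrap the descriptor so it is closed for us
            f.write("hello from a temporary file\n")
        with open(path) as f:
            print(f.read())
    finally:
        os.remove(path)                    # mkstemp() never removes the file itself
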
def empty_db(self):
    cmd = [
        "mysqldump",
        "-u%(user)s" % self.db_config,
        "-h%(host)s" % self.db_config,
        "--add_drop-table",
        "--no-data",
        "%(name)s" % self.db_config,
    ]
    tmphandle, tmppath = tempfile.mkstemp(text=True)
    tmpfile = os.fdopen(tmphandle, "w")
    sql_data = subprocess.check_output(cmd, stderr=None).split('\n')
    tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n")
    tmpfile.write("use %(name)s;\n" % self.db_config)
    for line in sql_data:
        if line.startswith("DROP"):
            tmpfile.write(line + '\n')
    tmpfile.close()
    self._run_mysql_cmd("source %s" % tmppath)
    os.remove(tmppath)

def _dump(self, file=None, format=None):
    import tempfile
    suffix = ''
    if format:
        suffix = '.' + format
    if not file:
        f, file = tempfile.mkstemp(suffix)
        os.close(f)
    self.load()
    if not format or format == "PPM":
        self.im.save_ppm(file)
    else:
        if not file.endswith(format):
            file = file + "." + format
        self.save(file, format)
    return file

def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
        text (bool): if the file should be opened in text mode
    Returns:
        Tuple[`int`, `fsnative`]:
            A tuple containing the file descriptor and the file path
    Raises:
        EnvironmentError
    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative`
    path.
    """
    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)
    return tempfile.mkstemp(suffix, prefix, dir, text)

def mkdtemp(suffix=None, prefix=None, dir=None):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
    Returns:
        `fsnative`: A path to a directory
    Raises:
        EnvironmentError
    Like :func:`python3:tempfile.mkdtemp` but always returns a `fsnative` path.
    """
    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)
    return tempfile.mkdtemp(suffix, prefix, dir)

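The two wrappers above come from a senf-style path utility module; below is a hedged usage sketch that assumes both functions (and their fsnative/path2fsn helpers) are already in scope, with purely illustrative suffix and prefix values.

    import os
    import shutil

    # Assumes the mkstemp()/mkdtemp() wrappers defined above are importable;
    # a plain str is an acceptable pathlike argument for path2fsn().
    fd, path = mkstemp(suffix=".tmp")
    os.close(fd)             # the descriptor comes back open, just like tempfile.mkstemp
    os.remove(path)          # cleanup is the caller's job

    tmpdir = mkdtemp(prefix="demo-")
    shutil.rmtree(tmpdir)    # mkdtemp only creates the directory
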
def grab(bbox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp('.png')
        os.close(f)
        subprocess.call(['screencapture', '-x', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
        )
    if bbox:
        im = im.crop(bbox)
    return im

def _openDownloadFile(self, buildId, suffix):
    (tmpFd, tmpName) = mkstemp()
    url = self._makeUrl(buildId, suffix)
    try:
        os.close(tmpFd)
        env = {k: v for (k, v) in os.environ.items() if k in self.__whiteList}
        env["BOB_LOCAL_ARTIFACT"] = tmpName
        env["BOB_REMOTE_ARTIFACT"] = url
        ret = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
                              stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                              cwd="/tmp", env=env)
        if ret == 0:
            ret = tmpName
            tmpName = None
            return CustomDownloader(ret)
        else:
            raise ArtifactDownloadError("failed (exit {})".format(ret))
    finally:
        if tmpName is not None:
            os.unlink(tmpName)

def grab(bbox=None):
    if sys.platform == "darwin":
        fh, filepath = tempfile.mkstemp('.png')
        os.close(fh)
        subprocess.call(['screencapture', '-x', filepath])
        im = Image.open(filepath)
        im.load()
        os.unlink(filepath)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origin lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
        )
    if bbox:
        im = im.crop(bbox)
    return im

def _dump(self, file=None, format=None):
    import tempfile
    suffix = ''
    if format:
        suffix = '.' + format
    if not file:
        f, file = tempfile.mkstemp(suffix)
        os.close(f)
    self.load()
    if not format or format == "PPM":
        self.im.save_ppm(file)
    else:
        if not file.endswith(format):
            file = file + "." + format
        self.save(file, format)
    return file

def _compile_module_file(template, text, filename, outputpath, module_writer):
    source, lexer = _compile(template, text, filename,
                             generate_magic_comment=True)
    if isinstance(source, compat.text_type):
        source = source.encode(lexer.encoding or 'ascii')
    if module_writer:
        module_writer(source, outputpath)
    else:
        # make tempfiles in the same location as the ultimate
        # location. this ensures they're on the same filesystem,
        # avoiding synchronization issues.
        (dest, name) = tempfile.mkstemp(dir=os.path.dirname(outputpath))
        os.write(dest, source)
        os.close(dest)
        shutil.move(name, outputpath)

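The comment in the example above points at a generally useful idiom: create the temporary file with mkstemp(dir=...) next to the final destination so the subsequent rename stays on one filesystem. A generic sketch of that idiom follows; the helper name is made up for illustration.

    import os
    import tempfile

    def atomic_write_bytes(data, outputpath):
        # Write to a temp file in the destination directory, then atomically
        # replace the target; os.replace() overwrites on both POSIX and Windows.
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(outputpath) or ".")
        try:
            os.write(fd, data)
        finally:
            os.close(fd)
        os.replace(tmp_name, outputpath)
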
def set(self, key, value, timeout=None):
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True

def test_transfer_with_cli(self):
    test_instances = [
        TestUrlInstance(url="/data1", data=b"data1"),
        TestUrlInstance(url="/data2", data=b"data2")
    ]
    self.httpd.test_instances = test_instances
    try:
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        cmd = [TestRequestHandler.ticket_url, "-O", filename]
        parser = cli.get_htsget_parser()
        args = parser.parse_args(cmd)
        with mock.patch("sys.exit") as mocked_exit:
            cli.run(args)
            mocked_exit.assert_called_once_with(0)
        all_data = b"".join(test_instance.data for test_instance in test_instances)
        with open(filename, "rb") as f:
            self.assertEqual(f.read(), all_data)
    finally:
        os.unlink(filename)

def create_thumb_js(self, mode=None, pth=None):
    """ Create the thumbnail using SmartCrop.js """
    if pth is None:
        raise ValueError("path can't be None")
    # save a copy of the image with the correct orientation in a temporary
    # file
    _, tmpfname = tempfile.mkstemp(suffix='.' + settings.output_format)
    self.original_image.save(tmpfname, quality=95)
    # Load smartcrop and set options
    nwidth, nheight = self.resize_dims(mode)
    logging.info("[%s] SmartCrop.js new dimensions: %ix%i" % (self.name,
                                                              nwidth, nheight))
    command = [settings.smartcrop_js_path, '--width', str(nwidth),
               '--height', str(nheight), tmpfname, pth]
    logging.info("[%s] SmartCrop.js running crop command." % self.name)
    check_output(command)
    # remove the temporary file
    os.remove(tmpfname)
    return pth

def Set(self, key, data):
    path = self._GetPath(key)
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        raise _FileCacheError('%s exists but is not a directory' % directory)
    temp_fd, temp_path = tempfile.mkstemp()
    temp_fp = os.fdopen(temp_fd, 'w')
    temp_fp.write(data)
    temp_fp.close()
    if not path.startswith(self._root_directory):
        raise _FileCacheError('%s does not appear to live under %s' %
                              (path, self._root_directory))
    if os.path.exists(path):
        os.remove(path)
    os.rename(temp_path, path)

def setUp(self):
    # CRN used in this test
    self.crn_rxn = 'A + B -> A + D\n'
    # Establish random temporary filenames
    self.tdir = mkdtemp(prefix='piperine_test')
    fid, self.basename = mkstemp(dir=self.tdir)
    os.close(fid)
    endings = ['.crn', '.fixed', '.pil', '.mfe', '{}_strands.txt', '{}.seqs']
    self.filenames = [self.basename + suf for suf in endings]
    self.crn, self.fixed, self.pil, self.mfe, self.strands, self.seqs = self.filenames
    fid, self.fixedscore = mkstemp(suffix='_fixed_score.csv', dir=self.tdir)
    os.close(fid)
    fid, self.reportfile = mkstemp(dir=self.tdir)
    os.close(fid)
    # Write CRN to basename.crn
    with open(self.crn, 'w') as f:
        f.write(self.crn_rxn)
    # Modules and module strings for import tests
    proc = subprocess.Popen(['piperine-design {} -n 3 -D -q'.format(self.crn)],
                            stdout=subprocess.PIPE, shell=True)
    (out, err) = proc.communicate()
    self.ef = energetics.energyfuncs(targetdG=7.7)
    self.trans = translation

def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0]  # unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = self.generator.bld.exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass  # anti-virus and indexers can keep the files open -_-
    return ret

########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token

def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0]  # unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = super(self.__class__, self).exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass  # anti-virus and indexers can keep the files open -_-
    return ret

def get_pkg_dir(self, pkgname, pkgver, subdir):
    pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
    if not os.path.isdir(pkgdir):
        os.makedirs(pkgdir)
    target = os.path.join(pkgdir, subdir)
    if os.path.exists(target):
        return target
    (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
    try:
        os.close(fd)
        self.download_to_file(pkgname, pkgver, subdir, tmp)
        if subdir == REQUIRES:
            os.rename(tmp, target)
        else:
            self.extract_tar(subdir, pkgdir, tmp)
    finally:
        try:
            os.remove(tmp)
        except OSError:
            pass
    return target

def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0]  # unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = super(self.__class__, self).exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass  # anti-virus and indexers can keep the files open -_-
    return ret

def get_pkg_dir(self, pkgname, pkgver, subdir):
    pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
    if not os.path.isdir(pkgdir):
        os.makedirs(pkgdir)
    target = os.path.join(pkgdir, subdir)
    if os.path.exists(target):
        return target
    (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
    try:
        os.close(fd)
        self.download_to_file(pkgname, pkgver, subdir, tmp)
        if subdir == REQUIRES:
            os.rename(tmp, target)
        else:
            self.extract_tar(subdir, pkgdir, tmp)
    finally:
        try:
            os.remove(tmp)
        except OSError:
            pass
    return target

def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0]  # unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = self.generator.bld.exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass  # anti-virus and indexers can keep the files open -_-
    return ret

########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token

def get_pkg_dir(self, pkgname, pkgver, subdir):
    pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
    if not os.path.isdir(pkgdir):
        os.makedirs(pkgdir)
    target = os.path.join(pkgdir, subdir)
    if os.path.exists(target):
        return target
    (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
    try:
        os.close(fd)
        self.download_to_file(pkgname, pkgver, subdir, tmp)
        if subdir == REQUIRES:
            os.rename(tmp, target)
        else:
            self.extract_tar(subdir, pkgdir, tmp)
    finally:
        try:
            os.remove(tmp)
        except OSError:
            pass
    return target

def open_compressed(filename, open_flag='r', compression_type='bz2'):
    """Opens a compressed HDF5File with the given opening flags.
    For the 'r' flag, the given compressed file will be extracted to a local space.
    For 'w', an empty HDF5File is created.
    In any case, the opened HDF5File is returned, which needs to be closed using the close_compressed() function.
    """
    # create temporary HDF5 file name
    hdf5_file_name = tempfile.mkstemp('.hdf5', 'bob_')[1]

    if open_flag == 'r':
        # extract the HDF5 file from the given file name into a temporary file name
        tar = tarfile.open(filename, mode="r:" + compression_type)
        memory_file = tar.extractfile(tar.next())
        real_file = open(hdf5_file_name, 'wb')
        real_file.write(memory_file.read())
        del memory_file
        real_file.close()
        tar.close()

    return bob.io.base.HDF5File(hdf5_file_name, open_flag)

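open_compressed() above expects its input to be a tar archive (opened with "r:bz2" here) whose first member is an HDF5 file. A minimal, standard-library-only sketch of producing such an input is shown below; the file names are illustrative.

    import tarfile

    # Wrap an existing HDF5 file into the bz2-compressed tar layout that
    # open_compressed() reads back (it extracts the first member).
    with tarfile.open("features.hdf5.bz2", mode="w:bz2") as tar:
        tar.add("features.hdf5", arcname="features.hdf5")
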
def set(self, key, value, timeout=None):
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True

def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

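The helper above is the kind of machinery Werkzeug's development server uses for ad-hoc TLS; in application code the same effect is usually reached through run_simple(), as in this hedged sketch (host, port and the WSGI app are placeholders).

    from werkzeug.serving import run_simple

    def app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return [b'hello over https\n']

    # ssl_context='adhoc' makes Werkzeug generate a throwaway certificate,
    # much like generate_adhoc_ssl_context() shown above.
    run_simple('localhost', 8443, app, ssl_context='adhoc')
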
def __init__(self, path, threaded=True, timeout=None):
    """
    >>> lock = SQLiteLockFile('somefile')
    >>> lock = SQLiteLockFile('somefile', threaded=False)
    """
    LockBase.__init__(self, path, threaded, timeout)
    self.lock_file = unicode(self.lock_file)
    self.unique_name = unicode(self.unique_name)

    if SQLiteLockFile.testdb is None:
        import tempfile
        _fd, testdb = tempfile.mkstemp()
        os.close(_fd)
        os.unlink(testdb)
        del _fd, tempfile
        SQLiteLockFile.testdb = testdb

    import sqlite3
    self.connection = sqlite3.connect(SQLiteLockFile.testdb)

    c = self.connection.cursor()
    try:
        c.execute("create table locks"
                  "("
                  "   lock_file varchar(32),"
                  "   unique_name varchar(32)"
                  ")")
    except sqlite3.OperationalError:
        pass
    else:
        self.connection.commit()
        import atexit
        atexit.register(os.unlink, SQLiteLockFile.testdb)

def get_tpm_rand_block(size=4096):
    global warned
    randpath = None
    try:
        # make a temp file for the output
        randfd, randpath = tempfile.mkstemp()
        command = "getrandom -size %d -out %s" % (size, randpath)
        tpm_exec.run(command)
        # read in the quote
        f = open(randpath, "rb")
        rand = f.read()
        f.close()
        os.close(randfd)
    except Exception as e:
        if not warned:
            logger.warn("TPM randomness not available: %s" % e)
            warned = True
        return []
    finally:
        if randpath is not None:
            os.remove(randpath)
    return rand

def write_key_nvram(key):
    if common.STUB_TPM:
        storage = open("tpm_nvram", "wb")
        storage.write(key)
        storage.close()
        return

    owner_pw = tpm_initialize.get_tpm_metadata('owner_pw')
    keyFile = None
    try:
        # write out quote
        keyfd, keypath = tempfile.mkstemp()
        keyFile = open(keypath, "wb")
        keyFile.write(key)
        keyFile.close()
        os.close(keyfd)
        tpm_exec.run("nv_definespace -pwdo %s -in 1 -sz %d -pwdd %s -per 40004" % (owner_pw, common.BOOTSTRAP_KEY_SIZE, owner_pw))
        tpm_exec.run("nv_writevalue -pwdd %s -in 1 -if %s" % (owner_pw, keyFile.name))
    finally:
        if keyFile is not None:
            os.remove(keyFile.name)
    return

def test_ownerpw(owner_pw, reentry=False):
    tmppath = None
    try:
        # make a temp file for the output
        _, tmppath = tempfile.mkstemp()
        (output, code) = tpm_exec.run("getpubek -pwdo %s -ok %s" % (owner_pw, tmppath), raiseOnError=False)
        if code != tpm_exec.EXIT_SUCESS:
            if len(output) > 0 and output[0].startswith("Error Authentication failed (Incorrect Password) from TPM_OwnerReadPubek"):
                return False
            elif len(output) > 0 and output[0].startswith("Error Defend lock running from TPM_OwnerReadPubek"):
                if reentry:
                    logger.error("Unable to unlock TPM")
                    return False
                # tpm got locked. lets try to unlock it
                logger.error("TPM is locked from too many invalid owner password attempts, attempting to unlock with password: %s" % owner_pw)
                # i have no idea why, but running this twice seems to actually work
                tpm_exec.run("resetlockvalue -pwdo %s" % owner_pw, raiseOnError=False)
                tpm_exec.run("resetlockvalue -pwdo %s" % owner_pw, raiseOnError=False)
                return test_ownerpw(owner_pw, True)
            else:
                raise Exception("test ownerpw, getpubek failed with code " + str(code) + ": " + str(output))
    finally:
        if tmppath is not None:
            os.remove(tmppath)
    return True

def get_pub_ek():  # assumes that owner_pw is correct at this point
    owner_pw = get_tpm_metadata('owner_pw')
    tmppath = None
    try:
        # make a temp file for the output
        tmpfd, tmppath = tempfile.mkstemp()
        (output, code) = tpm_exec.run("getpubek -pwdo %s -ok %s" % (owner_pw, tmppath), raiseOnError=False)  # generates pubek.pem
        if code != tpm_exec.EXIT_SUCESS:
            raise Exception("getpubek failed with code " + str(code) + ": " + str(output))
        # read in the output
        f = open(tmppath, "rb")
        ek = f.read()
        f.close()
        os.close(tmpfd)
    finally:
        if tmppath is not None:
            os.remove(tmppath)

    set_tpm_metadata('ek', ek)

def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
    """Create a temporary file or use an existing file.
    This util is needed for creating a temporary file with
    specified content, suffix and prefix. If path is not None,
    it will be used for writing content. If the path doesn't
    exist it'll be created.
    :param content: content for temporary file.
    :param path: same as parameter 'dir' for mkstemp
    :param suffix: same as parameter 'suffix' for mkstemp
    :param prefix: same as parameter 'prefix' for mkstemp
    For example: it can be used in database tests for creating
    configuration files.
    """
    if path:
        ensure_tree(path)

    (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
    try:
        os.write(fd, content)
    finally:
        os.close(fd)
    return path

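A short usage sketch for write_to_tempfile() as documented above; the content, suffix and prefix are illustrative only (note that os.write() requires bytes on Python 3).

    # Returns the path of a freshly created temp file containing the given bytes.
    cfg_path = write_to_tempfile(b"connection = sqlite://\n",
                                 suffix=".conf", prefix="test_")
    print(cfg_path)
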
def _dump(self, file=None, format=None, **options):
    import tempfile
    suffix = ''
    if format:
        suffix = '.' + format
    if not file:
        f, file = tempfile.mkstemp(suffix)
        os.close(f)
    self.load()
    if not format or format == "PPM":
        self.im.save_ppm(file)
    else:
        if not file.endswith(format):
            file = file + "." + format
        self.save(file, format, **options)
    return file

def convert(cls, report, data):
    "converts the report data to another mimetype if necessary"
    input_format = report.template_extension
    output_format = report.extension or report.template_extension

    if output_format in MIMETYPES:
        return output_format, data

    fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format),
                                prefix='trytond_')
    oext = FORMAT2EXT.get(output_format, output_format)
    with os.fdopen(fd, 'wb+') as fp:
        fp.write(data)
    cmd = ['unoconv', '--connection=%s' % config.get('report', 'unoconv'),
           '-f', oext, '--stdout', path]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        stdoutdata, stderrdata = proc.communicate()
        if proc.wait() != 0:
            raise Exception(stderrdata)
        return oext, stdoutdata
    finally:
        os.remove(path)

def test_basic_config():
    fd, path = tempfile.mkstemp()
    f = os.fdopen(fd, 'w')
    f.write(yaml.dump(testcfg))
    f.flush()
    cfg = ny.get_config(path)
    ny.write_supervisor_conf()
    config = ConfigParser.ConfigParser()
    config.readfp(open(cfg['supervisor.conf']))
    # from IPython import embed
    # embed()
    print(config.get('program:testtunnel', 'command'))
    assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in config.get('program:testtunnel', 'command')

def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
    self._createdir()  # Cache dir can be deleted at any time.
    fname = self._key_to_file(key, version)
    self._cull()  # make some room if necessary
    fd, tmp_path = tempfile.mkstemp(dir=self._dir)
    renamed = False
    try:
        with io.open(fd, 'wb') as f:
            expiry = self.get_backend_timeout(timeout)
            f.write(pickle.dumps(expiry, -1))
            f.write(zlib.compress(pickle.dumps(value), -1))
        file_move_safe(tmp_path, fname, allow_overwrite=True)
        renamed = True
    finally:
        if not renamed:
            os.remove(tmp_path)

def set(self, key, value, timeout=None):
    if timeout is None:
        timeout = int(time() + self.default_timeout)
    elif timeout != 0:
        timeout = int(time() + timeout)
    filename = self._get_filename(key)
    self._prune()
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True

def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to a temporary file created
    using mkstemp() from the tempfile module. On success 'filename' is
    returned, None otherwise.
    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL checks.
    Also see create_temporary_ca_path().
    """
    try:
        f, fname = tempfile.mkstemp()
        for a in anchor_list:
            s = a.output(fmt="PEM")
            l = os.write(f, s)
        os.close(f)
    except:
        return None
    return fname

def validate(self, inline=False):
    """Validate workflow object.
    This method currently validates the workflow object with the use of
    cwltool. It writes the workflow to a tmp CWL file, reads it, validates
    it and removes the tmp file again. By default, the workflow is written
    to file using absolute paths to the steps. Optionally, the steps can be
    saved inline.
    """
    # define tmpfile
    (fd, tmpfile) = tempfile.mkstemp()
    os.close(fd)
    try:
        # save workflow object to tmpfile,
        # do not recursively call validate function
        self.save(tmpfile, inline=inline, validate=False, relative=False,
                  wd=False)
        # load workflow from tmpfile
        document_loader, processobj, metadata, uri = load_cwl(tmpfile)
    finally:
        # cleanup tmpfile
        os.remove(tmpfile)

def _pack(self, fname, encoding):
    """Save workflow with ``--pack`` option
    This means that all tools and subworkflows are included in the workflow
    file that is created. A packed workflow cannot be loaded and used in
    scriptcwl.
    """
    (fd, tmpfile) = tempfile.mkstemp()
    os.close(fd)
    try:
        self.save(tmpfile, validate=False, wd=False, inline=False,
                  relative=False, pack=False)
        document_loader, processobj, metadata, uri = load_cwl(tmpfile)
    finally:
        # cleanup tmpfile
        os.remove(tmpfile)

    with codecs.open(fname, 'wb', encoding=encoding) as f:
        f.write(print_pack(document_loader, processobj, uri, metadata))

def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
    self._createdir()  # Cache dir can be deleted at any time.
    fname = self._key_to_file(key, version)
    self._cull()  # make some room if necessary
    fd, tmp_path = tempfile.mkstemp(dir=self._dir)
    renamed = False
    try:
        with io.open(fd, 'wb') as f:
            expiry = self.get_backend_timeout(timeout)
            f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL))
            f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)))
        file_move_safe(tmp_path, fname, allow_overwrite=True)
        renamed = True
    finally:
        if not renamed:
            os.remove(tmp_path)

def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to a temporary file created
    using mkstemp() from the tempfile module. On success 'filename' is
    returned, None otherwise.
    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL checks.
    Also see create_temporary_ca_path().
    """
    try:
        f, fname = tempfile.mkstemp()
        for a in anchor_list:
            s = a.output(fmt="PEM")
            l = os.write(f, s)
        os.close(f)
    except:
        return None
    return fname

def _findLib_gcc(name):
    expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
    fdout, ccout = tempfile.mkstemp()
    os.close(fdout)
    cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
          'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
    try:
        f = os.popen(cmd)
        try:
            trace = f.read()
        finally:
            rv = f.close()
    finally:
        try:
            os.unlink(ccout)
        except OSError, e:
            if e.errno != errno.ENOENT:
                raise
    if rv == 10:
        raise OSError, 'gcc or cc command not found'
    res = re.search(expr, trace)
    if not res:
        return None
    return res.group(0)

def mktemp(self, *args, **kwds):
    """create temp file that's cleaned up at end of test"""
    self.require_writeable_filesystem()
    fd, path = tempfile.mkstemp(*args, **kwds)
    os.close(fd)
    queue = self._mktemp_queue
    if queue is None:
        queue = self._mktemp_queue = []

        def cleaner():
            for path in queue:
                if os.path.exists(path):
                    os.remove(path)
            del queue[:]
        self.addCleanup(cleaner)
    queue.append(path)
    return path

def set(self, key, value, timeout=None):
    timeout = self._normalize_timeout(timeout)
    filename = self._get_filename(key)
    self._prune()
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True

def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

def set(self, key, value, timeout=None):
    timeout = self._normalize_timeout(timeout)
    filename = self._get_filename(key)
    self._prune()
    try:
        fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                   dir=self._path)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(timeout, f, 1)
            pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
        rename(tmp, filename)
        os.chmod(filename, self._mode)
    except (IOError, OSError):
        return False
    else:
        return True

def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

def test_ffi_buffer_with_file(self):
    import tempfile, os, array
    fd, filename = tempfile.mkstemp()
    f = os.fdopen(fd, 'r+b')
    a = ffi.new("int[]", list(range(1005)))
    try:
        ffi.buffer(a, 512)
    except NotImplementedError as e:
        py.test.skip(str(e))
    f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
    f.seek(0)
    assert f.read() == array.array('i', range(1000)).tostring()
    f.seek(0)
    b = ffi.new("int[]", 1005)
    f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
    assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    f.close()
    os.unlink(filename)

def test_ffi_buffer_with_file(self):
    ffi = FFI(backend=self.Backend())
    import tempfile, os, array
    fd, filename = tempfile.mkstemp()
    f = os.fdopen(fd, 'r+b')
    a = ffi.new("int[]", list(range(1005)))
    try:
        ffi.buffer(a, 512)
    except NotImplementedError as e:
        py.test.skip(str(e))
    f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
    f.seek(0)
    assert f.read() == array.array('i', range(1000)).tostring()
    f.seek(0)
    b = ffi.new("int[]", 1005)
    f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
    assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
    f.close()
    os.unlink(filename)

def __init__(self, executable_path, port=0, service_args=None, log_path=None):
    """
    Creates a new instance of the Service
    :Args:
     - executable_path : Path to PhantomJS binary
     - port : Port the service is running on
     - service_args : A List of other command line options to pass to PhantomJS
     - log_path: Path for PhantomJS service to log to
    """
    self.service_args = service_args
    if self.service_args is None:
        self.service_args = []
    else:
        self.service_args = service_args[:]
    if not log_path:
        log_path = "ghostdriver.log"
    if not self._args_contain("--cookies-file="):
        self._cookie_temp_file = tempfile.mkstemp()[1]
        self.service_args.append("--cookies-file=" + self._cookie_temp_file)
    else:
        self._cookie_temp_file = None
    service.Service.__init__(self, executable_path, port=port, log_file=open(log_path, 'w'))