我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.mkstemp()。
def empty_db(self): cmd = [ "mysqldump", "-u%(user)s" % self.db_config, "-h%(host)s" % self.db_config, "--add_drop-table", "--no-data", "%(name)s" % self.db_config, ] tmphandle, tmppath = tempfile.mkstemp(text=True) tmpfile = os.fdopen(tmphandle, "w") sql_data = subprocess.check_output(cmd, stderr=None).split('\n') tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n") tmpfile.write("use %(name)s;\n" % self.db_config) for line in sql_data: if line.startswith("DROP"): tmpfile.write(line + '\n') tmpfile.close() self._run_mysql_cmd("source %s" % tmppath) os.remove(tmppath)
def _dump(self, file=None, format=None): import tempfile suffix = '' if format: suffix = '.'+format if not file: f, file = tempfile.mkstemp(suffix) os.close(f) self.load() if not format or format == "PPM": self.im.save_ppm(file) else: if not file.endswith(format): file = file + "." + format self.save(file, format) return file
def mkstemp(suffix=None, prefix=None, dir=None, text=False): """ Args: suffix (`pathlike` or `None`): suffix or `None` to use the default prefix (`pathlike` or `None`): prefix or `None` to use the default dir (`pathlike` or `None`): temp dir or `None` to use the default text (bool): if the file should be opened in text mode Returns: Tuple[`int`, `fsnative`]: A tuple containing the file descriptor and the file path Raises: EnvironmentError Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path. """ suffix = fsnative() if suffix is None else path2fsn(suffix) prefix = gettempprefix() if prefix is None else path2fsn(prefix) dir = gettempdir() if dir is None else path2fsn(dir) return tempfile.mkstemp(suffix, prefix, dir, text)
def mkdtemp(suffix=None, prefix=None, dir=None): """ Args: suffix (`pathlike` or `None`): suffix or `None` to use the default prefix (`pathlike` or `None`): prefix or `None` to use the default dir (`pathlike` or `None`): temp dir or `None` to use the default Returns: `fsnative`: A path to a directory Raises: EnvironmentError Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path. """ suffix = fsnative() if suffix is None else path2fsn(suffix) prefix = gettempprefix() if prefix is None else path2fsn(prefix) dir = gettempdir() if dir is None else path2fsn(dir) return tempfile.mkdtemp(suffix, prefix, dir)
def grab(bbox=None): if sys.platform == "darwin": f, file = tempfile.mkstemp('.png') os.close(f) subprocess.call(['screencapture', '-x', file]) im = Image.open(file) im.load() os.unlink(file) else: size, data = grabber() im = Image.frombytes( "RGB", size, data, # RGB, 32-bit line padding, origo in lower left corner "raw", "BGR", (size[0]*3 + 3) & -4, -1 ) if bbox: im = im.crop(bbox) return im
def _openDownloadFile(self, buildId, suffix): (tmpFd, tmpName) = mkstemp() url = self._makeUrl(buildId, suffix) try: os.close(tmpFd) env = { k:v for (k,v) in os.environ.items() if k in self.__whiteList } env["BOB_LOCAL_ARTIFACT"] = tmpName env["BOB_REMOTE_ARTIFACT"] = url ret = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, cwd="/tmp", env=env) if ret == 0: ret = tmpName tmpName = None return CustomDownloader(ret) else: raise ArtifactDownloadError("failed (exit {})".format(ret)) finally: if tmpName is not None: os.unlink(tmpName)
def grab(bbox=None): if sys.platform == "darwin": fh, filepath = tempfile.mkstemp('.png') os.close(fh) subprocess.call(['screencapture', '-x', filepath]) im = Image.open(filepath) im.load() os.unlink(filepath) else: size, data = grabber() im = Image.frombytes( "RGB", size, data, # RGB, 32-bit line padding, origin lower left corner "raw", "BGR", (size[0]*3 + 3) & -4, -1 ) if bbox: im = im.crop(bbox) return im
def _compile_module_file(template, text, filename, outputpath, module_writer): source, lexer = _compile(template, text, filename, generate_magic_comment=True) if isinstance(source, compat.text_type): source = source.encode(lexer.encoding or 'ascii') if module_writer: module_writer(source, outputpath) else: # make tempfiles in the same location as the ultimate # location. this ensures they're on the same filesystem, # avoiding synchronization issues. (dest, name) = tempfile.mkstemp(dir=os.path.dirname(outputpath)) os.write(dest, source) os.close(dest) shutil.move(name, outputpath)
def set(self, key, value, timeout=None): if timeout is None: timeout = int(time() + self.default_timeout) elif timeout != 0: timeout = int(time() + timeout) filename = self._get_filename(key) self._prune() try: fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix, dir=self._path) with os.fdopen(fd, 'wb') as f: pickle.dump(timeout, f, 1) pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) rename(tmp, filename) os.chmod(filename, self._mode) except (IOError, OSError): return False else: return True
def test_transfer_with_cli(self): test_instances = [ TestUrlInstance(url="/data1", data=b"data1"), TestUrlInstance(url="/data2", data=b"data2") ] self.httpd.test_instances = test_instances try: fd, filename = tempfile.mkstemp() os.close(fd) cmd = [TestRequestHandler.ticket_url, "-O", filename] parser = cli.get_htsget_parser() args = parser.parse_args(cmd) with mock.patch("sys.exit") as mocked_exit: cli.run(args) mocked_exit.assert_called_once_with(0) all_data = b"".join(test_instance.data for test_instance in test_instances) with open(filename, "rb") as f: self.assertEqual(f.read(), all_data) finally: os.unlink(filename)
def create_thumb_js(self, mode=None, pth=None): """ Create the thumbnail using SmartCrop.js """ if pth is None: raise ValueError("path can't be None") # save a copy of the image with the correct orientation in a temporary # file _, tmpfname = tempfile.mkstemp(suffix='.'+settings.output_format) self.original_image.save(tmpfname, quality=95) # Load smartcrop and set options nwidth, nheight = self.resize_dims(mode) logging.info("[%s] SmartCrop.js new dimensions: %ix%i" % (self.name, nwidth, nheight)) command = [settings.smartcrop_js_path, '--width', str(nwidth), '--height', str(nheight), tmpfname, pth] logging.info("[%s] SmartCrop.js running crop command." % self.name) check_output(command) # remove the temporary file os.remove(tmpfname) return pth
def Set(self,key,data): path = self._GetPath(key) directory = os.path.dirname(path) if not os.path.exists(directory): os.makedirs(directory) if not os.path.isdir(directory): raise _FileCacheError('%s exists but is not a directory' % directory) temp_fd, temp_path = tempfile.mkstemp() temp_fp = os.fdopen(temp_fd, 'w') temp_fp.write(data) temp_fp.close() if not path.startswith(self._root_directory): raise _FileCacheError('%s does not appear to live under %s' % (path, self._root_directory)) if os.path.exists(path): os.remove(path) os.rename(temp_path, path)
def setUp(self): # CRN used in this test self.crn_rxn = 'A + B -> A + D\n' # Establish random temporary filenames self.tdir = mkdtemp(prefix='piperine_test') fid, self.basename = mkstemp(dir=self.tdir) os.close(fid) endings = ['.crn', '.fixed', '.pil', '.mfe', '{}_strands.txt', '{}.seqs'] self.filenames = [ self.basename + suf for suf in endings ] self.crn, self.fixed, self.pil, self.mfe, self.strands, self.seqs = self.filenames fid, self.fixedscore = mkstemp(suffix='_fixed_score.csv', dir=self.tdir) os.close(fid) fid, self.reportfile = mkstemp(dir=self.tdir) os.close(fid) # Write CRN to basename.crn with open(self.crn, 'w') as f: f.write(self.crn_rxn) # Modules and module strings for import tests proc = subprocess.Popen(['piperine-design {} -n 3 -D -q'.format(self.crn)], stdout=subprocess.PIPE, shell=True) (out, err) = proc.communicate() self.ef = energetics.energyfuncs(targetdG=7.7) self.trans = translation
def exec_response_command(self, cmd, **kw): # not public yet try: tmp = None if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192: program = cmd[0] #unquoted program name, otherwise exec_command will fail cmd = [self.quote_response_command(x) for x in cmd] (fd, tmp) = tempfile.mkstemp() os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode()) os.close(fd) cmd = [program, '@' + tmp] # no return here, that's on purpose ret = self.generator.bld.exec_command(cmd, **kw) finally: if tmp: try: os.remove(tmp) except OSError: pass # anti-virus and indexers can keep the files open -_- return ret ########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token
def exec_response_command(self, cmd, **kw): # not public yet try: tmp = None if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192: program = cmd[0] #unquoted program name, otherwise exec_command will fail cmd = [self.quote_response_command(x) for x in cmd] (fd, tmp) = tempfile.mkstemp() os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode()) os.close(fd) cmd = [program, '@' + tmp] # no return here, that's on purpose ret = super(self.__class__, self).exec_command(cmd, **kw) finally: if tmp: try: os.remove(tmp) except OSError: pass # anti-virus and indexers can keep the files open -_- return ret
def get_pkg_dir(self, pkgname, pkgver, subdir): pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver) if not os.path.isdir(pkgdir): os.makedirs(pkgdir) target = os.path.join(pkgdir, subdir) if os.path.exists(target): return target (fd, tmp) = tempfile.mkstemp(dir=pkgdir) try: os.close(fd) self.download_to_file(pkgname, pkgver, subdir, tmp) if subdir == REQUIRES: os.rename(tmp, target) else: self.extract_tar(subdir, pkgdir, tmp) finally: try: os.remove(tmp) except OSError: pass return target
def open_compressed(filename, open_flag='r', compression_type='bz2'): """Opens a compressed HDF5File with the given opening flags. For the 'r' flag, the given compressed file will be extracted to a local space. For 'w', an empty HDF5File is created. In any case, the opened HDF5File is returned, which needs to be closed using the close_compressed() function. """ # create temporary HDF5 file name hdf5_file_name = tempfile.mkstemp('.hdf5', 'bob_')[1] if open_flag == 'r': # extract the HDF5 file from the given file name into a temporary file name tar = tarfile.open(filename, mode="r:" + compression_type) memory_file = tar.extractfile(tar.next()) real_file = open(hdf5_file_name, 'wb') real_file.write(memory_file.read()) del memory_file real_file.close() tar.close() return bob.io.base.HDF5File(hdf5_file_name, open_flag)
def generate_adhoc_ssl_context(): """Generates an adhoc SSL context for the development server.""" crypto = _get_openssl_crypto_module() import tempfile import atexit cert, pkey = generate_adhoc_ssl_pair() cert_handle, cert_file = tempfile.mkstemp() pkey_handle, pkey_file = tempfile.mkstemp() atexit.register(os.remove, pkey_file) atexit.register(os.remove, cert_file) os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) os.close(cert_handle) os.close(pkey_handle) ctx = load_ssl_context(cert_file, pkey_file) return ctx
def __init__(self, path, threaded=True, timeout=None): """ >>> lock = SQLiteLockFile('somefile') >>> lock = SQLiteLockFile('somefile', threaded=False) """ LockBase.__init__(self, path, threaded, timeout) self.lock_file = unicode(self.lock_file) self.unique_name = unicode(self.unique_name) if SQLiteLockFile.testdb is None: import tempfile _fd, testdb = tempfile.mkstemp() os.close(_fd) os.unlink(testdb) del _fd, tempfile SQLiteLockFile.testdb = testdb import sqlite3 self.connection = sqlite3.connect(SQLiteLockFile.testdb) c = self.connection.cursor() try: c.execute("create table locks" "(" " lock_file varchar(32)," " unique_name varchar(32)" ")") except sqlite3.OperationalError: pass else: self.connection.commit() import atexit atexit.register(os.unlink, SQLiteLockFile.testdb)
def get_tpm_rand_block(size=4096): global warned randpath = None try: #make a temp file for the output randfd,randpath = tempfile.mkstemp() command = "getrandom -size %d -out %s" % (size,randpath) tpm_exec.run(command) # read in the quote f = open(randpath,"rb") rand = f.read() f.close() os.close(randfd) except Exception as e: if not warned: logger.warn("TPM randomness not available: %s"%e) warned=True return [] finally: if randpath is not None: os.remove(randpath) return rand
def write_key_nvram(key): if common.STUB_TPM: storage = open("tpm_nvram","wb") storage.write(key) storage.close() return owner_pw = tpm_initialize.get_tpm_metadata('owner_pw') keyFile = None try: # write out quote keyfd,keypath = tempfile.mkstemp() keyFile = open(keypath,"wb") keyFile.write(key) keyFile.close() os.close(keyfd) tpm_exec.run("nv_definespace -pwdo %s -in 1 -sz %d -pwdd %s -per 40004"%(owner_pw,common.BOOTSTRAP_KEY_SIZE,owner_pw)) tpm_exec.run("nv_writevalue -pwdd %s -in 1 -if %s"%(owner_pw,keyFile.name)) finally: if keyFile is not None: os.remove(keyFile.name) return
def test_ownerpw(owner_pw,reentry=False): tmppath = None try: #make a temp file for the output _,tmppath = tempfile.mkstemp() (output,code) = tpm_exec.run("getpubek -pwdo %s -ok %s"%(owner_pw,tmppath),raiseOnError=False) if code!=tpm_exec.EXIT_SUCESS: if len(output)>0 and output[0].startswith("Error Authentication failed (Incorrect Password) from TPM_OwnerReadPubek"): return False elif len(output)>0 and output[0].startswith("Error Defend lock running from TPM_OwnerReadPubek"): if reentry: logger.error("Unable to unlock TPM") return False # tpm got locked. lets try to unlock it logger.error("TPM is locked from too many invalid owner password attempts, attempting to unlock with password: %s"%owner_pw) # i have no idea why, but runnig this twice seems to actually work tpm_exec.run("resetlockvalue -pwdo %s"%owner_pw,raiseOnError=False) tpm_exec.run("resetlockvalue -pwdo %s"%owner_pw,raiseOnError=False) return test_ownerpw(owner_pw,True) else: raise Exception("test ownerpw, getpubek failed with code "+str(code)+": "+str(output)) finally: if tmppath is not None: os.remove(tmppath) return True
def get_pub_ek(): # assumes that owner_pw is correct at this point owner_pw = get_tpm_metadata('owner_pw') tmppath = None try: #make a temp file for the output tmpfd,tmppath = tempfile.mkstemp() (output,code) = tpm_exec.run("getpubek -pwdo %s -ok %s"%(owner_pw,tmppath),raiseOnError=False) # generates pubek.pem if code!=tpm_exec.EXIT_SUCESS: raise Exception("getpubek failed with code "+str(code)+": "+str(output)) # read in the output f = open(tmppath,"rb") ek = f.read() f.close() os.close(tmpfd) finally: if tmppath is not None: os.remove(tmppath) set_tpm_metadata('ek',ek)
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): """Create temporary file or use existing file. This util is needed for creating temporary file with specified content, suffix and prefix. If path is not None, it will be used for writing content. If the path doesn't exist it'll be created. :param content: content for temporary file. :param path: same as parameter 'dir' for mkstemp :param suffix: same as parameter 'suffix' for mkstemp :param prefix: same as parameter 'prefix' for mkstemp For example: it can be used in database tests for creating configuration files. """ if path: ensure_tree(path) (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix) try: os.write(fd, content) finally: os.close(fd) return path
def _dump(self, file=None, format=None, **options): import tempfile suffix = '' if format: suffix = '.'+format if not file: f, file = tempfile.mkstemp(suffix) os.close(f) self.load() if not format or format == "PPM": self.im.save_ppm(file) else: if not file.endswith(format): file = file + "." + format self.save(file, format, **options) return file
def convert(cls, report, data): "converts the report data to another mimetype if necessary" input_format = report.template_extension output_format = report.extension or report.template_extension if output_format in MIMETYPES: return output_format, data fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format), prefix='trytond_') oext = FORMAT2EXT.get(output_format, output_format) with os.fdopen(fd, 'wb+') as fp: fp.write(data) cmd = ['unoconv', '--connection=%s' % config.get('report', 'unoconv'), '-f', oext, '--stdout', path] try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) stdoutdata, stderrdata = proc.communicate() if proc.wait() != 0: raise Exception(stderrdata) return oext, stdoutdata finally: os.remove(path)
def test_basic_config(): fd, path = tempfile.mkstemp() f = os.fdopen(fd,'w') f.write(yaml.dump(testcfg)) f.flush() cfg = ny.get_config(path) ny.write_supervisor_conf() config = ConfigParser.ConfigParser() config.readfp(open(cfg['supervisor.conf'])) # from IPython import embed # embed() print(config.get('program:testtunnel','command')) assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in config.get('program:testtunnel','command')
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None): self._createdir() # Cache dir can be deleted at any time. fname = self._key_to_file(key, version) self._cull() # make some room if necessary fd, tmp_path = tempfile.mkstemp(dir=self._dir) renamed = False try: with io.open(fd, 'wb') as f: expiry = self.get_backend_timeout(timeout) f.write(pickle.dumps(expiry, -1)) f.write(zlib.compress(pickle.dumps(value), -1)) file_move_safe(tmp_path, fname, allow_overwrite=True) renamed = True finally: if not renamed: os.remove(tmp_path)
def create_temporary_ca_file(anchor_list): """ Concatenate all the certificates (PEM format for the export) in 'anchor_list' and write the result to file to a temporary file using mkstemp() from tempfile module. On success 'filename' is returned, None otherwise. If you are used to OpenSSL tools, this function builds a CAfile that can be used for certificate and CRL check. Also see create_temporary_ca_file(). """ try: f, fname = tempfile.mkstemp() for a in anchor_list: s = a.output(fmt="PEM") l = os.write(f, s) os.close(f) except: return None return fname
def validate(self, inline=False): """Validate workflow object. This method currently validates the workflow object with the use of cwltool. It writes the workflow to a tmp CWL file, reads it, validates it and removes the tmp file again. By default, the workflow is written to file using absolute paths to the steps. Optionally, the steps can be saved inline. """ # define tmpfile (fd, tmpfile) = tempfile.mkstemp() os.close(fd) try: # save workflow object to tmpfile, # do not recursively call validate function self.save(tmpfile, inline=inline, validate=False, relative=False, wd=False) # load workflow from tmpfile document_loader, processobj, metadata, uri = load_cwl(tmpfile) finally: # cleanup tmpfile os.remove(tmpfile)
def _pack(self, fname, encoding): """Save workflow with ``--pack`` option This means that al tools and subworkflows are included in the workflow file that is created. A packed workflow cannot be loaded and used in scriptcwl. """ (fd, tmpfile) = tempfile.mkstemp() os.close(fd) try: self.save(tmpfile, validate=False, wd=False, inline=False, relative=False, pack=False) document_loader, processobj, metadata, uri = load_cwl(tmpfile) finally: # cleanup tmpfile os.remove(tmpfile) with codecs.open(fname, 'wb', encoding=encoding) as f: f.write(print_pack(document_loader, processobj, uri, metadata))
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None): self._createdir() # Cache dir can be deleted at any time. fname = self._key_to_file(key, version) self._cull() # make some room if necessary fd, tmp_path = tempfile.mkstemp(dir=self._dir) renamed = False try: with io.open(fd, 'wb') as f: expiry = self.get_backend_timeout(timeout) f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL)) f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL))) file_move_safe(tmp_path, fname, allow_overwrite=True) renamed = True finally: if not renamed: os.remove(tmp_path)
def _findLib_gcc(name): expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name) fdout, ccout = tempfile.mkstemp() os.close(fdout) cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \ 'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name try: f = os.popen(cmd) try: trace = f.read() finally: rv = f.close() finally: try: os.unlink(ccout) except OSError, e: if e.errno != errno.ENOENT: raise if rv == 10: raise OSError, 'gcc or cc command not found' res = re.search(expr, trace) if not res: return None return res.group(0)
def mktemp(self, *args, **kwds): """create temp file that's cleaned up at end of test""" self.require_writeable_filesystem() fd, path = tempfile.mkstemp(*args, **kwds) os.close(fd) queue = self._mktemp_queue if queue is None: queue = self._mktemp_queue = [] def cleaner(): for path in queue: if os.path.exists(path): os.remove(path) del queue[:] self.addCleanup(cleaner) queue.append(path) return path
def set(self, key, value, timeout=None): timeout = self._normalize_timeout(timeout) filename = self._get_filename(key) self._prune() try: fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix, dir=self._path) with os.fdopen(fd, 'wb') as f: pickle.dump(timeout, f, 1) pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) rename(tmp, filename) os.chmod(filename, self._mode) except (IOError, OSError): return False else: return True
def test_ffi_buffer_with_file(self): import tempfile, os, array fd, filename = tempfile.mkstemp() f = os.fdopen(fd, 'r+b') a = ffi.new("int[]", list(range(1005))) try: ffi.buffer(a, 512) except NotImplementedError as e: py.test.skip(str(e)) f.write(ffi.buffer(a, 1000 * ffi.sizeof("int"))) f.seek(0) assert f.read() == array.array('i', range(1000)).tostring() f.seek(0) b = ffi.new("int[]", 1005) f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int"))) assert list(a)[:1000] + [0] * (len(a)-1000) == list(b) f.close() os.unlink(filename)
def test_ffi_buffer_with_file(self): ffi = FFI(backend=self.Backend()) import tempfile, os, array fd, filename = tempfile.mkstemp() f = os.fdopen(fd, 'r+b') a = ffi.new("int[]", list(range(1005))) try: ffi.buffer(a, 512) except NotImplementedError as e: py.test.skip(str(e)) f.write(ffi.buffer(a, 1000 * ffi.sizeof("int"))) f.seek(0) assert f.read() == array.array('i', range(1000)).tostring() f.seek(0) b = ffi.new("int[]", 1005) f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int"))) assert list(a)[:1000] + [0] * (len(a)-1000) == list(b) f.close() os.unlink(filename)
def __init__(self, executable_path, port=0, service_args=None, log_path=None): """ Creates a new instance of the Service :Args: - executable_path : Path to PhantomJS binary - port : Port the service is running on - service_args : A List of other command line options to pass to PhantomJS - log_path: Path for PhantomJS service to log to """ self.service_args= service_args if self.service_args is None: self.service_args = [] else: self.service_args=service_args[:] if not log_path: log_path = "ghostdriver.log" if not self._args_contain("--cookies-file="): self._cookie_temp_file = tempfile.mkstemp()[1] self.service_args.append("--cookies-file=" + self._cookie_temp_file) else: self._cookie_temp_file = None service.Service.__init__(self, executable_path, port=port, log_file=open(log_path, 'w'))