Python os 模块,unlink() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.unlink()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure ``path`` is a directory owned by ``owner:group`` with mode ``perms``.

    :param path: directory to create; resolved with ``os.path.abspath``.
    :param owner: user name looked up through ``pwd.getpwnam``.
    :param group: group name looked up through ``grp.getgrnam``.
    :param perms: mode handed to ``os.makedirs`` and ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)
    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)
    # Ownership and mode are applied whether or not anything was created.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def _clean_check(cmd, target):
    """
    Run ``cmd`` (expected to download ``target``) with ``subprocess.check_call``.

    If the command exits with a non-zero status, an existing ``target`` is
    unlinked before the ``CalledProcessError`` is re-raised.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        partial_download_present = os.access(target, os.F_OK)
        if partial_download_present:
            os.unlink(target)
        raise
def dot2graph(self, dot, format='svg'):
    """Render Graphviz source with the ``dot`` executable and return the output.

    :param dot: Graphviz source, as ``bytes`` or text (text is UTF-8 encoded).
    :param format: value passed to ``dot`` through its ``-T`` option.
    :returns: whatever ``dot`` wrote to the output file, read back as bytes
        (empty when the command produced nothing).
    """
    # The temp file is opened in binary mode, so text must be encoded first.
    if not isinstance(dot, bytes):
        dot = dot.encode('utf-8')
    dotfile = outfile = None
    try:
        # delete=False: the files are handed to an external process by name
        # and removed manually below.
        # NOTE(review): presumably chosen for Windows compatibility - confirm.
        with NamedTemporaryFile(delete=False) as dotfile:
            dotfile.write(dot)
        outfile = NamedTemporaryFile(delete=False)
        # NOTE(review): `format` is interpolated into a shell command line;
        # callers must not pass untrusted values here.
        os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
            dotfile.name, outfile.name, format))
        return outfile.read()
    finally:
        # Always release both temporary files, even when something failed.
        if outfile is not None:
            outfile.close()
        for tmp in (dotfile, outfile):
            if tmp is None:
                continue
            try:
                os.unlink(tmp.name)
            except OSError:
                pass
def test_rs3topng():
    """rs3 file is converted to PNG"""
    png_str = rstviewer.rs3topng(RS3_FILEPATH)

    temp = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp.close()
    try:
        rstviewer.rs3topng(RS3_FILEPATH, temp.name)
        # PNG data is binary: read it as bytes so the comparison is
        # meaningful on every platform and Python version.
        with open(temp.name, 'rb') as png_file:
            assert png_str == png_file.read()
    finally:
        # Remove the temporary image even when the assertion fails.
        os.unlink(temp.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'rb') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'rb') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
def test_cli_rs3topng():
    """conversion to PNG on the commandline"""
    temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp_png.close()

    try:
        # calling `rstviewer -f png input.rs3 output.png` will end the program
        # with sys.exit(0), so we'll have to catch this here.
        with pytest.raises(SystemExit) as serr:
            cli(['-f', 'png', RS3_FILEPATH, temp_png.name])
        # The exit status lives on the captured exception; `pytest.capsys`
        # is not a pytest attribute and stderr text never equals 0.
        assert serr.value.code in (0, None)

        # PNG data is binary, so read it as bytes.
        with open(temp_png.name, 'rb') as png_file:
            png_str = png_file.read()
    finally:
        # Remove the temporary image even when the conversion fails.
        os.unlink(temp_png.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'rb') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'rb') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
def is_running(name):
    """Report whether a task registered under ``name`` is still alive.

    The PID is read from the file returned by ``_pid_file(name)``. When that
    PID no longer belongs to a live process, the stale PID file is removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pidfile = _pid_file(name)
    if os.path.exists(pidfile):
        with open(pidfile, 'rb') as handle:
            pid = int(handle.read().strip())

        if _process_exists(pid):
            return True

        # Stale entry: the recorded process is gone.
        if os.path.exists(pidfile):
            os.unlink(pidfile)

    return False
def _delete_directory_contents(self, dirpath, filter_func):
    """Remove the entries of ``dirpath`` that ``filter_func`` selects.

    Sub-directories are removed recursively, everything else is unlinked.
    A missing ``dirpath`` is silently ignored.

    :param dirpath: path to directory to clear
    :type dirpath: ``unicode`` or ``str``
    :param filter_func: called with each entry name; a truthy result
        means the entry is deleted.
    :type filter_func: ``callable``

    """
    if not os.path.exists(dirpath):
        return
    for entry in os.listdir(dirpath):
        if filter_func(entry):
            target = os.path.join(dirpath, entry)
            remover = shutil.rmtree if os.path.isdir(target) else os.unlink
            remover(target)
            self.logger.debug('Deleted : %r', target)
def _start_kuryr_manage_daemon(self):
    """Serve the pool manager HTTP API on the configured unix socket.

    A stale socket file at the configured path is removed first. The call
    blocks in ``serve_forever`` until interrupted; failures are logged
    rather than propagated.

    :raises OSError: if an existing socket file cannot be removed.
    """
    LOG.info("Pool manager started")
    server_address = oslo_cfg.CONF.pool_manager.sock_file
    try:
        os.unlink(server_address)
    except OSError:
        # A missing file is fine; anything still present is a real error.
        if os.path.exists(server_address):
            raise
    # Bound up front so cleanup is safe when the constructor itself fails.
    httpd = None
    try:
        httpd = UnixDomainHttpServer(server_address, RequestHandler)
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    except Exception:
        LOG.exception('Failed to start Pool Manager.')
    finally:
        if httpd is not None:
            httpd.socket.close()
def run(self):
    """Delete all but the newest ``self.keep`` files for each match pattern.

    Each pattern in ``self.match`` is prefixed with the distribution name
    and globbed inside ``self.dist_dir``; matches are ranked newest first by
    modification time. With ``self.dry_run`` set, deletions are only logged.
    """
    self.run_command("egg_info")
    from glob import glob
    for suffix in self.match:
        pattern = self.distribution.get_name() + '*' + suffix
        candidates = [
            (os.path.getmtime(name), name)
            for name in glob(os.path.join(self.dist_dir, pattern))
        ]
        newest_first = sorted(candidates)[::-1]
        log.info("%d file(s) matching %s", len(newest_first), pattern)
        for _mtime, name in newest_first[self.keep:]:
            log.info("Deleting %s", name)
            if self.dry_run:
                continue
            if os.path.isdir(name):
                shutil.rmtree(name)
            else:
                os.unlink(name)
def save(self):
    """Write changed .pth file back to disk.

    Does nothing unless ``self.dirty`` is set. With no paths left, an
    existing file is deleted instead of being written empty. A symlink at
    ``self.filename`` is unlinked first so the write creates a regular file.
    """
    if not self.dirty:
        return

    rel_paths = [self.make_relative(entry) for entry in self.paths]
    if rel_paths:
        log.debug("Saving %s", self.filename)
        data = '\n'.join(self._wrap_lines(rel_paths)) + '\n'

        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as f:
            f.write(data)

    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def zap_pyfiles(self):
    """Strip ``.py`` sources from the build tree and flatten ``__pycache__``.

    Every ``.py`` file under ``self.bdist_dir`` is deleted. Bytecode files
    inside ``__pycache__`` directories (``<name>.<magic>.pyc``) are moved one
    level up as ``<name>.pyc``, replacing any file already there.
    """
    log.info("Removing .py files from temporary directory")
    pyc_name = re.compile(r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc')
    for base, dirs, files in walk_egg(self.bdist_dir):
        in_pycache = base.endswith('__pycache__')
        for name in files:
            path = os.path.join(base, name)

            if name.endswith('.py'):
                log.debug("Deleting %s", path)
                os.unlink(path)
                # The file is gone, so there is nothing left to rename.
                continue

            if not in_pycache:
                continue

            m = pyc_name.match(name)
            if m is None:
                # Not a tagged bytecode file; leave it where it is instead
                # of failing on the missing match.
                log.debug("Skipping %s", path)
                continue

            path_old = path
            path_new = os.path.join(
                base, os.pardir, m.group('name') + '.pyc')
            log.info("Renaming file from [%s] to [%s]" % (path_old, path_new))
            try:
                os.remove(path_new)
            except OSError:
                pass
            os.rename(path_old, path_new)
def run(self):
    """Prune old distribution files, keeping the ``self.keep`` newest per pattern.

    Patterns come from ``self.match`` and are prefixed with the distribution
    name; matches are looked up in ``self.dist_dir`` and ordered by
    modification time, newest first. ``self.dry_run`` suppresses deletion.
    """
    self.run_command("egg_info")
    from glob import glob
    for suffix in self.match:
        pattern = self.distribution.get_name() + '*' + suffix
        found = glob(os.path.join(self.dist_dir, pattern))
        ranked = sorted((os.path.getmtime(item), item) for item in found)
        ranked.reverse()
        log.info("%d file(s) matching %s", len(ranked), pattern)
        for _stamp, item in ranked[self.keep:]:
            log.info("Deleting %s", item)
            if self.dry_run:
                continue
            if os.path.isdir(item):
                shutil.rmtree(item)
            else:
                os.unlink(item)
def save(self):
    """Write changed .pth file back to disk.

    No-op when ``self.dirty`` is false. If no relative paths remain, an
    existing file is removed. A symlink at ``self.filename`` is unlinked
    before writing so a regular file is created in its place.
    """
    if not self.dirty:
        return

    rel_paths = [self.make_relative(item) for item in self.paths]
    if rel_paths:
        log.debug("Saving %s", self.filename)
        wrapped = self._wrap_lines(rel_paths)
        data = '\n'.join(wrapped) + '\n'

        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as pth:
            pth.write(data)

    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory ``path`` and set its owner, group and mode.

    :param path: directory to create; made absolute with ``os.path.abspath``.
    :param owner: user name resolved with ``pwd.getpwnam``.
    :param group: group name resolved with ``grp.getgrnam``.
    :param perms: mode used for ``os.makedirs`` and ``os.chmod``.
    :param force: replace an existing non-directory at ``path`` when true.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    directory = os.path.abspath(path)
    if not os.path.exists(directory):
        os.makedirs(directory, perms)
    elif force and not os.path.isdir(directory):
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(directory)
        os.makedirs(directory, perms)
    # Applied unconditionally, including to a directory that already existed.
    os.chown(directory, uid, gid)
    os.chmod(directory, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Make sure ``path`` is a directory with the requested ownership and mode.

    :param path: directory to create; normalised with ``os.path.abspath``.
    :param owner: user name, resolved via ``pwd.getpwnam``.
    :param group: group name, resolved via ``grp.getgrnam``.
    :param perms: mode for ``os.makedirs`` and the final ``os.chmod``.
    :param force: if true, a non-directory already at ``path`` is removed
        and a directory is created in its place.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    resolved = os.path.abspath(path)
    if not os.path.exists(resolved):
        os.makedirs(resolved, perms)
    elif force and not os.path.isdir(resolved):
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(resolved)
        os.makedirs(resolved, perms)
    os.chown(resolved, uid, gid)
    os.chmod(resolved, perms)
def generate_pdf(card):
    """
    Make a PDF from a card

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    from tempfile import mkstemp

    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    # mkstemp creates the file atomically, unlike the race-prone mktemp.
    fd, qrcode_file = mkstemp(suffix='.png', prefix='trello_qr_')
    os.close(fd)
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        # Never leave the QR image behind, even if rendering failed.
        os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
def makelink(self, tarinfo, targetpath):
    """Create the symbolic or hard link described by ``tarinfo`` at ``targetpath``.

    Where the platform lacks ``os.symlink``/``os.link``, or a hard link's
    target does not exist on disk, the referenced archive member is
    extracted to ``targetpath`` instead. Anything already at ``targetpath``
    is unlinked before a link is created.
    """
    if not (hasattr(os, "symlink") and hasattr(os, "link")):
        # Platform limitation: fall back to copying the referenced member.
        try:
            self._extract_member(self._find_link_target(tarinfo), targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
        return

    # For systems that support symbolic and hard links.
    if tarinfo.issym():
        source, create = tarinfo.linkname, os.symlink
    elif os.path.exists(tarinfo._link_target):
        # See extract().
        source, create = tarinfo._link_target, os.link
    else:
        self._extract_member(self._find_link_target(tarinfo), targetpath)
        return

    if os.path.lexists(targetpath):
        os.unlink(targetpath)
    create(source, targetpath)
def _find_grail_rc(self):
    """Return a unix socket connected to a live Grail remote-control endpoint.

    Candidate socket files are ``<tmpdir>/.grail-unix/<user>-*``. Each one is
    tried in turn; entries that refuse the connection are removed on a
    best-effort basis.

    :returns: the connected socket, or ``None`` when no endpoint answered.
    """
    import glob
    import pwd
    import socket
    import tempfile
    tempdir = os.path.join(tempfile.gettempdir(),
                           ".grail-unix")
    user = pwd.getpwuid(os.getuid())[0]
    filename = os.path.join(tempdir, user + "-*")
    maybes = glob.glob(filename)
    if not maybes:
        return None
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    for fn in maybes:
        # need to PING each one until we find one that's live
        try:
            s.connect(fn)
        except socket.error:
            # no good; attempt to clean it out, but don't fail:
            try:
                os.unlink(fn)
            except (IOError, OSError):
                # os.unlink reports failures as OSError, so catching only
                # IOError would let them escape on Python 2.
                pass
        else:
            return s
    # Nothing answered: release the unused socket instead of leaking it.
    s.close()
    return None
def __init__(self, proxies=None, **x509):
    """Set up the opener's proxies, client-certificate files and caches.

    :param proxies: mapping of proxy settings; ``getproxies()`` supplies the
        value when omitted.
    :param x509: optional ``key_file`` and ``cert_file`` keyword arguments.
    """
    proxies = getproxies() if proxies is None else proxies
    assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
    self.proxies = proxies
    self.key_file, self.cert_file = x509.get('key_file'), x509.get('cert_file')
    self.addheaders = [('User-Agent', self.version)]
    self.__tempfiles = []
    self.__unlink = os.unlink  # See cleanup()
    # Undocumented feature: if you assign {} to tempcache,
    # it is used to cache files retrieved with
    # self.retrieve().  This is not enabled by default
    # since it does not work for changing documents (and I
    # haven't got the logic to check expiration headers
    # yet).
    self.tempcache = None
    # Undocumented feature: you can use a different
    # ftp cache by assigning to the .ftpcache member;
    # in case you want logically independent URL openers
    # XXX This is not threadsafe.  Bah.
    self.ftpcache = ftpcache
def nextfile(self):
    """Close the current file and reset per-file state.

    Restores ``sys.stdout`` if it was redirected, closes the output and
    input files (the input only when it is not stdin), and removes the
    backup file unless ``self._backup`` asks for it to be kept.
    """
    savestdout, self._savestdout = self._savestdout, 0
    if savestdout:
        sys.stdout = savestdout

    output, self._output = self._output, 0
    if output:
        output.close()

    current, self._file = self._file, 0
    if current and not self._isstdin:
        current.close()

    backupfilename, self._backupfilename = self._backupfilename, 0
    if backupfilename and not self._backup:
        try:
            os.unlink(backupfilename)
        except OSError:
            pass

    self._isstdin = False
    self._buffer = []
    self._bufindex = 0
def close(self,
          remove=os.unlink, error=os.error):
    """Close the pipe, delete the temporary file and return the exit status.

    :param remove: callable used to delete ``self.tmpfile``.
    :param error: exception type ignored while deleting.
    :returns: the result of ``self.pipe.close()``, or 255 without a pipe.
    """
    rc = self.pipe.close() if self.pipe else 255
    if self.tmpfile:
        try:
            remove(self.tmpfile)
        except error:
            pass
    return rc
# Alias
def putsequences(self, sequences):
    """Write the set of sequences back to the folder.

    One ``name: ranges`` line is written per entry of ``sequences``. When
    ``sequences`` is empty, the sequences file is removed instead (a file
    that is already missing is ignored).
    """
    fullname = self.getsequencesfilename()
    f = None
    try:
        for key, seq in sequences.items():
            s = IntSet('', ' ')
            s.fromlist(seq)
            # Open lazily so an empty mapping never creates the file.
            if f is None:
                f = open(fullname, 'w')
            f.write('%s: %s\n' % (key, s.tostring()))
    finally:
        # Close the handle even when formatting or writing failed.
        if f is not None:
            f.close()
    if f is None:
        try:
            os.unlink(fullname)
        except os.error:
            pass
def removemessages(self, list):
    """Remove one or more messages -- may raise os.error.

    Each message file is renamed to its comma-prefixed backup name, after any
    older backup of the same name has been deleted. Successfully removed
    messages are dropped from all sequences. Rename failures are collected
    and raised once every message has been processed.
    """
    errors = []
    deleted = []
    for n in list:
        path = self.getmessagefilename(n)
        commapath = self.getmessagefilename(',' + str(n))
        try:
            os.unlink(commapath)
        except os.error:
            pass
        try:
            os.rename(path, commapath)
        except os.error as msg:
            errors.append(msg)
        else:
            deleted.append(n)
    if deleted:
        self.removefromallsequences(deleted)
    if errors:
        if len(errors) == 1:
            # Re-raise the single captured exception unchanged.
            raise errors[0]
        else:
            raise os.error('multiple errors:', errors)
def copymessage(self, n, tofolder, ton):
    """Copy message ``n`` over message ``ton`` of ``tofolder``.

    The destination may or may not already exist; if it does, it is first
    renamed to its comma-prefixed backup name. A partially written
    destination is removed when the copy fails.
    """
    path = self.getmessagefilename(n)
    # Opening the source is only a check that it exists and is readable.
    open(path).close()
    topath = tofolder.getmessagefilename(ton)
    backuptopath = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(topath, backuptopath)
    except os.error:
        pass
    copied = False
    try:
        tofolder.setlast(None)
        shutil.copy2(path, topath)
        copied = True
    finally:
        if not copied:
            try:
                os.unlink(topath)
            except os.error:
                pass
def pack(self):
    """Re-name messages to eliminate numbering gaps.  Invalidates keys.

    Message files are renumbered consecutively from 1, ``self._next_key`` is
    updated, and every sequence is rewritten to use the new numbers.
    """
    sequences = self.get_sequences()
    prev = 0
    changes = []
    for key in self.iterkeys():
        expected = prev + 1
        if key != expected:
            changes.append((key, expected))
            source = os.path.join(self._path, str(key))
            destination = os.path.join(self._path, str(expected))
            if hasattr(os, 'link'):
                os.link(source, destination)
                os.unlink(source)
            else:
                os.rename(source, destination)
        prev = expected
    self._next_key = prev + 1
    if not changes:
        return
    for key_list in sequences.values():
        for old, new in changes:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def _findLib_gcc(name):
    """Ask the C compiler's linker which file satisfies ``-l<name>``.

    A shell command picks ``gcc`` or ``cc`` and links with ``-Wl,-t`` so the
    linker traces the files it opens; the first path in that output that
    looks like ``lib<name>.*`` is returned.

    :returns: the matching path fragment, or ``None`` when nothing matched.
    """
    expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
    fdout, ccout = tempfile.mkstemp()
    os.close(fdout)
    cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
          '$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
    try:
        f = os.popen(cmd)
        try:
            trace = f.read()
        finally:
            rv = f.close()
    finally:
        # The linker output file is only a by-product; always remove it.
        try:
            os.unlink(ccout)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
    # NOTE(review): `rv` is whatever close() on the popen object returns;
    # confirm it is the plain exit code 10 rather than an encoded wait status.
    if rv == 10:
        raise OSError('gcc or cc command not found')
    res = re.search(expr, trace)
    if not res:
        return None
    return res.group(0)
def test_save_svgz_filename():
    """Saving to a ``.svgz`` file name produces gzip-compressed SVG."""
    import gzip
    qr = segno.make_qr('test')
    tmp = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    tmp.close()
    qr.save(tmp.name)
    expected = b'\x1f\x8b\x08'  # gzip magic number
    raw = open(tmp.name, mode='rb')
    val = raw.read(len(expected))
    raw.close()
    archive = gzip.open(tmp.name)
    try:
        content = archive.read(6)
    finally:
        archive.close()
    os.unlink(tmp.name)
    assert expected == val
    assert b'<?xml ' == content
def test_write_unicode_filename():
    """Title and description survive a round trip through a saved SVG file."""
    qr = segno.make_qr('test')
    tmp = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False)
    tmp.close()
    title = 'mürrische Mädchen'
    desc = '?'
    qr.save(tmp.name, title=title, desc=desc)
    svg = open(tmp.name, mode='rb')
    root = _parse_xml(svg)
    # Rewind so the XML declaration can be checked as raw bytes as well.
    svg.seek(0)
    val = svg.read(6)
    svg.close()
    os.unlink(tmp.name)
    assert b'<?xml ' == val
    assert title == _get_title(root).text
    assert desc == _get_desc(root).text
def grab(bbox=None):
    """Take a screenshot and return it as an image.

    On macOS (``sys.platform == "darwin"``) the ``screencapture`` tool writes
    a temporary PNG that is loaded and then deleted; elsewhere the pixels
    come from ``grabber()``.

    :param bbox: optional crop box passed to ``im.crop``.
    """
    if sys.platform != "darwin":
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    else:
        handle, shot_path = tempfile.mkstemp('.png')
        os.close(handle)
        subprocess.call(['screencapture', '-x', shot_path])
        im = Image.open(shot_path)
        # Force the pixel data to be read before the file disappears.
        im.load()
        os.unlink(shot_path)
    if bbox:
        im = im.crop(bbox)
    return im
def test_infile_outfile(self):
    """The tool reads the input file, writes the output file, prints nothing."""
    with tempfile.NamedTemporaryFile() as source:
        source.write(self.data.encode())
        source.flush()
        # outfile will get overwritten by tool, so the delete
        # may not work on some platforms. Do it manually.
        sink = tempfile.NamedTemporaryFile()
        try:
            tool_output = self.runTool(args=[source.name, sink.name])
            self.assertEqual(tool_output, ''.encode())
            with open(sink.name, 'rb') as written:
                self.assertEqual(written.read(), self.expect.encode())
        finally:
            sink.close()
            if os.path.exists(sink.name):
                os.unlink(sink.name)
def finalize(self):
    """Close the build-id cache and remove the workspace lock file.

    Failing to remove the lock is reported as a warning on stderr rather
    than raised.
    """
    assert (self.__asynchronous == 0) and not self.__dirty
    cache = self.__buildIdCache
    if cache is not None:
        cache.close()
        self.__buildIdCache = None
    if not self.__lock:
        return
    try:
        os.unlink(self.__lock)
    except OSError as e:
        # FileNotFoundError is an OSError subclass; it gets its own message.
        if isinstance(e, FileNotFoundError):
            message = "Warning: lock file was deleted while Bob was still running!"
        else:
            message = "Warning: cannot unlock workspace: "+str(e)
        from .tty import colorize
        from sys import stderr
        print(colorize(message, "33"),
            file=stderr)
def download_file(file, url, optional = False):
    """Download ``url`` into ``file``, reporting progress on stdout.

    :param file: destination path.
    :param url: source URL.
    :param optional: only changes the message printed when the download
        fails; the error is re-raised either way.
    :raises IOError: when the download fails; a partially written ``file``
        is removed first.
    """
    # sys.stdout.write is used instead of the print statement so the code is
    # valid on both Python 2 and Python 3 with byte-identical output.
    try:
        sys.stdout.write("Downloading " + url + "... ")
        sys.stdout.flush()
        my_urlretrieve(url, file)
        sys.stdout.write("OK\n")
        sys.stdout.flush()
    except IOError as e:
        if optional:
            sys.stdout.write("missing but optional, so that's OK\n")
        else:
            sys.stdout.write(str(e) + "\n")
            sys.stdout.flush()
        if os.path.exists(file):
            os.unlink(file)
        raise
# Create a file of the given size, containing NULs, without holes.
def install(self):
    """Install the system unless the disk image already exists.

    With ``vmm == 'noemu'`` the distribution is downloaded and the install
    always runs. Otherwise an existing image short-circuits the call, and a
    failed install removes the partially written image before re-raising.
    """
    # This is needed for Xen and noemu, where we get the kernel
    # from the dist rather than the installed image
    self.dist.set_workdir(self.workdir)
    if self.vmm == 'noemu':
        self.dist.download()
        self._install()
        return
    # Already installed?
    if os.path.exists(self.wd0_path()):
        return
    try:
        self._install()
    except:
        # Clean up for any failure, then let it propagate.
        if os.path.exists(self.wd0_path()):
            os.unlink(self.wd0_path())
        raise
# Boot the virtual machine (installing it first if it's not
# installed already). The vmm_args argument applies when
# booting, but not when installing. Does not wait for
# a login prompt.
def test_plotscene():
    """Export a two-plot scene to SVG without errors."""
    # mkstemp reserves the name atomically; taking `.name` from a discarded
    # NamedTemporaryFile relies on garbage-collection timing and is racy.
    fd, tempfilename = tempfile.mkstemp(suffix='.svg')
    os.close(fd)
    try:
        print("using %s as a temporary file" % tempfilename)
        pg.setConfigOption('foreground', (0,0,0))
        w = pg.GraphicsWindow()
        w.show()
        p1 = w.addPlot()
        p2 = w.addPlot()
        p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
        p1.setXRange(0,5)
        p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
        app.processEvents()
        app.processEvents()

        ex = pg.exporters.SVGExporter(w.scene())
        ex.export(fileName=tempfilename)
    finally:
        # clean up after the test is done, even when the export failed
        os.unlink(tempfilename)
def test_plotscene():
    """Export a two-plot scene to SVG without errors."""
    # mkstemp reserves the name atomically; taking `.name` from a discarded
    # NamedTemporaryFile relies on garbage-collection timing and is racy.
    fd, tempfilename = tempfile.mkstemp(suffix='.svg')
    os.close(fd)
    try:
        print("using %s as a temporary file" % tempfilename)
        pg.setConfigOption('foreground', (0,0,0))
        w = pg.GraphicsWindow()
        w.show()
        p1 = w.addPlot()
        p2 = w.addPlot()
        p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
        p1.setXRange(0,5)
        p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
        app.processEvents()
        app.processEvents()

        ex = pg.exporters.SVGExporter(w.scene())
        ex.export(fileName=tempfilename)
    finally:
        # clean up after the test is done, even when the export failed
        os.unlink(tempfilename)
def is_running(name):
    """
    Report whether a task registered under ``name`` is still alive.

    The PID is read from the file returned by ``_pid_file(name)``; a PID
    file whose process no longer exists is deleted.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``
    """
    pidfile = _pid_file(name)
    if os.path.exists(pidfile):
        with open(pidfile, 'rb') as handle:
            pid = int(handle.read().strip())

        if _process_exists(pid):
            return True

        # Stale entry: the recorded process is gone.
        if os.path.exists(pidfile):
            os.unlink(pidfile)

    return False
def _delete_directory_contents(self, dirpath, filter_func):
    """Delete the entries of ``dirpath`` accepted by ``filter_func``.

    Directories are removed recursively and other entries are unlinked;
    nothing happens when ``dirpath`` does not exist.

    :param dirpath: path to directory to clear
    :type dirpath: ``unicode`` or ``str``
    :param filter_func: called with each entry name; a truthy result
        means the entry is deleted.
    :type filter_func: ``callable``
    """
    if not os.path.exists(dirpath):
        return
    selected = (item for item in os.listdir(dirpath) if filter_func(item))
    for item in selected:
        victim = os.path.join(dirpath, item)
        if os.path.isdir(victim):
            shutil.rmtree(victim)
        else:
            os.unlink(victim)
        self.logger.debug('Deleted : %r', victim)
def is_running(name):
    """
    Tell whether the task called ``name`` has a live process.

    Reads the PID stored in ``_pid_file(name)``. If no such process exists,
    the leftover PID file is removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``
    """
    pidfile = _pid_file(name)
    if os.path.exists(pidfile):
        with open(pidfile, 'rb') as stream:
            recorded = stream.read()
            pid = int(recorded.strip())

        if _process_exists(pid):
            return True

        if os.path.exists(pidfile):
            os.unlink(pidfile)

    return False
def service_resume(service_name, init_dir="/etc/init",
                   initd_dir="/etc/init.d"):
    """Resume a system service.

    Re-enable starting at boot using whichever init system is detected
    (systemd, Upstart or SysV), then start the service if it is not running.

    :returns: the result of ``service_running`` when truthy, otherwise the
        result of ``service_start``.
    :raises ValueError: when no supported init configuration is found.
    """
    upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
    sysv_file = os.path.join(initd_dir, service_name)
    if init_is_systemd():
        service('enable', service_name)
    elif os.path.exists(upstart_file):
        # Upstart disables a job through an ".override" file; removing it
        # re-enables the job.
        override_path = os.path.join(
            init_dir, '{}.override'.format(service_name))
        if os.path.exists(override_path):
            os.unlink(override_path)
    elif os.path.exists(sysv_file):
        subprocess.check_call(["update-rc.d", service_name, "enable"])
    else:
        raise ValueError(
            "Unable to detect {0} as SystemD, Upstart {1} or"
            " SysV {2}".format(
                service_name, upstart_file, sysv_file))

    return service_running(service_name) or service_start(service_name)
def download(self, source, dest):
    """
    Download an archive file.

    Credentials embedded in an http(s) URL are stripped from the URL and
    supplied through a globally installed basic-auth opener instead. A
    partially written ``dest`` is removed when the transfer fails.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception:
        if os.path.isfile(dest):
            os.unlink(dest)
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        # Release the connection whether or not the copy succeeded.
        response.close()
# Mandatory file validation via Sha1 or MD5 hashing.
def test_cli_rs3tohtml():
    """conversion to HTML on the commandline"""
    temp_html = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp_html.close()
    try:
        cli([RS3_FILEPATH, temp_html.name])
        with open(temp_html.name, 'r') as html_file:
            assert EXPECTED_HTML in html_file.read()
    finally:
        # Remove the temporary file even when the conversion or check fails.
        os.unlink(temp_html.name)
def rs3topng(rs3_filepath, png_filepath=None):
    """Convert a RS3 file into a PNG image of the RST tree.

    If no output filename is given, the PNG image is returned
    as a string (which is useful for embedding).

    :raises ImportError: if selenium is not installed.
    :raises WebDriverException: if the PhantomJS driver cannot be started.
    """
    try:
        from selenium import webdriver
        from selenium.common.exceptions import WebDriverException
    except ImportError:
        raise ImportError(
            'Please install selenium: pip install selenium')

    html_str = rs3tohtml(rs3_filepath)

    temp = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    try:
        temp.write(html_str.encode('utf8'))
        temp.close()

        try:
            driver = webdriver.PhantomJS()
        except WebDriverException as err:
            raise WebDriverException(
                'Please install phantomjs: http://phantomjs.org/\n'
                + (err.msg or ''))

        try:
            driver.get(temp.name)
            png_str = driver.get_screenshot_as_png()
        finally:
            # Shut the browser process down instead of leaking it.
            driver.quit()
    finally:
        # The HTML file is only an intermediate; remove it on every path.
        temp.close()
        os.unlink(temp.name)

    if png_filepath:
        # PNG data is binary, so the file must be opened in binary mode.
        with open(png_filepath, 'wb') as png_file:
            png_file.write(png_str)
    else:
        return png_str
def release(self):
    """Release the lock by deleting `self.lockfile`.

    A lock file that is already missing is not treated as an error.
    """
    self._locked = False
    try:
        os.unlink(self.lockfile)
    except (OSError, IOError) as err:  # pragma: no cover
        if err.errno == 2:
            # errno 2: the file did not exist.
            return
        raise err
def cache_data(self, name, data):
    """Store ``data`` in the cache under ``name``.

    Passing ``None`` as ``data`` deletes the corresponding cache file
    instead of writing one.

    :param name: name of datastore
    :param data: data to store. This may be any object supported by
            the cache serializer

    """
    serializer = manager.serializer(self.cache_serializer)
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))

    if data is not None:
        with atomic_writer(cache_path, 'wb') as file_obj:
            serializer.dump(data, file_obj)
        self.logger.debug('Cached data saved at : %s', cache_path)
        return

    if os.path.exists(cache_path):
        os.unlink(cache_path)
        self.logger.debug('Deleted cache file : %s', cache_path)
def test_bearer_token(self, m_cfg, m_get):
    """The token read from the configured file is sent as a Bearer header."""
    token_content = (
        "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3Nl"
        "cnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc"
        "3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bn"
        "Qvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLWh4M3QxIiwia3ViZXJuZXRlcy5"
        "pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQi"
        "LCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51a"
        "WQiOiIxYTkyM2ZmNi00MDkyLTExZTctOTMwYi1mYTE2M2VkY2ViMDUiLCJzdWIiOi"
        "JzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.lzcPef"
        "DQ-uzF5cD-5pLwTKpRvtvvxKB4LX8TLymrPLMTth8WGr1vT6jteJPmLiDZM2C5dZI"
        "iFJpOw4LL1XLullik-ls-CmnTWq97NvlW1cZolC0mNyRz6JcL7gkH8WfUSjLA7x80"
        "ORalanUxtl9-ghMGKCtKIACAgvr5gGT4iznGYQQRx_hKURs4O6Js5vhwNM6UuOKeW"
        "GDDAlhgHMG0u59z3bhiBLl6jbQktZsu8c3diXniQb3sYqYQcGKUm1IQFujyA_ByDb"
        "5GUtCv1BOPL_-IjYtvdJD8ZzQ_UnPFoYQklpDyJLB7_7qCGcfVEQbnSCh907NdKo4"
        "w_8Wkn2y-Tg")
    token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
    try:
        m_cfg.kubernetes.token_file = token_file.name
        token_file.write(token_content)
        token_file.close()
        m_cfg.kubernetes.ssl_verify_server_crt = False

        request_path = '/test'
        k8s = k8s_client.K8sClient(self.base_url)
        k8s.get(request_path)

        expected_headers = {
            'Authorization': 'Bearer {}'.format(token_content)}
        m_get.assert_called_once_with(
            self.base_url + request_path, cert=(None, None),
            headers=expected_headers, verify=False)
    finally:
        os.unlink(m_cfg.kubernetes.token_file)
def get_exit_code(self):
    """Wait for the process to finish, delete its script, return the exit code."""
    proc = self.process
    proc.wait()
    os.unlink(self.tmp_script_filename)
    return proc.returncode
def release(self):
    """Remove the lock file, provided this object currently holds the lock.

    :raises NotLocked: when the path is not locked at all.
    :raises NotMyLock: when the lock is held by someone else.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)
    if not self.i_am_locking():
        raise NotMyLock("%s is locked, but not by me" % self.path)
    os.unlink(self.lock_file)
def release(self):
    """Remove this holder's marker file and then the lock directory.

    :raises NotLocked: when the path is not locked at all.
    :raises NotMyLock: when this holder's marker file does not exist.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)
    if not os.path.exists(self.unique_name):
        raise NotMyLock("%s is locked, but not by me" % self.path)
    os.unlink(self.unique_name)
    os.rmdir(self.lock_file)
def break_lock(self):
    """Forcibly remove the lock directory and every file inside it."""
    if not os.path.exists(self.lock_file):
        return
    for entry in os.listdir(self.lock_file):
        os.unlink(os.path.join(self.lock_file, entry))
    os.rmdir(self.lock_file)
def acquire(self, timeout=None):
    """Acquire the lock by hard-linking the unique file to the lock file.

    :param timeout: seconds to keep retrying; ``None`` falls back to
        ``self.timeout``. If that is also ``None`` the call retries
        indefinitely; zero or negative values fail after the first miss.
    :raises LockFailed: when the unique file cannot be created.
    :raises LockTimeout: when a positive timeout expires.
    :raises AlreadyLocked: when a non-positive timeout is used and the lock
        is held.
    """
    try:
        open(self.unique_name, "wb").close()
    except IOError:
        raise LockFailed("failed to create %s" % self.unique_name)

    if timeout is None:
        timeout = self.timeout
    end_time = time.time()
    if timeout is not None and timeout > 0:
        end_time += timeout

    while True:
        # Try and create a hard link to it.
        try:
            os.link(self.unique_name, self.lock_file)
        except OSError:
            pass
        else:
            # Link creation succeeded.  We're good to go.
            return

        # Link creation failed.  Maybe we've double-locked?
        if os.stat(self.unique_name).st_nlink == 2:
            # The original link plus the one I created == 2.  We're
            # good to go.
            return

        # Otherwise the lock creation failed.
        if timeout is not None and time.time() > end_time:
            os.unlink(self.unique_name)
            if timeout > 0:
                raise LockTimeout("Timeout waiting to acquire"
                                  " lock for %s" %
                                  self.path)
            raise AlreadyLocked("%s is already locked" %
                                self.path)
        # A tenth of the timeout, or 0.1s when that is unset or zero.
        pause = timeout / 10 if timeout is not None else 0
        time.sleep(pause or 0.1)