Python tempfile 模块,mktemp() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.mktemp()。
def test_SADParser(self):
    """Parse a known SAD XML file and spot-check the parsed fields.

    After verifying parsed values, re-exports the document and runs it
    through xmllint (self._xmllint) to confirm the emitted XML is still
    DTD valid.
    """
    sad = parsers.SADParser.parse("sdr/dom/waveforms/CommandWrapperWithPropertyOverride/CommandWrapper.sad.xml")
    self.assertEqual(sad.get_id(), "DCE:d206ab51-6342-4976-bac3-55e6902f3489")
    self.assertEqual(sad.get_name(), "CommandWrapperWithPropertyOverride")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)
    self.assertEqual(sad.partitioning.get_componentplacement()[0].componentfileref.refid, "CommandWrapper_592b8bd6-b011-4468-9417-705af45e907b")
    self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].id_, "DCE:8c129782-a6a4-4095-8212-757f01de0c09")
    self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].get_usagename(), "CommandWrapper1")
    self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].componentproperties.get_simpleref()[0].refid, "DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e")
    self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].componentproperties.get_simpleref()[0].value, "/bin/date")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()  # NOTE(review): mktemp is race-prone; acceptable in a test
    try:
        tmp = open(tmpfile, "w")
        sad.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_get_swift_hash_env(self, mock_config, mock_service_name):
    """get_swift_hash() with no configured hash derives one from the Juju
    environment UUID and persists it to SWIFT_HASH_FILE.

    NOTE(review): mock_config/mock_service_name come from patch decorators
    outside this excerpt.
    """
    mock_config.return_value = None
    mock_service_name.return_value = "testsvc"
    tmpfile = tempfile.mktemp()
    # Redirect the module-level hash-file path at a temp location.
    swift_context.SWIFT_HASH_FILE = tmpfile
    with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
        mock_env_get.return_value = str(uuid.uuid4())
        hash_ = swift_context.get_swift_hash()
        # JUJU_MODEL_UUID is tried first, falling back to JUJU_ENV_UUID.
        mock_env_get.assert_has_calls([
            mock.call('JUJU_MODEL_UUID'),
            mock.call('JUJU_ENV_UUID',
                      mock_env_get.return_value)
        ])
    # The computed hash must have been written to the hash file verbatim.
    with open(tmpfile, 'r') as fd:
        self.assertEqual(hash_, fd.read())
    self.assertTrue(mock_config.called)
def generate_pdf(card):
    """
    Make a PDF from a card
    :param card: dict from fetcher.py
    :return: Binary PDF buffer

    Fix: the temporary QR-code image is now removed in a ``finally``
    block, so it no longer leaks when ``qrcode.save``/``pdf.image``
    raises.
    """
    from eclaire.base import SPECIAL_LABELS
    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)
    pdf.add_page()
    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)
    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')
    qrcode = generate_qr_code(card.url)
    qrcode_file = mktemp(suffix='.png', prefix='trello_qr_')
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        # Remove the temporary QR image even if rendering fails.
        if os.path.exists(qrcode_file):
            os.unlink(qrcode_file)
    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')
    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')
    return pdf.output(dest='S')
def test_exit_crash():
    # For each Widget subclass, run a simple python script that creates an
    # instance and then shuts down. The intent is to check for segmentation
    # faults when each script exits.
    #
    # Fix: the generated script is now written via a context manager so the
    # file handle is closed (and flushed) before the subprocess reads it,
    # instead of relying on GC to close the leaked handle.
    tmp = tempfile.mktemp(".py")
    path = os.path.dirname(pg.__file__)
    # Widgets whose constructors need positional arguments.
    initArgs = {
        'CheckTable': "[]",
        'ProgressDialog': '"msg"',
        'VerticalLabel': '"msg"',
    }
    for name in dir(pg):
        obj = getattr(pg, name)
        if not isinstance(obj, type) or not issubclass(obj, pg.QtGui.QWidget):
            continue
        print(name)
        argstr = initArgs.get(name, "")
        # 'code' is a module-level script template (defined elsewhere).
        with open(tmp, 'w') as script:
            script.write(code.format(path=path, classname=name, args=argstr))
        proc = subprocess.Popen([sys.executable, tmp])
        assert proc.wait() == 0
    os.remove(tmp)
def test_exit_crash():
    # For each Widget subclass, run a simple python script that creates an
    # instance and then shuts down. The intent is to check for segmentation
    # faults when each script exits.
    #
    # Fix: the generated script is now written via a context manager so the
    # file handle is closed (and flushed) before the subprocess reads it,
    # instead of relying on GC to close the leaked handle.
    tmp = tempfile.mktemp(".py")
    path = os.path.dirname(pg.__file__)
    # Widgets whose constructors need positional arguments.
    initArgs = {
        'CheckTable': "[]",
        'ProgressDialog': '"msg"',
        'VerticalLabel': '"msg"',
    }
    for name in dir(pg):
        obj = getattr(pg, name)
        if not isinstance(obj, type) or not issubclass(obj, pg.QtGui.QWidget):
            continue
        print(name)
        argstr = initArgs.get(name, "")
        # 'code' is a module-level script template (defined elsewhere).
        with open(tmp, 'w') as script:
            script.write(code.format(path=path, classname=name, args=argstr))
        proc = subprocess.Popen([sys.executable, tmp])
        assert proc.wait() == 0
    os.remove(tmp)
def _smcra_to_str(self, smcra, temp_dir='/tmp/'):
    """
    WHATIF's input are PDB format files.
    Converts a SMCRA object to a PDB formatted string.
    """
    # Round-trip through a temporary PDB file, since PDBIO only writes files.
    pdb_path = tempfile.mktemp('.pdb', dir=temp_dir)
    writer = PDBIO()
    writer.set_structure(smcra)
    writer.save(pdb_path)
    with open(pdb_path, 'r') as handle:
        contents = handle.read()
    os.remove(pdb_path)
    return contents
def test_binary(self, enable_randomness=True, times=1, timeout=15):
    """
    Test the binary generated

    Dumps the current PoV to an executable temp file and runs it against
    the crashing binary under CGCPovSimulator.

    :param enable_randomness: forwarded to the simulator
    :param times: number of times to run the PoV
    :param timeout: per-run timeout in seconds
    :return: whatever CGCPovSimulator.test_binary_pov reports
    """
    # dump the binary code
    pov_binary_filename = tempfile.mktemp(dir='/tmp', prefix='rex-pov-')
    self.dump_binary(filename=pov_binary_filename)
    os.chmod(pov_binary_filename, 0755)  # NOTE: Python 2 octal literal (module is py2)
    pov_tester = CGCPovSimulator()
    result = pov_tester.test_binary_pov(
        pov_binary_filename,
        self.crash.binary,
        enable_randomness=enable_randomness,
        timeout=timeout,
        times=times)
    # remove the generated pov
    os.remove(pov_binary_filename)
    return result
def generate_report(self, register_setters, leakers):
    """Write a human-readable exploitation summary to a temp file in the
    current directory.

    :param register_setters: iterable of register-setting exploits
    :param leakers: iterable of leak exploits

    Fix: the report file is now written via ``with`` so the handle is
    closed even if a write raises (the original leaked it on error).
    """
    stat_name = tempfile.mktemp(dir=".", prefix='rex-results-')
    l.info("exploitation report being written to '%s'", stat_name)
    with open(stat_name, 'w') as f:
        f.write("Binary %s:\n" % os.path.basename(self.crash.project.filename))
        f.write("Register setting exploits:\n")
        for register_setter in register_setters:
            f.write("\t%s\n" % str(register_setter))
        f.write("\n")
        f.write("Leaker exploits:\n")
        for leaker in leakers:
            f.write("\t%s\n" % str(leaker))
def start_docker_compose(clients=1):
    """Bring up the docker-compose environment and start background log capture.

    :param clients: number of mender-client containers to scale to
    """
    inline_logs = conftest.inline_logs
    docker_compose_cmd("up -d")
    if clients > 1:
        docker_compose_cmd("scale mender-client=%d" % clients)
    if inline_logs:
        docker_compose_cmd("logs -f &")
    else:
        # NOTE(review): mktemp's first argument is a *suffix*, so this yields
        # e.g. /tmp/tmpXXXXXXmender_testing
        tfile = tempfile.mktemp("mender_testing")
        docker_compose_cmd("logs -f --no-color > %s 2>&1 &" % tfile)
        logger.info("docker-compose log file stored here: %s" % tfile)
        # Remember the file so teardown code can collect/remove it.
        log_files.append(tfile)
    ssh_is_opened()
    common.set_setup_type(common.ST_OneClient)
def tearDown(self):
    """
    Clean up any files or directories created using L{TestCase.mktemp}.
    Subclasses must invoke this method if they override it or the
    cleanup will not occur.
    """
    if self._temporaryFiles is not None:
        for temp in self._temporaryFiles:
            if os.path.isdir(temp):
                shutil.rmtree(temp)
            elif os.path.exists(temp):
                os.unlink(temp)
    # Fail the test if the OpenSSL error queue was left non-empty.
    # NOTE: Python 2-only 'except E, e' syntax — this module is py2.
    try:
        _exception_from_error_queue()
    except Error, e:
        if e.args != ([],):
            self.fail("Left over errors in OpenSSL error queue: " + repr(e))
def _getBatchOutput(self, f):
    """Run the cftp client in batch mode on the given script text and
    return a Deferred firing with its stdout.

    :param f: contents of the batch file to execute

    Fix: the batch file is written via ``with`` so the handle is closed
    and flushed before the subprocess reads it (the original leaked it).
    """
    fn = tempfile.mktemp()
    with open(fn, 'w') as batch:
        batch.write(f)
    port = self.server.getHost().port
    cmds = ('-p %i -l testuser '
            '-K unix '
            '-a '
            '-v -b %s 127.0.0.1') % (port, fn)
    cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
    log.msg('running %s %s' % (sys.executable, cmds))
    env = os.environ.copy()
    env['PYTHONPATH'] = os.pathsep.join(sys.path)
    self.server.factory.expectedLoseConnection = 1
    d = getProcessOutputAndValue(sys.executable, cmds, env=env)
    def _cleanup(res):
        # Remove the batch file whether the child succeeded or failed.
        os.remove(fn)
        return res
    d.addCallback(lambda res: res[0])
    d.addBoth(_cleanup)
    return d
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket."""
    # mktemp supplies an unused path; the reactor creates the socket file
    # itself when it starts listening.
    path = tempfile.mktemp()
    from twisted.internet import reactor
    f = policies.WrappingFactory(protocol.Factory())
    serverWrapper = _FireOnClose(f, server)
    f.noisy = noisy
    f.buildProtocol = lambda addr: serverWrapper
    serverPort = reactor.listenUNIX(path, f)
    clientF = LoopbackClientFactory(client)
    clientF.noisy = noisy
    reactor.connectUNIX(path, clientF)
    d = clientF.deferred
    # Chain: client done -> server side closed -> stop listening.
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    return d
def get_all_species(self):
    """Download Ensembl's species.txt.gz over FTP and return the parsed
    species list as [db_name, common_name, taxid-or-None] rows.

    Fixes: the download handle is closed via ``with`` even if the FTP
    transfer fails, and the temp-file removal in ``finally`` no longer
    raises if the file was never created.
    """
    import tempfile
    outfile = tempfile.mktemp() + '.txt.gz'
    try:
        self.logger.info('Downloading "species.txt.gz"...')
        with open(outfile, 'wb') as out_f:
            ftp = FTP(self.__class__.ENSEMBL_FTP_HOST)
            ftp.login()
            species_file = '/pub/release-%s/mysql/ensembl_production_%s/species.txt.gz' % (self.release, self.release)
            ftp.retrbinary("RETR " + species_file, out_f.write)
        self.logger.info('Done.')
        # load saved file
        self.logger.info('Parsing "species.txt.gz"...')
        species_li = tab2list(outfile, (1, 2, 7), header=0)  # db_name,common_name,taxid
        # Convert the taxid column to int where possible, else None.
        species_li = [x[:-1] + [is_int(x[-1]) and int(x[-1]) or None] for x in species_li]
        # as of ensembl 87, there are also mouse strains. keep only the "original" one
        species_li = [s for s in species_li if not s[0].startswith("mus_musculus_")]
        self.logger.info('Done.')
    finally:
        # Best-effort cleanup; ignore a missing file.
        try:
            os.remove(outfile)
        except OSError:
            pass
    return species_li
def __init__(self,
             in_file,
             exception_handler,
             bug_handler,
             copy = None,
             run_level = 1,
             ):
    """Store configuration for the conversion pass.

    :param in_file: path of the input file
    :param exception_handler: callable invoked on fatal errors
    :param bug_handler: callable invoked on internal errors
    :param copy: optional copy flag/path forwarded by the caller
    :param run_level: verbosity/strictness level

    Fix: the original assigned ``self.__bug_handler`` twice; the
    redundant second assignment is removed.
    """
    self.__file = in_file
    self.__bug_handler = bug_handler
    self.__exception_handler = exception_handler
    self.__copy = copy
    self.__run_level = run_level
    # Intermediate output goes to a throw-away temp file.
    self.__write_to = tempfile.mktemp()
    self.__state = 'outside'
    # Matches RTF-style hex Unicode escapes, e.g. &#x00e9;
    self.__utf_exp = re.compile(r'&#x(.*?);')
def __init__(self,
             in_file,
             bug_handler,
             out_file,
             copy = None,
             orig_file = None,
             run_level = 1,
             ):
    """Record I/O configuration and initialise picture-tracking state."""
    # Caller-supplied configuration.
    self.__file = in_file
    self.__bug_handler = bug_handler
    self.__copy = copy
    self.__run_level = run_level
    # Intermediate output is staged in a temp file.
    self.__write_to = tempfile.mktemp()
    # Group-delimiter and picture bookkeeping, all zeroed.
    self.__bracket_count = 0
    self.__ob_count = 0
    self.__cb_count = 0
    self.__pict_count = 0
    self.__in_pict = 0
    self.__already_found_pict = 0
    self.__orig_file = orig_file
    self.__initiate_pict_dict()
    self.__out_file = out_file
    # this is left over
    self.__no_ask = 1
def __init__(self,
             in_file ,
             bug_handler,
             copy = None,
             run_level = 1,
             ):
    """Record inputs and initialise asterisk/deletion tracking state."""
    # Caller-supplied configuration.
    self.__file = in_file
    self.__bug_handler = bug_handler
    self.__copy = copy
    # Intermediate output is staged in a temp file.
    self.__write_to = tempfile.mktemp()
    # Group-delimiter counters.
    self.__bracket_count = 0
    self.__ob_count = 0
    self.__cb_count = 0
    # Asterisk/deletion state flags.
    self.__after_asterisk = 0
    self.__delete = 0
    self.__initiate_allow()
    self.__ob = 0
    self.__write_cb = 0
    self.__run_level = run_level
    self.__found_delete = 0
    self.__list = 0
def __test_bad(self):
    """Run the converter over every invalid .rtf fixture and record any
    file that did NOT produce an error."""
    paths = os.listdir(self.__invalid_dir)
    for path in paths:
        path = os.path.join(self.__invalid_dir, path)
        filename, ext = os.path.splitext(path)
        if ext != '.rtf':
            continue
        if os.path.isdir(path):
            continue
        # NOTE(review): mktemp returns an absolute path, so this join
        # effectively uses the mktemp path as-is on POSIX — confirm intent.
        new_filename = tempfile.mktemp()
        out_path = os.path.join(self.__out_dir, new_filename)
        status, msg = self.__run_script(in_file = path, out_file = out_path)
        if not status:
            # A "success" here means the invalid input was wrongly accepted.
            info = [path, 'Should have produced an error']
            self.__error.append(info)
        # Clean up output from runs that did produce a file.
        try:
            os.remove(out_path)
        except OSError:
            pass
def test_smrt_archiver_lasttime():
    """First pass over the 'lasttime' feed archives every indicator; a
    second identical pass yields nothing new."""
    db_path = tempfile.mktemp()
    archiver = Archiver(dbfile=db_path)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'lasttime'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            indicators = list(s.process(loaded_rule, loaded_feed))
            assert len(indicators) > 0
            by_ip = {i.indicator: i.__dict__() for i in indicators}
            assert by_ip['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            assert len(list(s.process(loaded_rule, loaded_feed))) == 0
def test_smrt_archiver_firsttime():
    """First pass over the 'firsttime' feed archives every indicator; a
    second identical pass yields nothing new."""
    db_path = tempfile.mktemp()
    archiver = Archiver(dbfile=db_path)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'firsttime'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            indicators = list(s.process(loaded_rule, loaded_feed))
            assert len(indicators) > 0
            by_ip = {i.indicator: i.__dict__() for i in indicators}
            assert by_ip['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            assert len(list(s.process(loaded_rule, loaded_feed))) == 0
def test_smrt_archiver_both():
    """First pass over the 'both' feed archives every indicator; a second
    identical pass yields nothing new."""
    db_path = tempfile.mktemp()
    archiver = Archiver(dbfile=db_path)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'both'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            indicators = list(s.process(loaded_rule, loaded_feed))
            assert len(indicators) > 0
            by_ip = {i.indicator: i.__dict__() for i in indicators}
            assert by_ip['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            assert len(list(s.process(loaded_rule, loaded_feed))) == 0
def test_smrt_archiver_neither():
    """The 'neither' feed carries no lasttime; indicators are still
    archived so the second pass yields nothing."""
    db_path = tempfile.mktemp()
    archiver = Archiver(dbfile=db_path)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'neither'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            indicators = list(s.process(loaded_rule, loaded_feed))
            assert len(indicators) > 0
            by_ip = {i.indicator: i.__dict__() for i in indicators}
            assert by_ip['216.243.31.2'].get('lasttime') is None
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            assert len(list(s.process(loaded_rule, loaded_feed))) == 0
def test_smrt_archiver_lasttime_clear():
    """Clearing the in-memory cache must not re-emit indicators that are
    already persisted in the archive db."""
    db_path = tempfile.mktemp()
    archiver = Archiver(dbfile=db_path)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'lasttime'
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            indicators = list(s.process(loaded_rule, loaded_feed))
            assert len(indicators) > 0
            by_ip = {i.indicator: i.__dict__() for i in indicators}
            assert by_ip['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'
    # Drop the memory cache; the on-disk archive should still dedupe.
    archiver.clear_memcache()
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt
        for loaded_rule, loaded_feed in s.load_feeds(rule, feed=feed):
            assert len(list(s.process(loaded_rule, loaded_feed))) == 0
def get_local_filename(self):
    """ get_local_filename()
    If the filename is an existing file on this filesystem, return
    that. Otherwise a temporary file is created on the local file
    system which can be used by the format to read from or write to.
    """
    if self._uri_type == URI_FILENAME:
        return self._filename
    else:
        # Get filename
        ext = os.path.splitext(self._filename)[1]
        # Keep the original extension so format plugins can sniff it.
        self._filename_local = tempfile.mktemp(ext, 'imageio_')
        # Write stuff to it?
        if self.mode[0] == 'r':
            # Read mode: stage the remote/stream contents into the temp file.
            with open(self._filename_local, 'wb') as file:
                shutil.copyfileobj(self.get_file(), file)
        # NOTE(review): the temp file is presumably cleaned up elsewhere
        # (e.g. on close) — confirm at the call sites.
        return self._filename_local
def test_version_2_0_memmap():
    # requires more than 2 byte for header
    # 500 fields with very long names inflate the header past the 1.0 limit.
    dt = [(("%d" % i) * 100, float) for i in range(500)]
    d = np.ones(1000, dtype=dt)
    tf = tempfile.mktemp('', 'mmap', dir=tempdir)
    # 1.0 requested but data cannot be saved this way
    assert_raises(ValueError, format.open_memmap, tf, mode='w+', dtype=d.dtype,
                  shape=d.shape, version=(1, 0))
    # Explicit 2.0 must succeed.
    ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
                            shape=d.shape, version=(2, 0))
    ma[...] = d
    del ma
    with warnings.catch_warnings(record=True) as w:
        warnings.filterwarnings('always', '', UserWarning)
        # version=None auto-selects 2.0 and should emit a UserWarning.
        ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
                                shape=d.shape, version=None)
        assert_(w[0].category is UserWarning)
        ma[...] = d
        del ma
    # Re-open read-only and verify the data round-tripped.
    ma = format.open_memmap(tf, mode='r')
    assert_array_equal(ma, d)
def testAll(self):
    """Run the XSLT smoke test via cscript and compare the produced file
    against the expected output (module-level ``expected_output``)."""
    output_name = tempfile.mktemp("-pycom-test")
    cmd = "cscript //nologo testxslt.js doesnt_matter.xml testxslt.xsl " + output_name
    win32com.test.util.ExecuteShellCommand(cmd, self)
    try:
        f=open(output_name)
        try:
            got = f.read()
            if got != expected_output:
                # Python 2 print statements — this module is py2-only.
                print "ERROR: XSLT expected output of %r" % (expected_output,)
                print "but got %r" % (got,)
        finally:
            f.close()
    finally:
        # Remove the output; ignore failure if it was never created.
        try:
            os.unlink(output_name)
        except os.error:
            pass
def _SetCurrent(self, image):
    """Point the 'current' symlink at the given image's ISO, replacing it
    atomically via a temp link + rename. No-op if it already points there."""
    filename = '%d.iso' % (image['timestamp'])
    target = os.path.join(self._image_dir, filename)
    current_path = os.path.join(self._image_dir, 'current')
    try:
        existing = os.readlink(current_path)
        if os.path.join(self._image_dir, existing) == target:
            return  # already up to date
    except FileNotFoundError:
        pass  # no 'current' link yet
    print('Changing current link to:', filename, flush=True)
    # Build the new link under a temporary name, then rename it over the
    # old one so readers never observe a missing 'current'.
    temp_path = tempfile.mktemp(dir=self._image_dir)
    os.symlink(filename, temp_path)
    os.rename(temp_path, current_path)
def _write_styles_file(self):
    """Write styles.xml for the ODF document to a temp file.

    :raises ReportError: if the file cannot be created.

    Fix: the original opened the file twice (a "wb" probe whose handle
    leaked, then a fresh "w" open); it is now opened exactly once.
    """
    self.styles_xml = tempfile.mktemp()
    try:
        self.f = open(self.styles_xml,"w")
    except IOError as msg:
        errmsg = "%s\n%s" % (_("Could not create %s") % self.styles_xml, msg)
        raise ReportError(errmsg)
    except:
        raise ReportError(_("Could not create %s") % self.styles_xml)
    self.f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    self.f.write(
        '<office:document-styles ' +
        _XMLNS +
        'office:version="1.0"> '
    )
    # Module-level fragments hold the actual style definitions.
    self.f.write(_STYLES_FONTS)
    self.f.write(_STYLES_STYLES)
    self.f.write(_STYLES_AUTOMATIC)
    self.f.write(_STYLES_MASTER)
    self.f.write('</office:document-styles>\n')
    self.f.close()
def _write_meta_file(self):
    """Write meta.xml for the ODF document to a temp file.

    :raises ReportError: if the file cannot be created.

    Fix: the original opened the file twice (a "wb" probe whose handle
    leaked, then a fresh "w" open); it is now opened exactly once.
    """
    self.meta_xml = tempfile.mktemp()
    try:
        self.f = open(self.meta_xml,"w")
    except IOError as msg:
        errmsg = "%s\n%s" % (_("Could not create %s") % self.meta_xml, msg)
        raise ReportError(errmsg)
    except:
        raise ReportError(_("Could not create %s") % self.meta_xml)
    # _META is a module-level template filled with document metadata.
    self.f.write(_META %
                 {'program': PROGRAM_NAME,
                  'version': VERSION,
                  'name' : self.name,
                  'time' : self.time,
                  }
                 )
    self.f.close()
def __getstate__(self):
    '''Return a picklable copy of __dict__; the SVM model itself cannot
    be pickled, so it is serialized to bytes via its save() method.

    Fixes: the temp file handle is now closed (the original leaked it),
    iteritems() is replaced with items() (works on Python 2 and 3), and
    the builtin name ``buffer`` is no longer shadowed.
    '''
    state = {}
    for key, value in self.__dict__.items():
        if key == '_model':
            # Serialize the model to a temp file and keep the raw bytes.
            filename = tempfile.mktemp()
            self._model.save(filename)
            with open(filename) as fh:
                state[key] = fh.read()
            os.remove(filename)
            continue
        state[key] = value
    return state
def __getstate__(self):
    '''Return a picklable copy of __dict__; the SVM model itself cannot
    be pickled, so it is serialized to bytes via its save() method.

    Fixes: the temp file handle is now closed (the original leaked it),
    iteritems() is replaced with items() (works on Python 2 and 3), and
    the builtin name ``buffer`` is no longer shadowed.
    '''
    state = {}
    for key, value in self.__dict__.items():
        if key == '_model':
            # Serialize the model to a temp file and keep the raw bytes.
            filename = tempfile.mktemp()
            self._model.save(filename)
            with open(filename) as fh:
                state[key] = fh.read()
            os.remove(filename)
            continue
        state[key] = value
    return state
def test_resampling_value(shapefile, bins, samples):
    """Magnitude-based resampling keeps only the target field and returns
    exactly the requested number of samples (fixture args supplied by pytest)."""
    samples = (samples//bins) * bins  # round down so samples divide evenly into bins
    lonlats, filename = shapefile
    random_filename = tempfile.mktemp() + ".shp"
    resampling.resample_by_magnitude(filename,
                                     random_filename,
                                     target_field='lat',
                                     bins=bins,
                                     output_samples=samples,
                                     bootstrap=True
                                     )
    resampled_sf = shp.Reader(random_filename)
    # fields[0] is the shapefile's DeletionFlag sentinel; skip it.
    resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]
    new_coords, new_val, new_othervals = \
        geoio.load_shapefile(random_filename, 'lat')
    assert 'lat' in resampled_shapefields
    assert 'lon' not in resampled_shapefields
    assert np.all((samples, 2) == new_coords.shape)
    assert new_othervals == {} # only the target is retained after resampling
def test_resampling_spatial(shapefile, rows, cols, samples):
    """Spatial resampling keeps only the target field; it may return at
    most the requested sample count (fixture args supplied by pytest)."""
    tiles = rows * cols
    samples = (samples // tiles) * tiles  # round down so samples divide evenly into tiles
    lonlats, filename = shapefile
    random_filename = tempfile.mktemp() + ".shp"
    resampling.resample_spatially(filename,
                                  random_filename,
                                  target_field='lat',
                                  rows=rows,
                                  cols=cols,
                                  output_samples=samples,
                                  bootstrap=True
                                  )
    resampled_sf = shp.Reader(random_filename)
    # fields[0] is the shapefile's DeletionFlag sentinel; skip it.
    resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]
    new_coords, new_val, new_othervals = \
        geoio.load_shapefile(random_filename, 'lat')
    assert 'lat' in resampled_shapefields
    assert 'lon' not in resampled_shapefields
    # Unlike magnitude resampling, spatial tiles may yield fewer points.
    assert np.all((samples, 2) >= new_coords.shape)
    assert new_othervals == {} # only the target is retained after resampling
def test_do_work(self):
    """crop.do_work should create both the cropped GeoTIFF and a JPEG
    preview (in crop.TMPDIR) for the given options."""
    # input_file, mask_file, output_file, resampling, extents, jpeg
    output_file = tempfile.mktemp(suffix='.tif')
    options = Options(resampling='bilinear',
                      extents=self.extents,
                      jpeg=True,
                      reproject=True)
    crop.do_work(input_file=self.std2000_no_mask,
                 output_file=output_file,
                 options=options,
                 mask_file=self.mask)
    # output file was created
    self.assertTrue(exists(output_file))
    # assert jpeg was created
    self.assertTrue(exists(
        join(crop.TMPDIR, basename(output_file).split('.')[0] + '.jpg')))
    os.remove(output_file)
def atomic_write(path):
    """Write to a hidden temp file in the destination directory, renaming
    it over *path* only if the caller's block completes without raising.

    Yields the open file object. On failure the partial temp file is
    removed and the exception propagates. Keeping the temp file in the
    same directory lets os.rename replace the target in a single step.

    NOTE(review): presumably wrapped by @contextmanager at the definition
    site (outside this excerpt) — confirm.
    """
    scratch = tempfile.mktemp(
        dir=os.path.dirname(path),
        prefix='.' + os.path.basename(path),
    )
    try:
        with open(scratch, 'w') as out:
            yield out
    except BaseException:
        os.remove(scratch)
        raise
    else:
        os.rename(scratch, path)
# TODO: at some point there will be so many options we'll want to make a config
# object or similar instead of adding more arguments here
def screenshot(self, filename=None):
    """Capture the device screen via the `idevice screenshot` tool.

    :param filename: optional path; when given, the image is also saved there
    Return:
        PIL.Image
    Raises:
        EnvironmentError
    """
    tmpfile = tempfile.mktemp(prefix='atx-screencap-', suffix='.tiff')
    try:
        idevice("screenshot", "--udid", self.udid, tmpfile)
    except subprocess.CalledProcessError as e:
        # NOTE(review): e.message is Python-2-only; str(e) would be portable.
        sys.exit(e.message)
    try:
        image = Image.open(tmpfile)
        image.load()  # force a full read so the temp file can be deleted
        if filename:
            image.save(filename)
        return image
    finally:
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
def _adb_screencap(self, scale=1.0):
    """
    capture screen with adb shell screencap
    """
    # dir='/data/local/tmp/' only shapes the generated *name*; mktemp does
    # not touch the device filesystem — the path is used remotely via adb.
    remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='screencap-', suffix='.png')
    local_file = tempfile.mktemp(prefix='atx-screencap-', suffix='.png')
    self.shell('screencap', '-p', remote_file)
    try:
        self.pull(remote_file, local_file)
        image = imutils.open_as_pillow(local_file)
        if scale is not None and scale != 1.0:
            image = image.resize([int(scale * s) for s in image.size], Image.BICUBIC)
        # rotation appears to be in 90-degree steps; map to PIL ROTATE_ constants.
        rotation = self.rotation()
        if rotation:
            method = getattr(Image, 'ROTATE_{}'.format(rotation*90))
            image = image.transpose(method)
        return image
    finally:
        # Clean up both the on-device and the local temp files.
        self.remove(remote_file)
        os.unlink(local_file)
def _adb_minicap(self, scale=1.0):
    """
    capture screen with minicap
    https://github.com/openstf/minicap
    """
    # Remote paths are used on the device via adb; mktemp only generates names.
    remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='minicap-', suffix='.jpg')
    local_file = tempfile.mktemp(prefix='atx-minicap-', suffix='.jpg')
    (w, h, r) = self.display
    # minicap projection spec: <real-w>x<real-h>@<virt-w>x<virt-h>/<rotation-deg>
    params = '{x}x{y}@{rx}x{ry}/{r}'.format(x=w, y=h, rx=int(w*scale), ry=int(h*scale), r=r*90)
    try:
        self.shell('LD_LIBRARY_PATH=/data/local/tmp', self.__minicap, '-s', '-P', params, '>', remote_file)
        self.pull(remote_file, local_file)
        image = imutils.open_as_pillow(local_file)
        return image
    finally:
        # Clean up both the on-device and the local temp files.
        self.remove(remote_file)
        os.unlink(local_file)
def test_blocking_lock_file(self):
    """A second BlockingLockFile on the same path must give up (IOError)
    within roughly its configured max wait time."""
    my_file = tempfile.mktemp()
    lock_file = BlockingLockFile(my_file)
    lock_file._obtain_lock()
    # next one waits for the lock
    start = time.time()
    wait_time = 0.1
    wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
    self.failUnlessRaises(IOError, wait_lock._obtain_lock)
    elapsed = time.time() - start
    # Allow a small scheduling slack on top of the configured wait.
    extra_time = 0.02
    if is_win:
        # for Appveyor
        extra_time *= 6  # NOTE: Indeterministic failures here...
    self.assertLess(elapsed, wait_time + extra_time)
def test_SPDParser(self):
    """Parse a known SPD file, spot-check fields, and verify the
    re-exported XML is still DTD valid."""
    # Verify the input is valid
    spdPath = os.path.abspath("sdr/dom/components/CommandWrapper/CommandWrapper.spd.xml")
    status = self._xmllint(spdPath, "SPD")
    self.assertEqual(status, 0, "Input XML isn't DTD valid")
    spd = parsers.SPDParser.parse(spdPath)
    self.assertEqual(spd.get_id(), "DCE:458872f6-a316-4082-b1eb-ce5704f5c49d")
    self.assertEqual(spd.get_name(), "CommandWrapper")
    self.assertEqual(str(spd.get_author()[0].get_name()[0]), "REDHAWK test author")
    self.assertEqual(spd.get_propertyfile().get_type(), "PRF")
    self.assertEqual(spd.get_propertyfile().get_localfile().get_name(), "CommandWrapper.prf.xml")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        spd.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SPD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_PRFParser(self):
    """Parse a known PRF file, index its properties by DCE id, spot-check
    values, and verify the re-exported XML is still DTD valid."""
    prf = parsers.PRFParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.prf.xml")
    # Index both simple and simplesequence properties by their id.
    props = {}
    for property in prf.get_simple():
        props[property.get_id()] = property
    for property in prf.get_simplesequence():
        props[property.get_id()] = property
    self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_mode(), "readwrite")
    self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_name(), "command")
    self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_type(), "string")
    self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_value(), "/bin/echo")
    self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_kind()[0].get_kindtype(), "configure")
    self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_mode(), "readwrite")
    self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_name(), "args")
    self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_type(), "string")
    self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_values().get_value()[0], "Hello World")
    self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_kind()[0].get_kindtype(), "configure")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        prf.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "PRF")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SCDParser(self):
    """Parse a known SCD file, spot-check fields, and verify the
    re-exported XML is still DTD valid."""
    scd = parsers.SCDParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.scd.xml")
    self.assertEqual(scd.get_corbaversion(), "2.2")
    self.assertEqual(scd.get_componentrepid().get_repid(), "IDL:CF/Resource:1.0")
    self.assertEqual(scd.get_componenttype(), "resource")
    self.assertEqual(scd.get_componentfeatures().get_supportsinterface()[0].get_repid(), "IDL:CF/Resource:1.0")
    self.assertEqual(scd.get_componentfeatures().get_supportsinterface()[0].get_supportsname(), "Resource")
    self.assertEqual(scd.get_interfaces().get_interface()[0].get_name(), "Resource")
    self.assertEqual(scd.get_interfaces().get_interface()[0].get_inheritsinterface()[0].get_repid(), "IDL:CF/LifeCycle:1.0")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        scd.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_usesdeviceref(self):
    """Parse a SAD with a hostcollocation containing a usesdeviceref and
    verify the re-exported XML is still DTD valid."""
    sad = parsers.SADParser.parse("sdr/parser_tests/usesdeviceref.sad.xml")
    self.assertEqual(sad.get_id(), "colloc_usesdev_1")
    self.assertEqual(sad.get_name(), "colloc_usesdev")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_hostcollocation()), 1)
    colloc = sad.partitioning.get_hostcollocation()[0]
    self.assertEqual(len(colloc.get_componentplacement()), 1)
    comp_place = colloc.get_componentplacement()[0]
    self.assertEqual(len(comp_place.get_componentinstantiation()), 1)
    comp_ci = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_ci.id_, "P1_1")
    self.assertEqual(comp_ci.get_usagename(), "P1_1")
    self.assertEqual(len(colloc.get_usesdeviceref()), 1)
    udev_ref = colloc.get_usesdeviceref()[0]
    self.assertEqual(udev_ref.refid, "FrontEndTuner_1")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        sad.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_devicerequires(self):
    """Parse a SAD whose instantiations carry devicerequires entries and
    verify the re-exported XML is still DTD valid."""
    sad = parsers.SADParser.parse("sdr/parser_tests/devicerequires.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)
    # First placement: RED component with two requires entries.
    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(len(comp_in.devicerequires.get_requires()), 2)
    self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
    self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "RED")
    self.assertEqual(comp_in.devicerequires.get_requires()[1].id, "rank")
    self.assertEqual(comp_in.devicerequires.get_requires()[1].value, "15")
    # Second placement: GREEN component with a single requires entry.
    comp_place = sad.partitioning.get_componentplacement()[1]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Green")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
    self.assertEqual(len(comp_in.devicerequires.get_requires()), 1)
    self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
    self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "GREEN")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        sad.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_loggingconfig(self):
    """Parse a SAD whose instantiations carry loggingconfig elements and
    verify the re-exported XML is still DTD valid."""
    sad = parsers.SADParser.parse("sdr/parser_tests/loggingconfig.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)
    # First placement: explicit level plus config file path.
    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(comp_in.loggingconfig.level, "ERROR")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")
    # Second placement: config file path only (no level attribute checked).
    comp_place = sad.partitioning.get_componentplacement()[1]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Green")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file2")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        sad.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_affinityconfig(self):
    """Parse a SAD whose instantiation carries affinity directives and
    verify the re-exported XML is still DTD valid."""
    sad = parsers.SADParser.parse("sdr/parser_tests/affinity.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)
    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(comp_in.loggingconfig.level, "ERROR")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")
    # Affinity is expressed as simpleref directive/value pairs.
    self.assertEqual(len(comp_in.affinity.get_simpleref()), 2)
    self.assertEqual(comp_in.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
    self.assertEqual(comp_in.affinity.get_simpleref()[0].value, "socket")
    self.assertEqual(comp_in.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
    self.assertEqual(comp_in.affinity.get_simpleref()[1].value, "0")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        sad.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_DCDParser_loggingconfig(self):
    """Parse a DCD whose GPP instantiation carries a loggingconfig and
    verify the re-exported XML is still DTD valid."""
    dcd = parsers.DCDParser.parse("sdr/parser_tests/loggingconfig.dcd.xml")
    self.assertEqual(dcd.get_id(), "test_GPP_green")
    self.assertEqual(dcd.get_name(), "test_GPP_green")
    self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)
    gpp = dcd.partitioning.get_componentplacement()[0]
    gpp_ci = gpp.get_componentinstantiation()[0]
    self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP1_file_1")
    self.assertEqual(gpp_ci.get_id(), "test_GPP_green::GPP_1")
    self.assertEqual(gpp_ci.get_usagename(), "test_GPP_green::GPP_1")
    self.assertEqual(gpp_ci.loggingconfig.level, "ERROR")
    self.assertEqual(gpp_ci.loggingconfig.value, "path/to/my/log/file")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        dcd.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "DCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_DCDParser_affinity(self):
    """Parse a DCD whose GPP instantiation carries affinity directives and
    verify the re-exported XML is still DTD valid."""
    dcd = parsers.DCDParser.parse("sdr/parser_tests/affinity.dcd.xml")
    self.assertEqual(dcd.get_id(), "affinity_parse_1")
    self.assertEqual(dcd.get_name(), "test_affinity_node_socket")
    self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)
    gpp = dcd.partitioning.get_componentplacement()[0]
    gpp_ci = gpp.get_componentinstantiation()[0]
    self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP_File_1")
    self.assertEqual(gpp_ci.get_id(), "test_affinity_node:GPP_1")
    self.assertEqual(gpp_ci.get_usagename(), "GPP_1")
    # Affinity is expressed as simpleref directive/value pairs.
    self.assertEqual(len(gpp_ci.affinity.get_simpleref()), 2)
    self.assertEqual(gpp_ci.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[0].value, "socket")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[1].value, "0")
    # Verify that we can write the output and still be DTD valid
    tmpfile = tempfile.mktemp()
    try:
        tmp = open(tmpfile, "w")
        dcd.export(tmp, 0)
        tmp.close()
        status = self._xmllint(tmpfile, "DCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def setUp(self):
    """Launch two domain managers; the first persists to a fresh temp DB file."""
    self._dbfile = tempfile.mktemp()
    self._domainBooter_1, self._domainManager_1 = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
    self._domainBooter_2, self._domainManager_2 = launchDomain(2, self._root)
def load_logging_config_uri(orb, uri, binding=None):
    """Configure log4py logging from a URI.

    Supported schemes:
      * file://<path>         -- configure directly from a local file.
      * sca://<path>?fs=<ior> -- fetch the config file from an SCA file
        system (resolved via ``orb``), stage it in a temp file, configure
        from it, then delete the temp file.

    Unknown schemes are logged as warnings and ignored.

    Fixes: ``== None`` replaced with ``is None``; the SCA file and local
    temp-file handles are now closed even when a read/write raises.
    NOTE(review): ``binding`` is still not forwarded in the sca branch,
    matching the original behavior.
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
    elif scheme == "sca":
        q = dict([x.split("=") for x in query.split("&")])
        try:
            fileSys = orb.string_to_object(q["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        else:
            if fileSys is None:
                logging.warning("Failed to lookup file system")
            else:
                t = tempfile.mktemp()
                try:
                    scaFile = fileSys.open(path, True)
                    try:
                        fileSize = scaFile.sizeOf()
                        buf = scaFile.read(fileSize)
                        with open(t, "w+") as tf:
                            tf.write(buf)
                    finally:
                        scaFile.close()
                    ossie.utils.log4py.config.fileConfig(t)
                finally:
                    os.remove(t)
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")