Python io 模块,BytesIO() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.BytesIO()。
def fetch_data():
    """Download the MTGJSON archive and return its decoded JSON content.

    The primary URL is tried first; on a connection error the fallback URL
    is used instead. The downloaded zip must contain exactly one member.

    Raises:
        RuntimeError: if the archive does not hold exactly one file.
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw = archive.read(members[0])

    # Decode outside the archive context: only the raw bytes are needed.
    return json.loads(raw.decode('utf-8'))
def zip_dir(directory):
    """Pack every file below *directory* into an in-memory zip archive.

    Member names are the file paths with the *directory* prefix removed.
    Returns the ``io.BytesIO`` holding the finished archive.
    """
    buffer = io.BytesIO()
    prefix_len = len(directory)
    with ZipFile(buffer, "w") as archive:
        for current, _subdirs, filenames in os.walk(directory):
            # Path of the current folder relative to the walked root.
            relative = current[prefix_len:]
            for filename in filenames:
                archive.write(
                    os.path.join(current, filename),
                    os.path.join(relative, filename),
                )
    return buffer
#
# Simple progress bar
#
def encode_jpeg(arr):
    """Encode a uint8 array as one JPEG image and return the encoded bytes.

    A 3-D array is treated as single channel by appending a channel axis.
    The axes are then reversed (per the original comments: x, y, z,
    channels -> channels, z, y, x) and the data is flattened to 2-D before
    being handed to PIL.

    Raises:
        ValueError: if the number of channels is neither 1 nor 3.
    """
    assert arr.dtype == np.uint8

    # simulate multi-channel array for single channel arrays
    if len(arr.shape) == 3:
        arr = np.expand_dims(arr, 3)  # add channels to end of x,y,z

    arr = arr.transpose((3, 2, 1, 0))  # channels, z, y, x
    # After the transpose the channel count lives on axis 0, not axis 3.
    num_channels = arr.shape[0]
    reshaped = arr.reshape(arr.shape[3] * arr.shape[2], arr.shape[1] * num_channels)

    if num_channels == 1:
        img = Image.fromarray(reshaped, mode='L')
    elif num_channels == 3:
        img = Image.fromarray(reshaped, mode='RGB')
    else:
        # Report the actual channel count (the original printed the x size).
        raise ValueError(
            "Number of image channels should be 1 or 3. Got: {}".format(num_channels))

    f = io.BytesIO()
    img.save(f, "JPEG")
    return f.getvalue()
def fromqimage(im):
    """Convert a Qt image into a PIL image via an in-memory buffer.

    The Qt image is saved into a ``QBuffer`` and the bytes are re-opened
    with ``Image.open``.
    """
    qt_buffer = QBuffer()
    qt_buffer.open(QIODevice.ReadWrite)
    # preserve alpha channel with png
    # otherwise ppm is more friendly with Image.open
    image_format = 'png' if im.hasAlphaChannel() else 'ppm'
    im.save(qt_buffer, image_format)

    stream = BytesIO()
    try:
        stream.write(qt_buffer.data())
    except TypeError:
        # workaround for Python 2
        stream.write(str(qt_buffer.data()))
    qt_buffer.close()
    stream.seek(0)
    return Image.open(stream)
def tell(self):
    """
    Report the current position of the underlying stream.

    Without a filepath the in-memory stream (if any) is queried directly.
    With a filepath, a dirty stream is flushed first. If no stream is open
    it is opened read-only; None is returned when that open fails.
    """
    if self.filepath:
        if self.stream and self._dirty is True:
            # Push pending writes out before reporting the position
            self.stream.flush()
            self._dirty = False

    elif self.stream:
        # No filepath: we're probably dealing with a stream in memory
        # like a StringIO or BytesIO stream.
        return self.stream.tell()

    if not self.stream and not self.open(mode=NNTPFileMode.BINARY_RO):
        return None

    return self.stream.tell()
def openStream(self, source):
    """Produces a file object from source.

    Anything exposing ``read`` is used as-is; any other value is wrapped
    in a ``BytesIO``. If the resulting stream cannot seek to its own
    position it is wrapped in a ``BufferedStream``.
    """
    # Already a file object?
    stream = source if hasattr(source, 'read') else BytesIO(source)

    # Probe seekability by seeking to the current offset.
    try:
        stream.seek(stream.tell())
    except:  # pylint:disable=bare-except
        stream = BufferedStream(stream)

    return stream
def rollover(self):
    """Move the in-memory contents into a real temporary file (once).

    The new file receives the buffered data and the same stream position.
    The stored constructor arguments are dropped afterwards.
    """
    if self._rolled:
        return

    in_memory = self._file
    on_disk = TemporaryFile(**self._TemporaryFileArgs)
    self._file = on_disk
    del self._TemporaryFileArgs

    on_disk.write(in_memory.getvalue())
    # Restore the position the caller had in the in-memory buffer.
    on_disk.seek(in_memory.tell(), 0)

    self._rolled = True
# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# BytesIO/StringIO instance to a real file. So we list
# all the methods directly.
# Context management protocol
def test_load_config(self):
    """
    Test loading of the config attribute

    The patched ``open`` returns an in-memory YAML document. The document
    must be nested (group -> "unverified" -> user list) so that it can
    match the expected dict below; a flat document cannot.
    """
    usrmgr = self.__get_dummy_object()

    with patch("os.path.exists", return_value=True),\
            patch("ownbot.usermanager.open") as open_mock:
        # Indentation inside the literal is significant YAML structure.
        open_mock.return_value = io.BytesIO(b"""
foogroup:
  unverified:
    - '@foouser'""")
        expected_config = {"foogroup": {"unverified": ["@foouser"]}}
        self.assertEqual(usrmgr.config, expected_config)
        self.assertTrue(open_mock.called)
def zip_dir(directory):
    """Zip a whole directory tree into memory.

    Each file is stored under its path with the leading *directory* part
    sliced off. The ``io.BytesIO`` containing the archive is returned.
    """
    skip = len(directory)
    payload = io.BytesIO()
    with ZipFile(payload, "w") as zf:
        for folder, _dirs, names in os.walk(directory):
            inside_archive = folder[skip:]
            for entry in names:
                source_path = os.path.join(folder, entry)
                target_path = os.path.join(inside_archive, entry)
                zf.write(source_path, target_path)
    return payload
#
# Simple progress bar
#
def close(self, *args, **kwargs):
    """
    Engine closed, copy file to DB

    After the parent ``close`` runs, the local database file named by
    ``settings_dict['NAME']`` is read and uploaded to the S3 object
    ``settings_dict['BUCKET']`` / ``settings_dict['REMOTE_NAME']``.
    The upload is best-effort: any error is printed, not raised. The
    success message is logged only when the upload did not raise.
    """
    super(DatabaseWrapper, self).close(*args, **kwargs)

    signature_version = self.settings_dict.get("SIGNATURE_VERSION", "s3v4")
    s3 = boto3.resource(
        's3',
        config=botocore.client.Config(signature_version=signature_version),
    )

    try:
        with open(self.settings_dict['NAME'], 'rb') as f:
            bytesIO = BytesIO(f.read())

        s3_object = s3.Object(self.settings_dict['BUCKET'], self.settings_dict['REMOTE_NAME'])
        # NOTE(review): the positional 'rb' argument is kept from the
        # original call; confirm that Object.put accepts it.
        s3_object.put('rb', Body=bytesIO)
    except Exception as e:
        print(e)
    else:
        # Only claim success when the upload actually went through.
        logging.debug("Saved to remote DB!")
def getWebPage(url, headers, cookies, postData=None):
    """Fetch *url* and return the response body as bytes.

    Args:
        url: address to request.
        headers: mapping of request headers.
        cookies: value sent in the ``Cookie`` header.
        postData: optional mapping; when truthy it is url-encoded and sent
            as the request body through a cookie-processing opener.

    Returns:
        The body (gunzipped when ``Content-Encoding`` is ``gzip``), or
        None if anything went wrong; the error is printed.
    """
    try:
        if postData:
            params = urllib.parse.urlencode(postData).encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)

        request.add_header('Cookie', cookies)

        if postData:
            opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor)
            response = opener.open(request)
        else:
            response = urllib.request.urlopen(request)

        # Close the response (and its connection) even if reading fails.
        with response:
            if response.info().get('Content-Encoding') == 'gzip':
                buf = BytesIO(response.read())
                r = gzip.GzipFile(fileobj=buf).read()
            else:
                r = response.read()

        return r
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None
## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.

    Args:
        g: the graph handed to ``_render``.
        format: ``'svg'``, ``'jpeg'`` or ``'png'``.
        include_asset_exists: forwarded unchanged to ``_render``.

    Raises:
        NoIPython: if IPython cannot be imported.
        ValueError: if *format* is not one of the supported values.
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed.  Can't display graph.")

    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)
    else:
        # Fail early and clearly instead of hitting an unbound
        # ``display_cls`` after the graph has been rendered.
        raise ValueError(
            "Unsupported format %r; expected 'svg', 'jpeg' or 'png'." % (format,))

    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
def _body(self):
    """Read the request body into a seekable file object and return it.

    The body is collected in memory; once more than ``MEMFILE_MAX`` bytes
    have been read, the data moves to a temporary file. The result replaces
    ``environ['wsgi.input']``. With no ``wsgi.input`` at all, an empty
    ``BytesIO`` is stored and returned.
    """
    try:
        reader = self.environ['wsgi.input'].read
    except KeyError:
        self.environ['wsgi.input'] = BytesIO()
        return self.environ['wsgi.input']

    if self.chunked:
        iterate = self._iter_chunked
    else:
        iterate = self._iter_body

    buf = BytesIO()
    total = 0
    spooled = False
    for chunk in iterate(reader, self.MEMFILE_MAX):
        buf.write(chunk)
        total += len(chunk)
        if spooled or total <= self.MEMFILE_MAX:
            continue
        # Too large for memory: switch to a temp file, carrying over
        # what has been collected so far.
        in_memory = buf
        buf = TemporaryFile(mode='w+b')
        buf.write(in_memory.getvalue())
        del in_memory
        spooled = True

    self.environ['wsgi.input'] = buf
    buf.seek(0)
    return buf
def open_device(vendor_id, product_id, interface_number):
    """Opens and returns the HID device (file-like object). Raise IOError if
    the device or interface is not available.

    Arguments:
    vendor_id -- the mouse vendor id (e.g. 0x1038)
    product_id -- the mouse product id (e.g. 0x1710)
    interface_number -- the interface number (e.g. 0x00)
    """
    # Dry run: hand back an in-memory stand-in instead of real hardware
    dry_run = debug.DEBUG and debug.DRY and is_device_plugged(vendor_id, product_id)
    if dry_run:
        fake_device = BytesIO()  # Mock the device
        fake_device.send_feature_report = fake_device.write
        return fake_device

    # Real device: open the first interface with the requested number
    for candidate in hid.enumerate(vendor_id, product_id):
        if candidate["interface_number"] == interface_number:
            handle = hid.device()
            handle.open_path(candidate["path"])
            return handle

    raise IOError("Unable to find the requested device: %04X:%04X:%02X" % (
        vendor_id, product_id, interface_number))
def print_chapters(self, show=False):
    '''
    Display infos of chapters.

    Prints a summary listing the first field of every entry in
    ``self.chapter_urls`` and returns that text. When *show* is true the
    cover image at ``self.cover`` is also downloaded and displayed;
    connection or validation errors there are printed, not raised.
    '''
    chapter_names = '\n'.join([info[0] for info in self.chapter_urls])
    text = 'There are {n} chapters in comic {c}:\n{chs}'.format(
        n=self.chapter_num, c=self.comic_title, chs=chapter_names)
    print(text)

    if show:
        # Headers are only needed for the cover download. The key must be
        # spelled 'User-Agent' for the value to be sent as the user agent.
        headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36", 'Referer': 'http://manhua.dmzj.com/tags/s.shtml'}
        try:
            res = requests.get(self.cover, headers=headers)
            # NOTE(review): this looks for the bytes "403" anywhere in the
            # payload; checking res.status_code may be what was intended.
            if b'403' in res.content:
                raise ValueError('Got cover img failed')
            out = BytesIO(res.content)
            out.seek(0)
            Image.open(out).show()
        except (ConnectionError, ValueError):
            traceback.print_exc()

    return text
def file(self):
    """
    Returns a file pointer to this binary

    The binary is downloaded as a zip archive from the session and the
    ``filedata`` member of that archive is opened and returned.

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    url = "/api/v1/binary/{0:s}".format(self.md5sum)
    with closing(self._cb.session.get(url, stream=True)) as response:
        archive = ZipFile(StringIO(response.content))
        return archive.open('filedata')
def sendto(self, bytes, *args, **kwargs):
    """Send a datagram, prefixing it with a SOCKS5 UDP header.

    Non-datagram sockets defer to the parent ``sendto``. The last
    positional argument is the destination address; any preceding ones are
    passed on as flags. Returns the number of payload bytes sent, i.e. the
    total sent minus the header length.
    """
    if self.type != socket.SOCK_DGRAM:
        return super(socksocket, self).sendto(bytes, *args, **kwargs)
    if not self._proxyconn:
        self.bind(("", 0))

    flags, address = args[:-1], args[-1]

    RSV = b"\x00\x00"
    STANDALONE = b"\x00"
    header = BytesIO()
    header.write(RSV + STANDALONE)
    self._write_SOCKS5_address(address, header)

    packet = header.getvalue() + bytes
    sent = super(socksocket, self).send(packet, *flags, **kwargs)
    return sent - header.tell()
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and strip its SOCKS5 UDP header.

    Non-datagram sockets defer to the parent ``recvfrom``. Returns
    ``(data, (host, port))`` where the address is read from the header.

    Raises:
        NotImplementedError: if the fragment byte is non-zero.
        socket.error: (EAGAIN) if the sender does not match the recorded
            proxy peer.
    """
    if self.type != socket.SOCK_DGRAM:
        return super(socksocket, self).recvfrom(bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # Read extra room on top of bufsize to leave space for the header.
    packet = BytesIO(super(socksocket, self).recv(bufsize + 1024, flags))
    packet.seek(2, SEEK_CUR)  # skip the two bytes preceding the fragment byte
    fragment = packet.read(1)
    if ord(fragment):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(packet)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        host_mismatch = fromhost != peerhost
        port_mismatch = peerport not in (0, fromport)
        if host_mismatch or port_mismatch:
            raise socket.error(EAGAIN, "Packet filtered")

    return (packet.read(bufsize), (fromhost, fromport))
def test_write_svg():
    """SVG written with default options has the expected root and path."""
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg')
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert 'segno' == css_class

    path_el = _get_path(root)
    assert path_el is not None
    assert 'qrline' == path_el.get('class')
    assert path_el.get('stroke') == '#000'

    # Neither a title nor a description element is expected.
    assert _get_title(root) is None
    assert _get_desc(root) is None
def test_write_svg_black():
    """A mixed-case color name 'bLacK' yields the stroke '#000'."""
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color='bLacK')
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert 'segno' == css_class

    path_el = _get_path(root)
    assert path_el is not None
    assert 'qrline' == path_el.get('class')
    assert path_el.get('stroke') == '#000'

    # Neither a title nor a description element is expected.
    assert _get_title(root) is None
    assert _get_desc(root) is None
def test_write_svg_black3():
    """An RGB tuple (0, 0, 0) as color yields the stroke '#000'."""
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color=(0, 0, 0))
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert 'segno' == css_class

    path_el = _get_path(root)
    assert path_el is not None
    assert 'qrline' == path_el.get('class')
    assert path_el.get('stroke') == '#000'

    # Neither a title nor a description element is expected.
    assert _get_title(root) is None
    assert _get_desc(root) is None
def pam_bw_as_matrix(buff, border):
    """\
    Returns the QR code as list of [0, 1] lists.

    Rows and columns belonging to the border are dropped.

    :param io.BytesIO buff: Buffer to read the matrix from.
    :param int border: Number of border rows/columns to skip on each side.
    """
    data, size = _image_data(buff)
    first_row, stop_row = border, size - border
    matrix = []
    for row_index, start in enumerate(range(0, len(data), size)):
        if row_index < first_row:
            continue
        if row_index >= stop_row:
            break
        row = bytearray(data[start + border:start + size - border])
        # Invert bytes since PAM uses 0x0 = black, 0x1 = white
        matrix.append([value ^ 0x1 for value in row])
    return matrix
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    :param int border: Border size; twice this value is subtracted from
            the page width to get the matrix size.
    :raises ValueError: if no MediaBox is found or the page is not square.
    """
    pdf = buff.getvalue()
    match = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                      flags=re.MULTILINE)
    if match is None:
        raise ValueError('No /MediaBox entry found in the PDF data')
    # The MediaBox lists the width first, then the height.
    width, height = (int(value) for value in match.groups())
    if height != width:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(height, width))
    size = width - 2 * border
    graphic = _find_graphic(buff)
    res = [[0] * size for _ in range(size)]
    # Each match is a "x1 y1 m x2 y2 l" pair: a horizontal run of modules.
    for coords in re.findall(r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                             r'(\-?\d+)\s+(\-?\d+)\s+l', graphic):
        x1, y1, x2, y2 = [int(i) for i in coords]
        y = abs(y1)
        res[y][x1:x2] = [1] * (x2 - x1)
    return res
def test_flow():
    """read_message decodes consecutive requests and handles short input."""
    from io import BytesIO

    # info + flush
    stream = BytesIO(b'\x01\x02\x1a\x00\x01\x02\x12\x00')
    first, _ = read_message(stream, types.Request)
    assert 'info' == first.WhichOneof("value")
    second, _ = read_message(stream, types.Request)
    assert 'flush' == second.WhichOneof("value")
    # Both messages consumed the whole buffer.
    assert stream.read() == b''
    stream.close()

    # Empty input: nothing decoded.
    req_type, fail = read_message(BytesIO(b''), types.Request)
    assert fail == 0
    assert req_type == None

    # Only a single leading byte, no payload.
    req_type, fail = read_message(BytesIO(b'\x01'), types.Request)
    assert fail == 0
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES

    A plain bytes value is gzip-compressed in memory and returned as bytes.
    An object with a ``read`` attribute is iterated, compressed into a
    temporary file, and returned wrapped in a ``FileString``.
    """
    if not hasattr(bytes, "read"):
        target = BytesIO()
        with gzip.GzipFile(fileobj=target, mode='w') as archive:
            archive.write(bytes)
        return target.getvalue()

    target = TemporaryFile()
    with gzip.GzipFile(fileobj=target, mode='w') as archive:
        for piece in bytes:
            archive.write(piece)
    target.seek(0)
    from pyLibrary.env.big_data import FileString, safe_size
    return FileString(target)
def openstream(self, filename):
    """
    Open a stream as a read-only file object (BytesIO).
    Note: filename is case-insensitive.

    :param filename: path of stream in storage tree (except root entry), either:

        - a string using Unix path syntax, for example:
          'storage_1/storage_1.2/stream'
        - or a list of storage filenames, path to the desired stream/storage.
          Example: ['storage_1', 'storage_1.2', 'stream']

    :returns: file object (read-only)
    :exception IOError: if filename not found, or if this is not a stream.
    """
    entry = self.direntries[self._find(filename)]
    if entry.entry_type == STGTY_STREAM:
        return self._open(entry.isectStart, entry.size)
    raise IOError("this file is not a stream")
def __init__(self, image=None, **kw):
    """Build a Tk bitmap image from a PIL image.

    When *image* is None, the image is opened from the ``file`` keyword
    or, failing that, from the raw bytes in the ``data`` keyword. The
    remaining keywords are passed to ``tkinter.BitmapImage``.
    """
    # Tk compatibility: file or data
    if image is None:
        if "file" in kw:
            image = Image.open(kw.pop("file"))
        elif "data" in kw:
            from io import BytesIO
            image = Image.open(BytesIO(kw.pop("data")))

    self.__mode = image.mode
    self.__size = image.size

    if not _pilbitmap_check():
        # slow but safe way
        kw["data"] = image.tobitmap()
    else:
        # fast way (requires the pilbitmap booster patch)
        image.load()
        kw["data"] = "PIL:%d" % image.im.id
        self.__im = image  # must keep a reference

    self.__photo = tkinter.BitmapImage(**kw)
def get_data_hash(self, data_bytes):
    """Calculate Merkle's root hash of the given data bytes

    The bottom layer has one slot per chunk of ``self._chunk_len`` bytes,
    rounded up to the next power of two. It is filled by
    ``self._initial_hasher`` and reduced by ``self._calculate_root_hash``.
    """
    # Calculate tree parameters
    populated_width = math.ceil(len(data_bytes) / self._chunk_len)
    height = math.ceil(math.log2(populated_width))
    bottom_layer = ['\x00'] * (1 << height)

    with io.BytesIO(data_bytes) as stream:
        self._initial_hasher(stream, populated_width, bottom_layer)

    # Get Merkle's root hash
    return self._calculate_root_hash(bottom_layer)
def test_simple(self):
    """Content starting with a registered magic prefix reaches the subscriber.

    Two prefixes are registered for the same mock consumer; the document
    content starts with the first one, so exactly one item is expected and
    its stream must still contain the full content.
    """
    pipe = test_helper.get_mock_pipeline([])
    mock_mod = test_helper.MockSubscriber()
    pipe.register_magic(b'\xFF\xEE\xDD', ('mock', mock_mod.consume))
    pipe.register_magic(b'\x00\x00\x00', ('mock', mock_mod.consume))

    _magic = magic.Subscriber(pipe)
    _magic.setup(None)

    doc = document.get_document('mock')
    content = b'\xFF\xEE\xDDMOCKMOCKMOCK'
    _magic.consume(doc, BytesIO(content))

    # assertEqual, not the removed assertEquals alias.
    self.assertEqual(True, doc.magic_hit)
    self.assertEqual(1, len(mock_mod.produced))

    expected = content
    actual = mock_mod.produced[0][1].read()
    self.assertEqual(expected, actual)
def test_simple(self):
    """A mocked 'image/jpeg' content type sets the doctype to 'picture'."""
    pipeline = test_helper.get_mock_pipeline([])
    _file_meta = file_meta.Subscriber(pipeline)
    response = json.dumps({'Content-Type': 'image/jpeg'}).encode('utf-8')
    config = {
        'data_root': 'local_data',
        'code_root': '.',
        'worker_id': 1,
        'host': 'mock',
        helper.INJECTOR: test_helper.MockInjector(response)
    }
    _file_meta.setup(config)

    doc = document.get_document('mock.txt')
    _file_meta.consume(doc, BytesIO(b'mock'))

    self.assertEqual('picture', doc.doctype)
def test_simple(self):
    """Only documents with a configured extension are copied."""
    mock_pipeline = test_helper.get_mock_pipeline([])

    # Start from a clean output directory.
    data_root = os.path.join('local_data', 'unittests')
    if os.path.exists(data_root):
        shutil.rmtree(data_root)

    _copy = copy_file.Subscriber(mock_pipeline)
    config = {
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    }
    _copy.setup(config)

    _copy.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    _copy.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))

    copied = os.listdir(os.path.join(data_root, 'files', 'xyz'))
    self.assertEqual(['39bbf948-mock.xyz'], copied)
def reseed(self, netdb):
    """Compress netdb entries and set content

    Collects every ``.dat`` file below *netdb*, keeps at most 75 of them
    (random sample), and stores them flat (by base name) in a deflated zip
    held in ``self.CONTENT``.

    Raises:
        PyseederException: if no ``.dat`` file is found.
    """
    # TODO check modified time
    # may be not older than 10h
    dat_files = [
        os.path.join(folder, name)
        for folder, _dirs, names in os.walk(netdb)
        for name in names
        if name.endswith(".dat")
    ]

    if not dat_files:
        raise PyseederException("Can't get enough netDb entries")
    if len(dat_files) > 75:
        dat_files = random.sample(dat_files, 75)

    archive_buffer = io.BytesIO()
    with ZipFile(archive_buffer, "w", compression=ZIP_DEFLATED) as zf:
        for path in dat_files:
            zf.write(path, arcname=os.path.basename(path))

    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = archive_buffer.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
def _restart_data(self, format_: str='json') -> None:
    """Reset the request fixture and the send/receive buffers.

    Loads ``data/helloworld.py`` (relative to ``CURDIR``) as the request
    content. Only the ``'json'`` format is accepted.
    """
    assert format_ == 'json'

    sample_path = join(CURDIR, 'data', 'helloworld.py')
    with open(sample_path) as f:
        testcode = f.read()

    self.data = Request({
        'filepath': 'test.py',
        'action': 'ParseAST',
        'content': testcode,
        'language': 'python',
    })

    if format_ == 'json':
        bufferclass = io.StringIO
    else:
        bufferclass = io.BytesIO

    # This will mock the python_driver stdin
    self.sendbuffer = bufferclass()
    # This will mock the python_driver stdout
    self.recvbuffer = bufferclass()
def __init__(self, codestr: str, astdict: AstDict) -> None:
    """Tokenize *codestr* and set up the helpers built from those tokens."""
    self._astdict = astdict

    # Tokenize and create the noop extractor and the position fixer
    readline = BytesIO(codestr.encode('utf-8')).readline
    self._tokens: List[Token] = [Token(*raw) for raw in tokenize.tokenize(readline)]
    token_lines = _create_tokenized_lines(codestr, self._tokens)
    self.noops_sync = NoopExtractor(codestr, token_lines)
    self.pos_sync = LocationFixer(codestr, token_lines)
    self.codestr = codestr

    # This will store a dict of nodes to end positions, it will be filled
    # on parse()
    self._node2endpos = None

    # Global and Nonlocal nodes share one handler.
    self.visit_Global = self.visit_Nonlocal = self._promote_names
def __len__(self):
    """
    Returns the length of the content

    Without a filepath the length of the in-memory stream is measured by
    seeking to its end (the previous position is restored). With neither
    stream nor filepath the length is 0. With a filepath a dirty stream is
    flushed and the size of the file on disk is returned.
    """
    if not self.filepath:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        if self.stream:
            # Remember where we are
            ptr = self.stream.tell()

            # Advance to the end of the file and get our length. The
            # position is read with tell() because seek() does not return
            # the new offset on every stream type.
            self.stream.seek(0, SEEK_END)
            length = self.stream.tell()

            if length != ptr:
                # Return our pointer
                self.stream.seek(ptr, SEEK_SET)
        else:
            # No Stream or Filepath; nothing has been initialized
            # yet at all so just return 0
            length = 0
    else:
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False

        # Get the size
        length = getsize(self.filepath)

    return length
def get_stream(self, resource):
    """Return the bytes of *resource* wrapped in a readable BytesIO."""
    payload = self.get_bytes(resource)
    return io.BytesIO(payload)
def get_resource_stream(self, manager, resource_name):
    """Return the resource's content as an in-memory binary stream."""
    content = self.get_resource_string(manager, resource_name)
    return io.BytesIO(content)
def prepare_response(self, request, cached):
    """Verify our vary headers match and construct a real urllib3
    HTTPResponse object.

    Returns None when the cached entry cannot be used for *request*.
    """
    vary = cached.get("vary", {})

    # Special case the '*' Vary value as it means we cannot actually
    # determine if the cached response is suitable for this request.
    if "*" in vary:
        return

    # Ensure that the Vary headers for the cached response match our
    # request
    if any(request.headers.get(name, None) != expected
           for name, expected in vary.items()):
        return

    response_fields = cached["response"]
    body_raw = response_fields.pop("body")

    # The cached body is complete, so a chunked marker no longer applies.
    headers = CaseInsensitiveDict(data=response_fields['headers'])
    if headers.get('transfer-encoding', '') == 'chunked':
        headers.pop('transfer-encoding')
    response_fields['headers'] = headers

    try:
        body = io.BytesIO(body_raw)
    except TypeError:
        # This can happen if cachecontrol serialized to v1 format (pickle)
        # using Python 2. A Python 2 str(byte string) will be unpickled as
        # a Python 3 str (unicode string), which will cause the above to
        # fail with:
        #
        #     TypeError: 'str' does not support the buffer interface
        body = io.BytesIO(body_raw.encode('utf8'))

    return HTTPResponse(
        body=body,
        preload_content=False,
        **response_fields
    )
def __init__(self, fp, callback):
    """Store the wrapped file object and callback; start with an empty buffer."""
    self.__fp, self.__callback = fp, callback
    self.__buf = BytesIO()
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.

    NOTE(review): the body is a generator; presumably it is wrapped by a
    context-manager decorator where it is defined -- confirm.
    """
    original = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(original)
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always put the real stream back.
        setattr(sys, stream_name, original)
def test_verifying_zipfile():
    """VerifyingZipFile checks hashes in default mode and in strict mode."""
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = StringIO()
    with zipfile.ZipFile(sio, 'w') as zf:
        zf.writestr("one", b"first file")
        zf.writestr("two", b"second file")
        zf.writestr("three", b"third file")

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    # "three" carries a bogus expected hash, so reading it must fail.
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # Registering "two" (with no hash) makes it readable again.
    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def get_resource_stream(self, manager, resource_name):
    """Wrap the resource's content in a BytesIO and return it."""
    data = self.get_resource_string(manager, resource_name)
    stream = io.BytesIO(data)
    return stream
def quopri_encode(input, errors='strict'):
    """Quoted-printable encode *input* (tabs and spaces quoted too).

    Returns ``(encoded_bytes, number_of_input_bytes)``.
    """
    assert errors == 'strict'
    source, sink = BytesIO(input), BytesIO()
    quopri.encode(source, sink, quotetabs=True)
    encoded = sink.getvalue()
    return (encoded, len(input))
def quopri_decode(input, errors='strict'):
    """Decode quoted-printable *input*.

    Returns ``(decoded_bytes, number_of_input_bytes)``.
    """
    assert errors == 'strict'
    source, sink = BytesIO(input), BytesIO()
    quopri.decode(source, sink)
    decoded = sink.getvalue()
    return (decoded, len(input))
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
    """Uuencode *input*.

    The output starts with a ``begin <mode> <filename>`` line, followed by
    the data encoded 45 bytes per line, and ends with the terminator.
    Returns ``(encoded_bytes, number_of_input_bytes)``.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    encoded = BytesIO()

    # Encode
    header = 'begin %o %s\n' % (mode & 0o777, filename)
    encoded.write(header.encode('ascii'))
    for chunk in iter(lambda: source.read(45), b''):
        encoded.write(binascii.b2a_uu(chunk))
    encoded.write(b' \nend\n')

    return (encoded.getvalue(), len(input))
def uu_decode(input, errors='strict'):
    """Decode uuencoded *input*.

    Returns ``(decoded_bytes, number_of_input_bytes)``.

    Raises:
        ValueError: if no ``begin`` line is found, or the data ends
            before the ``end`` line.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    decoded = BytesIO()

    # Find start of encoded data
    for line in iter(source.readline, b''):
        if line[:5] == b'begin':
            break
    else:
        raise ValueError('Missing "begin" line in input data')

    # Decode
    line = source.readline()
    while line and line != b'end\n':
        try:
            data = binascii.a2b_uu(line)
        except binascii.Error:
            # Workaround for broken uuencoders by /Fredrik Lundh
            nbytes = (((line[0]-32) & 63) * 4 + 5) // 3
            data = binascii.a2b_uu(line[:nbytes])
        decoded.write(data)
        line = source.readline()

    # The loop stopped on end-of-input rather than on the "end" line.
    if not line:
        raise ValueError('Truncated input data')

    return (decoded.getvalue(), len(input))
def test_load_config_empty_file(self):
    """
    Test loading of the config attr if the config file is empty

    With ``open`` patched to return an empty stream, the config is
    expected to be an empty dict.
    """
    usrmgr = self.__get_dummy_object()

    with patch("os.path.exists", return_value=True):
        with patch("ownbot.usermanager.open") as open_mock:
            open_mock.return_value = io.BytesIO(b"")
            self.assertEqual(usrmgr.config, {})
            self.assertTrue(open_mock.called)
def _create_map_image(self):
    """Composite all images from ``_get_images`` into ``self._map_image``.

    Starts from a new RGBA image of ``self._map_size``. Items that are not
    PIL images are opened from their ``content`` bytes and converted to
    RGBA before being alpha-composited in order.
    """
    layers = self._get_images()

    self._map_image = Image.new('RGBA', self._map_size)

    for layer in layers:
        if isinstance(layer, Image.Image):
            overlay = layer
        else:
            overlay = Image.open(BytesIO(layer.content)).convert('RGBA')
        self._map_image = Image.alpha_composite(self._map_image, overlay)
def _create_pdf_libreoffice(self):
    """Render the map into an ODT template and convert it to PDF.

    The map image is saved as PNG into memory, rendered into
    ``template.odt``, written to a temporary file and passed to
    ``unoconv``. Returns the PDF path (temporary file name + ``.pdf``).
    """
    png_buffer = BytesIO()
    self._map_image.save(png_buffer, 'PNG')

    # TODO: use the configuration to select the template
    # TODO: use the configuration to select the name of the key in the template
    renderer = Renderer(media_path='.')
    document = renderer.render('template.odt', my_map=png_buffer)

    with NamedTemporaryFile(mode='wb+', prefix='geo-pyprint_', delete=True) as odt_file:
        odt_file.write(document)
        # Make sure the converter sees the complete file on disk.
        odt_file.flush()

        output_name = odt_file.name + '.pdf'
        cmd = ['unoconv', '-f', 'pdf', '-o', output_name, odt_file.name]
        subprocess.call(cmd, timeout=None)

    return output_name
def load_verify_locations(self, cafile=None, capath=None, cadata=None):
    """Forward CA locations to the wrapped context.

    *cafile* and *capath* are UTF-8 encoded when given and always passed
    in a first call. When *cadata* is given it is passed in a second call,
    wrapped in a ``BytesIO``.
    """
    encoded_file = None if cafile is None else cafile.encode('utf-8')
    encoded_path = None if capath is None else capath.encode('utf-8')
    self._ctx.load_verify_locations(encoded_file, encoded_path)

    if cadata is None:
        return
    self._ctx.load_verify_locations(BytesIO(cadata))