Python io 模块,IOBase() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.IOBase()。
def __is_or_has_file(self, data):
    """Tell whether *data* is, or wraps, a file-like object.

    An object exposing a ``file`` attribute is unwrapped first and the check
    is applied to that attribute instead.

    The check is a bit clunky because the bare name ``file`` does not exist
    on Python 3, while Python 2's built-in file objects are not instances of
    anything in ``io``:
    https://stackoverflow.com/questions/1661262/check-if-object-is-file-like-in-python

    Returns:
        Boolean - True if we have a file-like object
    """
    from io import IOBase

    if hasattr(data, 'file'):
        data = data.file
    try:
        # Python 2: accept the built-in ``file`` type *and* io streams
        # (e.g. objects returned by io.open()).
        file_types = (file, IOBase)
    except NameError:
        # Python 3: the ``file`` builtin is gone.
        file_types = (IOBase,)
    return isinstance(data, file_types)
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
    """Execute a prepared MySQL statement.

    Every value in ``data`` (looked up by parameter position) that is an
    ``IOBase`` stream is first handed to ``cmd_stmt_send_long_data``. For
    each such position a one-element tuple is recorded in the mapping passed
    to ``make_stmt_execute``; its flag is True unless the stream's ``mode``
    contains ``'b'``.

    Returns the value produced by ``_handle_binary_result`` for the
    STMT_EXECUTE response packet.
    """
    parameters = list(parameters)
    long_data_used = {}

    if data:
        for index in range(len(parameters)):
            value = data[index]
            if not isinstance(value, IOBase):
                continue
            try:
                flag = 'b' not in value.mode
            except AttributeError:
                # Streams without a ``mode`` attribute keep the default.
                flag = True
            self.cmd_stmt_send_long_data(statement_id, index, value)
            long_data_used[index] = (flag,)

    execute_packet = self._protocol.make_stmt_execute(
        statement_id, data, tuple(parameters), flags,
        long_data_used, self.charset)
    response = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
    return self._handle_binary_result(response)
def outputpapertemplate(self, dest, listchar, output=None):
    """Append template pages to a PDF writer and optionally write it out.

    Args:
        dest: ``None``, a file path (``str``) or a writable binary stream.
        listchar: Collection handed to ``outputtemplateonepage``. The loop
            runs while it is truthy, so that helper is presumably expected
            to consume entries from it -- TODO confirm.
        output: Writer to append to; a new ``PyPDF2.PdfFileWriter`` is
            created when omitted.

    Returns:
        The writer when ``dest`` is ``None``; otherwise ``None`` after the
        PDF has been written to ``dest``.
    """
    # Identity checks: ``== None`` would go through a custom ``__eq__``.
    if output is None:
        output = PyPDF2.PdfFileWriter()

    while listchar:
        iopage = self.outputtemplateonepage(listchar)
        page = PyPDF2.PdfFileReader(iopage)
        output.addPage(page.getPage(0))

    if dest is None:
        return output

    if isinstance(dest, str):  # when dest is a file path
        destdir = os.path.dirname(dest)
        if destdir != '' and not os.path.isdir(destdir):
            os.makedirs(destdir)
        with open(dest, "wb") as w:
            output.write(w)
    else:  # when dest is io.IOBase
        output.write(dest)
def __init__(self, record_type=None, record_name=None, data=None):
    """Initialise a record from explicit fields or by reading a stream.

    When ``record_type`` or ``record_name`` is given, the ``type``, ``name``
    and ``data`` attributes are assigned (``type`` falls back to
    ``'unknown'``). Otherwise a non-None ``data`` is treated as input to
    ``_read``: ``bytearray``/``str`` values are wrapped in ``io.BytesIO``
    and anything that is not an ``io.IOBase`` raises ``TypeError``.
    """
    self._message_begin = self._message_end = False
    self._type = self._name = self._data = ''

    if record_type is not None or record_name is not None:
        self.type = 'unknown' if record_type is None else record_type
        if record_name is not None:
            self.name = record_name
        if data is not None:
            self.data = data
        return

    if data is None:
        return
    if isinstance(data, (bytearray, str)):
        data = io.BytesIO(data)
    if not isinstance(data, io.IOBase):
        raise TypeError("invalid data argument type")
    self._read(data)
def __init__(self, f, summary=True):
    """Attach to a stream and record its size.

    Args:
        f: Either a file name or a seekable binary stream. Anything that is
            not an ``IOBase`` instance is passed to ``open(f, 'rb')``.
        summary: If True, call ``self.read_summary()``.
    """
    super().__init__(0)
    self.stream = f if isinstance(f, IOBase) else open(f, 'rb')

    # Measure the stream by seeking to its end, then rewind to the start.
    self.stream.seek(0, SEEK_END)
    self.stream_size = self.stream.tell()
    self.stream.seek(0, SEEK_SET)

    if summary:
        self.read_summary()
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
    """Execute a prepared MySQL statement.

    Stream values (``IOBase`` instances) found in ``data`` at a parameter
    position are sent through ``cmd_stmt_send_long_data`` before the
    execute packet is built; the positions are reported to
    ``make_stmt_execute`` together with a flag derived from the stream's
    ``mode``.

    Returns the result of ``_handle_binary_result``.
    """
    def mode_flag(stream):
        # True unless the stream advertises a mode containing 'b'.
        try:
            return 'b' not in stream.mode
        except AttributeError:
            return True

    parameters = list(parameters)
    long_data_used = {}

    if data:
        for param_id, _ in enumerate(parameters):
            stream = data[param_id]
            if isinstance(stream, IOBase):
                flag = mode_flag(stream)
                self.cmd_stmt_send_long_data(statement_id, param_id, stream)
                long_data_used[param_id] = (flag,)

    packet = self._send_cmd(
        ServerCmd.STMT_EXECUTE,
        packet=self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset))
    return self._handle_binary_result(packet)
def _find_options_in_meta(self, content):
    """Extract options encoded in HTML ``<meta>`` tags.

    :param content: str or file-like object - contains HTML to parse.
        ``io.IOBase`` instances and ``StreamReaderWriter`` objects are read
        in full first.

    returns:
        dict: {config option: value}, where the option is the part of the
        tag's ``name`` that follows ``self.config.meta_tag_prefix``.
    """
    is_stream = (isinstance(content, io.IOBase)
                 or content.__class__.__name__ == 'StreamReaderWriter')
    if is_stream:
        content = content.read()

    options = {}
    for tag in re.findall('<meta [^>]*>', content):
        prefix = self.config.meta_tag_prefix
        if not re.search('name=["\']%s' % prefix, tag):
            continue
        option = re.findall('name=["\']%s([^"\']*)' % prefix, tag)[0]
        options[option] = re.findall('content=["\']([^"\']*)', tag)[0]
    return options
def test_wrap_non_iobase():
    """``trio.wrap_file`` accepts a non-IOBase object offering close/write."""
    class FakeFile:
        def close(self):  # pragma: no cover
            pass

        def write(self):  # pragma: no cover
            pass

    fake = FakeFile()
    assert not isinstance(fake, io.IOBase)

    assert isinstance(trio.wrap_file(fake), AsyncIOWrapper)

    # Once ``write`` is removed from the class, wrapping must fail.
    del FakeFile.write
    with pytest.raises(TypeError):
        trio.wrap_file(FakeFile())
def __init__(self, record_type=None, record_name=None, data=None):
    """Set up record state from fields, or parse it from ``data``.

    With a type and/or name supplied, the public ``type`` (default
    ``'unknown'``), ``name`` and ``data`` attributes are assigned directly.
    With neither supplied, ``data`` (if any) must be a ``bytearray``,
    ``str`` or ``io.IOBase`` and is passed to ``_read``; other types raise
    ``TypeError``.
    """
    self._message_begin = self._message_end = False
    self._type = self._name = self._data = ''

    fields_given = not (record_type is None and record_name is None)
    if fields_given:
        self.type = record_type if record_type is not None else 'unknown'
        if record_name is not None:
            self.name = record_name
        if data is not None:
            self.data = data
    elif data is not None:
        stream = data
        if isinstance(stream, (bytearray, str)):
            stream = io.BytesIO(stream)
        if not isinstance(stream, io.IOBase):
            raise TypeError("invalid data argument type")
        self._read(stream)
def wrap_file_object(fileobj):
    """Handle differences in Python 2 and 3 around writing bytes.

    Objects outside the ``io`` hierarchy (probably Python 2 files, which are
    lenient about text versus bytes) and text streams are returned
    unchanged. Any other ``io`` stream cannot take text directly, so it is
    wrapped in ``io.TextIOWrapper`` to do the conversion.
    """
    needs_text_layer = (isinstance(fileobj, io.IOBase)
                        and not isinstance(fileobj, io.TextIOBase))
    if needs_text_layer:
        return io.TextIOWrapper(fileobj)
    return fileobj
def get_gramet_image_url(url_or_fp):
    """Return the absolute URL of the gramet image found in a page.

    ``url_or_fp`` is either an ``io.IOBase`` stream holding the page (the
    result is then resolved against ``OGIMET_URL``) or a URL that is fetched
    with ``requests``. An empty string is returned when the page is empty or
    contains no matching ``<img>`` tag.
    """
    if isinstance(url_or_fp, io.IOBase):
        # noinspection PyUnresolvedReferences
        page = url_or_fp.read()
        base = urlsplit(OGIMET_URL)
    else:
        base = urlsplit(url_or_fp)
        import requests
        page = requests.get(url_or_fp).text

    if not page:
        return ''
    match = re.search(r'<img src="([^"]+/gramet_[^"]+)"', page)
    if not match:
        return ''
    return "{url.scheme}://{url.netloc}{path}".format(
        url=base, path=match.group(1))
def copy_file_data(src_file, dst_file, chunk_size=None):
    """Copy data from one file object to another.

    Arguments:
        src_file (io.IOBase): File open for reading.
        dst_file (io.IOBase): File open for writing.
        chunk_size (int, optional): Number of bytes to copy at
            a time (or `None` to use ``io.DEFAULT_BUFFER_SIZE``).

    """
    size = chunk_size or io.DEFAULT_BUFFER_SIZE
    read_chunk = src_file.read
    write_chunk = dst_file.write
    while True:
        chunk = read_chunk(size)
        # Any falsy chunk (b'' or '') marks the end, so binary and text
        # files are both handled.
        if not chunk:
            break
        write_chunk(chunk)
def _find_options_in_meta(self, content):
    """Read ``content`` and collect options encoded in HTML meta tags.

    :param content: str or file-like object - contains HTML to parse
        (``io.IOBase`` and ``StreamReaderWriter`` objects are read first).

    returns:
        dict: {config option: value}; the option name is what follows
        ``self.configuration.meta_tag_prefix`` in the tag's ``name``.
    """
    readable = (isinstance(content, io.IOBase)
                or content.__class__.__name__ == 'StreamReaderWriter')
    html = content.read() if readable else content

    found = {}
    for meta in re.findall('<meta [^>]*>', html):
        prefix = self.configuration.meta_tag_prefix
        if re.search('name=["\']%s' % prefix, meta) is None:
            continue
        key = re.findall('name=["\']%s([^"\']*)' % prefix, meta)[0]
        found[key] = re.findall('content=["\']([^"\']*)', meta)[0]
    return found
def _upload_media_py3(self, media_type, media_file, extension=''):
    """Upload a media file through the media/upload endpoint.

    ``media_file`` is either an ``io.IOBase`` stream with a ``name`` (the
    extension is taken from that name) or an ``io.BytesIO`` (the
    ``extension`` argument is then used and the file is sent as
    ``temp.<extension>``). ``ValueError`` is raised for a disallowed
    extension or an unsupported ``media_file`` type.

    Returns the value of ``self.request.post``.
    """
    if isinstance(media_file, io.IOBase) and hasattr(media_file, 'name'):
        extension = media_file.name.split('.')[-1].lower()
        allowed = is_allowed_extension(extension)
        if not allowed:
            raise ValueError('Invalid file type.')
        filename = media_file.name
    elif isinstance(media_file, io.BytesIO):
        extension = extension.lower()
        allowed = is_allowed_extension(extension)
        if not allowed:
            raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
        filename = 'temp.' + extension
    else:
        raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')

    payload = {
        'media': (filename, media_file, convert_ext_to_mime(extension))
    }
    return self.request.post(
        url='https://api.weixin.qq.com/cgi-bin/media/upload',
        params={'type': media_type},
        files=payload
    )
def truncate(self, size=None):
    """
    Resize the stream to the given size in bytes (or the current position
    if size is not specified). This resizing can extend or reduce the
    current file size. The new file size is returned.

    In prior versions of picamera, truncation also changed the position of
    the stream (because prior versions of these stream classes were
    non-seekable). This functionality is now deprecated; scripts should
    use :meth:`~io.IOBase.seek` and :meth:`truncate` as one would with
    regular :class:`~io.BytesIO` instances.
    """
    if size is not None:
        warnings.warn(
            PiCameraDeprecated(
                'This method changes the position of the stream to the '
                'truncated length; this is deprecated functionality and '
                'you should not rely on it (seek before or after truncate '
                'to ensure position is consistent)'))
    # Keep the parent's result so the documented return value is honoured
    # (previously the method always returned None).
    new_size = super(PiArrayOutput, self).truncate(size)
    if size is not None:
        # Deprecated legacy behaviour: move to the truncated length.
        self.seek(size)
    return new_size
def test_good_response(self, resolwe_mock, requests_mock, os_mock, open_mock):
    """Successful downloads log once per expected call and write every chunk."""
    resolwe_mock.configure_mock(**self.config)
    os_mock.path.isfile.return_value = True

    # When mocking open one wants it to return a "file-like" mock: (spec=io.IOBase)
    # The injected patch object is ``open_mock``; configuring the unrelated
    # name ``mock_open`` would leave the patched ``open`` untouched.
    open_mock.return_value = MagicMock(spec=io.IOBase)

    requests_mock.get.return_value = MagicMock(ok=True,
                                               **{'iter_content.return_value': range(3)})

    Resolwe._download_files(resolwe_mock, self.file_list)
    self.assertEqual(resolwe_mock.logger.info.call_count, 3)

    # This assert may seem weird. To check what is happening behind the scenes:
    # print(open_mock.mock_calls)
    self.assertEqual(open_mock.return_value.__enter__.return_value.write.call_count, 6)
    # Why 6? 2 files in self.file_list, each downloads 3 chunks (defined in response mock)
def add_fields(self, *fields):
    """Add several fields at once.

    Each positional argument may be an ``io.IOBase`` stream (its field name
    comes from ``guess_filename`` with ``'unknown'`` as fallback), a
    multidict (its items are queued and processed in turn) or a two-item
    list/tuple of ``(name, value)``. Anything else raises ``TypeError``.
    """
    pending = list(fields)

    while pending:
        item = pending.pop(0)

        if isinstance(item, io.IOBase):
            self.add_field(guess_filename(item, 'unknown'), item)
            continue

        if isinstance(item, (MultiDictProxy, MultiDict)):
            pending.extend(item.items())
            continue

        if isinstance(item, (list, tuple)) and len(item) == 2:
            name, value = item
            self.add_field(name, value)
            continue

        raise TypeError('Only io.IOBase, multidict and (name, file) '
                        'pairs allowed, use .add_field() for passing '
                        'more complex parameters, got {!r}'
                        .format(item))
def __init__(self, obj, headers=None, *, chunk_size=8192):
    """Store the payload, normalise its headers and register serializers.

    ``headers`` is converted to a ``CIMultiDict`` unless it already is one
    (an empty one is created when omitted). ``_serialize_map`` maps payload
    types and ``(type, subtype)`` mimetype pairs to the bound method that
    serializes them.
    """
    if headers is None:
        headers = CIMultiDict()
    if not isinstance(headers, CIMultiDict):
        headers = CIMultiDict(headers)

    self.obj = obj
    self.headers = headers
    self._chunk_size = chunk_size
    self._fill_headers_with_defaults()

    by_type = {
        bytes: self._serialize_bytes,
        str: self._serialize_str,
        io.IOBase: self._serialize_io,
        MultipartWriter: self._serialize_multipart,
    }
    by_mimetype = {
        ('application', 'json'): self._serialize_json,
        ('application', 'x-www-form-urlencoded'): self._serialize_form,
    }
    self._serialize_map = dict(by_type)
    self._serialize_map.update(by_mimetype)
def _guess_content_length(self, obj):
    """Return the number of bytes ``obj`` would contribute, or ``None``.

    ``bytes`` give their length; ``str`` and ``io.StringIO`` are measured
    after encoding with the charset from the Content-Type header
    (``'us-ascii'`` when absent); in-memory and file-backed streams report
    what remains after the current position. ``None`` means the length
    cannot be determined.
    """
    def encoded_size(text):
        # Size of ``text`` once encoded with the header's charset.
        *_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
        return len(text.encode(params.get('charset', 'us-ascii')))

    if isinstance(obj, bytes):
        return len(obj)
    if isinstance(obj, str):
        return encoded_size(obj)
    if isinstance(obj, io.StringIO):
        return encoded_size(obj.getvalue()) - obj.tell()
    if isinstance(obj, io.BytesIO):
        return len(obj.getvalue()) - obj.tell()
    if isinstance(obj, io.IOBase):
        try:
            return os.fstat(obj.fileno()).st_size - obj.tell()
        except (AttributeError, OSError):
            # No usable file descriptor behind this stream.
            return None
    return None
def upload(self, file, name: str=None, path: str=None) -> StackFile:
    """
    Upload a file to Stack

    :param file: IO pointer or string containing a path to a file
    :param name: Custom name which will be used on the remote server, defaults to file.name
    :param path: Path to upload it to, defaults to current working directory
    :return: Instance of a stack file
    """
    target = path if path else self.__cwd

    if isinstance(file, IOBase):
        return self.__upload(file=file, path=target, name=name)

    if not isinstance(file, str):
        raise StackException("File should either be a path to a file on disk or an IO type, got: {}".format(type(file)))

    # A path was given: open it only for the duration of the upload.
    with open(file, "rb") as fd:
        return self.__upload(file=fd, path=target, name=name)
def analyse_file(self, filename, token=None, language=None):
    """Post a voice file to the request endpoint and wrap the results.

    Args:
        filename: Path of the file to send, or an already open
            ``io.IOBase`` stream.
        token: Request token; falls back to ``self.token``.
        language: Optional language; falls back to ``self.language``.

    Returns:
        ``Response`` built from the ``'results'`` entry of the JSON reply.

    Raises:
        RecastError: When no token is available or the HTTP status is not OK.
    """
    token = token or self.token
    if token is None:
        raise RecastError("Token is missing")

    language = language or self.language

    # Only close what this method opened; caller-supplied streams stay open.
    opened_here = not isinstance(filename, io.IOBase)
    voice = open(filename, 'rb') if opened_here else filename
    try:
        body = {'voice': voice}
        if language is not None:
            body['language'] = language

        response = requests.post(
            Utils.REQUEST_ENDPOINT,
            files=body,
            headers={'Authorization': "Token {}".format(token)}
        )
    finally:
        if opened_here:
            voice.close()

    if response.status_code != requests.codes.ok:
        raise RecastError(response.json().get('message', ''))

    return Response(response.json()['results'])
def close(self):
    """Delete the IO object and close the input file and the output file"""
    if self.__closed:
        # Already closed once; nothing more to do.
        return

    # On POSIX a file can be removed while a process still has it open (it
    # just becomes invisible to others), so removing the temp files before
    # closing avoids a race. On NT the removal fails with OSError instead,
    # so it is retried after the handles are closed.
    try:
        self.__del_files()
    except OSError:
        removed = False
    else:
        removed = True

    for attr in ('input_file', 'output_file'):
        stream = getattr(self, attr)
        if isinstance(stream, IOBase):
            stream.close()

    if not removed:
        self.__del_files()
    self.__closed = True
def __new__(cls, **kwargs):
    """Patch for abstractmethod-like enforcement in io.IOBase grandchildren.

    Refuses to instantiate classes that lack a callable ``_read_bytes``, a
    callable ``_prep_message`` or a ``_config_class`` attribute, then builds
    the instance and seeds its stream state from the configuration (taken
    from the ``config`` keyword, or built from the remaining keywords).

    Raises:
        TypeError: When one of the required class members is missing.
    """
    # Each hook is checked for callability itself; previously the
    # ``_prep_message`` test looked at ``_read_bytes`` a second time.
    required_hooks = ('_read_bytes', '_prep_message')
    hooks_ok = all(callable(getattr(cls, hook, None)) for hook in required_hooks)
    if not hooks_ok or not hasattr(cls, '_config_class'):
        raise TypeError("Can't instantiate abstract class {}".format(cls.__name__))

    instance = super(_EncryptionStream, cls).__new__(cls)

    config = kwargs.pop('config', None)
    if not isinstance(config, instance._config_class):  # pylint: disable=protected-access
        config = instance._config_class(**kwargs)  # pylint: disable=protected-access
    instance.config = config

    instance.bytes_read = 0
    instance.output_buffer = b''
    instance._message_prepped = False  # pylint: disable=protected-access
    instance.source_stream = instance.config.source
    instance._stream_length = instance.config.source_length  # pylint: disable=protected-access

    return instance
def deserialize(data):
    """Turn a stream, bytes or text into parsed JSON (or YAML) data.

    Streams are rewound when they support seeking and then read in full;
    bytes are decoded as UTF-8. Text is parsed as JSON with objects returned
    as ``collections.OrderedDict`` in document order; if that fails the text
    is handed to ``yaml.load``. Any other input is returned unchanged.
    """
    if isinstance(data, IOBase):
        try:
            data.seek(0)
        except UnsupportedOperation:
            pass
        if hasattr(data, 'readall'):
            data = data.readall()
        else:
            data = data.read()
    if isinstance(data, bytes):
        data = str(data, encoding='utf-8')
    if isinstance(data, str):
        try:
            # ``object_pairs_hook`` receives the pairs in document order;
            # ``object_hook`` only sees an already-built dict.
            data = json.loads(data, object_pairs_hook=collections.OrderedDict)
        except json.JSONDecodeError:
            # NOTE(security): yaml.load without an explicit Loader can build
            # arbitrary objects from untrusted input, and PyYAML >= 6
            # requires a Loader argument -- consider yaml.safe_load here.
            data = yaml.load(data)
    return data
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
    """Execute a prepared MySQL statement.

    For each parameter position whose entry in ``data`` is an ``IOBase``
    stream, the stream is sent with ``cmd_stmt_send_long_data`` and the
    position is recorded (with a flag that is False only when the stream's
    ``mode`` contains ``'b'``) for ``make_stmt_execute``.

    Returns the result of ``_handle_binary_result`` for the response.
    """
    parameters = list(parameters)
    long_data_used = {}

    if data:
        # Lazy pairs keep the lookup of each value interleaved with sending.
        positioned = ((pos, data[pos]) for pos in range(len(parameters)))
        for pos, candidate in positioned:
            if not isinstance(candidate, IOBase):
                continue
            flag = True
            try:
                flag = 'b' not in candidate.mode
            except AttributeError:
                pass
            self.cmd_stmt_send_long_data(statement_id, pos, candidate)
            long_data_used[pos] = (flag,)

    execute_packet = self._protocol.make_stmt_execute(
        statement_id, data, tuple(parameters), flags,
        long_data_used, self.charset)
    reply = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
    return self._handle_binary_result(reply)
def _is_filelike_object(f):
    """Return True when ``f`` is a built-in file (Python 2) or an io stream."""
    try:
        accepted = (file, io.IOBase)
    except NameError:
        # 'file' is not a class in python3
        accepted = io.IOBase
    return isinstance(f, accepted)
async def put(self, data: Union[bytes, IOBase], *, format: str,
              graph: Optional[IRI] = None):
    """Send ``data`` with a ``PUT`` CRUD request and raise on an error status.

    Declared ``async`` because the body uses ``async with``, which is only
    valid inside a coroutine function.

    ``format`` is forwarded as the request's ``content_type``; ``graph`` is
    passed through to ``_crud_request`` unchanged.
    """
    async with self._crud_request("PUT", graph=graph, data=data,
                                  content_type=format) as resp:
        resp.raise_for_status()
async def post(self, data: Union[bytes, IOBase], *, format: str,
               graph: Optional[IRI] = None):
    """Send ``data`` with a ``POST`` CRUD request and raise on an error status.

    Declared ``async`` because the body uses ``async with``, which is only
    valid inside a coroutine function.

    ``format`` is forwarded as the request's ``content_type``; ``graph`` is
    passed through to ``_crud_request`` unchanged.
    """
    async with self._crud_request("POST", graph=graph, data=data,
                                  content_type=format) as resp:
        resp.raise_for_status()
def __iter__(self):  # type: ignore
    """Let this class act as an iterator."""
    # The ignore above stays until https://github.com/python/typing/issues/11
    # is resolved: there is no good way to tell mypy about custom iterators
    # that subclass io.IOBase.
    return self
def sanitize_for_serialization(obj):
    """
    Sanitize an object for Request.

    If obj is None, return None.
    If obj is str, int, float, bool, an io stream or a tuple, return it directly.
    If obj is datetime.datetime or datetime.date, convert to a string in iso8601 format.
    If obj is list, sanitize each element in the list.
    If obj is dict, sanitize each value and return the new dict.
    If obj is swagger model, sanitize and return its properties dict.
    """
    if obj is None:
        return None
    if isinstance(obj, (str, int, float, bool, io.IOBase, tuple)):
        return obj
    if isinstance(obj, list):
        return [NbClientManager.sanitize_for_serialization(item) for item in obj]
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()

    if isinstance(obj, dict):
        properties = obj
    else:
        # Model object: keep the attributes that are set (not None), skipping
        # the `swaggerTypes` / `attributeMap` bookkeeping, and rename each
        # attribute to its json key from the model definition.
        skipped = ('swaggerTypes', 'attributeMap')
        properties = {obj.attributeMap[attr]: value
                      for attr, value in obj.__dict__.items()
                      if attr not in skipped and value is not None}

    return {name: NbClientManager.sanitize_for_serialization(value)
            for (name, value) in properties.items()}
def read(obj):
    """Yield a file object for data coming from one of several sources.

    This is a generator that yields exactly one handle and cleans up when it
    is resumed or closed (presumably meant to be used through
    ``contextlib.contextmanager`` -- no decorator is visible here).

    Args:
        obj (string|Path|file object): Data to read / read from

            If obj is a file object, this is just a pass through

            If obj is a Path object, this is similar to obj.open() and the
            handle is closed afterwards

            If obj is a string, this creates a StringIO (named ``<string>``)
            so the data can be read like a file object

    Returns:
        file object: File handle containing data
    """
    try:  # Python 2 compatibility
        text_types = (str, unicode)
    except NameError:
        text_types = (str,)

    owns_handle = False
    if isinstance(obj, Path):
        handle = obj.open()
        owns_handle = True
    elif isinstance(obj, text_types):
        handle = StringIO(obj)
        handle.name = '<string>'
    elif isinstance(obj, IOBase):
        handle = obj
    else:
        raise Exception("Unknown input type {}".format(type(obj).__name__))

    try:
        yield handle
    finally:
        # Only handles opened here are closed; caller streams are left alone.
        if owns_handle:
            handle.close()
def is_file_like_obj(obj):
    """
    Helper function to check that the given object implements all public methods of the
    :class:`~io.IOBase` abstract class.
    """
    for name in dir(io.IOBase):
        if name.startswith('_'):
            continue
        if not hasattr(obj, name):
            return False
    return True
def test_scratch_file_supports_file_obj_interface(active_scratch_dir, method_name):
    """
    Assert that methods of :class:`~scratchdir.ScratchDir` that are expected to return file-like objects
    do so and these objects implement, at least, the :class:`~io.IOBase` interface.
    """
    factory = getattr(active_scratch_dir, method_name)
    produced = factory()
    assert is_file_like_obj(produced)
def __init__(self, file, start):
    """Wrap ``file`` so that it appears to begin at offset ``start``.

    ``file`` must offer ``read``, ``tell`` and ``seek``; ``start`` must be an
    int. ``total_len`` holds the value of ``total_len(file)`` and ``len``
    what is left of it after ``start``.
    """
    assert hasattr(file, 'read') and hasattr(file, 'tell') and hasattr(file, 'seek')
    assert isinstance(start, int)
    self.file, self.start = file, start
    self.total_len = total_len(file)
    self.len = self.total_len - start
    io.IOBase.__init__(self)
    try:
        self.seek(0)
    except Exception:
        # Best effort only. ``Exception`` (rather than a bare ``except``)
        # lets KeyboardInterrupt and SystemExit propagate.
        pass
def store(self, data, blob):
    """Hand the file behind ``data`` over to ``blob``.

    ``data`` must be an ``io.IOBase`` instance, otherwise ``NotStorable`` is
    raised. When the stream has a ``name`` that is not None,
    ``blob.consumeFile`` is called with it; otherwise nothing happens.
    """
    if not isinstance(data, io.IOBase):
        raise NotStorable('Could not store data (not of "file").')

    filename = getattr(data, 'name', None)
    if filename is None:
        return
    blob.consumeFile(filename)
def isFile(self, path=None):
    """Tell whether ``path`` (or, without one, this source) is a file object.

    With a truthy ``path`` the answer is whether it is an ``io.IOBase``
    stream or a ``StreamReaderWriter``; otherwise it is whether ``'file'``
    occurs in ``self.type``.
    """
    if not path:
        return 'file' in self.type
    if isinstance(path, io.IOBase):
        return True
    # dirty hack to check whether the file was opened with the codecs module
    # (because it returns an 'instance' type when an encoding is specified)
    return path.__class__.__name__ == 'StreamReaderWriter'
async def aclose(self):
    """Like :meth:`io.IOBase.close`, but async.

    This is also shielded from cancellation; if a cancellation scope is
    cancelled, the wrapped file object will still be safely closed.

    """
    # Declared ``async``: the body awaits, which is only valid inside a
    # coroutine function.

    # ensure the underlying file is closed during cancellation
    with _core.open_cancel_scope(shield=True):
        await trio.run_sync_in_worker_thread(self._wrapped.close)

    await _core.checkpoint_if_cancelled()
def test_feed_download_output(mock_open, connection, feed, start_time,
                              end_time, feed_download_url, feed_report_csv):
    """Verifies feed download writing to a file."""
    mock_open.return_value = mock.MagicMock(spec=io.IOBase)

    # Step 1: the "prepare" endpoint answers with a response id.
    prepare_url = '{}/feed/{}/prepare'.format(
        matchlight.MATCHLIGHT_API_URL_V2,
        feed.name)
    httpretty.register_uri(
        httpretty.POST,
        prepare_url,
        body=json.dumps({'feed_response_id': 1}),
        content_type='application/json', status=200)

    # Step 2: the "link" endpoint reports pending once, then ready.
    link_url = '{}/feed/{}/link'.format(
        matchlight.MATCHLIGHT_API_URL_V2,
        feed.name)
    pending_response = httpretty.Response(
        body=json.dumps({'status': 'pending'}),
        content_type='application/json',
        status=200)
    ready_response = httpretty.Response(
        body=json.dumps({
            'status': 'ready',
            'url': feed_download_url,
        }),
        content_type='application/json',
        status=200)
    httpretty.register_uri(
        httpretty.POST,
        link_url,
        responses=[pending_response, ready_response],
    )

    # Step 3: the download URL serves the CSV report.
    body = '\n'.join(feed_report_csv).encode('utf-8')
    httpretty.register_uri(
        httpretty.GET, feed_download_url,
        content_type='text/csv',
        body=body)

    connection.feeds.download(
        feed, start_time, end_time, save_path='/tmp/output')

    file_handle = mock_open.return_value.__enter__.return_value
    file_handle.write.assert_called_once_with(body)
def _get_file_object(csvfile, encoding=None):
    """Return a text-mode file object for ``csvfile``.

    A ``str`` is treated as a path and opened in text mode with the given
    (required) ``encoding``. Any other object is returned as is after
    asserting that it is a text stream: through its ``mode`` attribute when
    it has one, otherwise through its class for ``io`` streams.
    """
    if isinstance(csvfile, str):
        assert encoding, 'encoding required for file path'
        return open(csvfile, 'rt', encoding=encoding, newline='')  # <- EXIT!

    kind = csvfile.__class__
    if hasattr(csvfile, 'mode'):
        assert 'b' not in csvfile.mode, "File must be open in text mode ('rt')."
    elif issubclass(kind, io.IOBase):
        assert issubclass(kind, io.TextIOBase), ("Stream object must inherit "
                                                 "from io.TextIOBase.")
    return csvfile
def _py2_get_file_object(csvfile, encoding):
    """Return a binary-mode file object for ``csvfile`` (Python 2 variant).

    A ``str`` is treated as a path and opened with ``'rb'``. Any other
    object is returned as is after asserting that it is a byte stream:
    through its ``mode`` attribute when it has one, otherwise through its
    class for ``io`` streams. ``encoding`` is not used here.
    """
    if isinstance(csvfile, str):
        return open(csvfile, 'rb')  # <- EXIT!

    kind = csvfile.__class__
    if hasattr(csvfile, 'mode'):
        assert 'b' in csvfile.mode, ("When using Python 2, file must "
                                     "be open in binary mode ('rb').")
    elif issubclass(kind, io.IOBase):
        assert not issubclass(kind, io.TextIOBase), ("When using Python 2, "
                                                     "must use byte stream "
                                                     "(not text stream).")
    return csvfile
def from_csv(cls, file, encoding=None, **fmtparams):
    """Create a DataSource from a CSV *file* (a path or file-like
    object)::

        source = datatest.DataSource.from_csv('mydata.csv')

    If *file* is an iterable of files, data will be loaded and
    aligned by column name::

        files = ['mydata1.csv', 'mydata2.csv']
        source = datatest.DataSource.from_csv(files)
    """
    # A single path or stream is handled as a one-item list.
    if isinstance(file, (string_types, IOBase)):
        file = [file]

    new_cls = cls.__new__(cls)
    temptable = _from_csv(file, encoding, **fmtparams)
    new_cls._connection = temptable.connection
    new_cls._table = temptable.name

    # Rebuild a call expression resembling the one that created the object.
    shown_file = repr(file[0]) if len(file) == 1 else repr(file)
    shown_encoding = ', {0!r}'.format(encoding) if encoding else ''
    shown_params = ', **{0!r}'.format(fmtparams) if fmtparams else ''
    new_cls._repr_string = '{0}.from_csv({1}{2}{3})'.format(
        new_cls.__class__.__name__,
        shown_file,
        shown_encoding,
        shown_params,
    )
    return new_cls
def file_output(file_object):
    """
    Writes strings to a file, making sure there's a newline at the end

    This is a generator-based coroutine: prime it with ``next()`` and then
    ``send()`` each line. A file opened here from a name is closed when the
    generator is closed or fails; a file object supplied by the caller is
    left open.

    :param:

     - `file_object`: opened, writable file or name of file to open
    """
    from io import IOBase

    opened_here = not isinstance(file_object, IOBase)
    if opened_here:
        file_object = open(file_object, WRITEABLE)
    try:
        while True:
            line = (yield)
            # Normalise to exactly one trailing newline.
            line = line.rstrip(NEWLINE) + NEWLINE
            file_object.write(line)
    finally:
        # Previously a file opened from a name was never closed.
        if opened_here:
            file_object.close()
def _is_file(f):
    """Return True when ``f`` is an instance of ``io.IOBase``."""
    file_like = isinstance(f, io.IOBase)
    return file_like
def _is_file(f):
    """Tell whether ``f`` belongs to the ``io`` stream hierarchy."""
    stream_base = io.IOBase
    return isinstance(f, stream_base)