Python io 模块,SEEK_END 实例源码
我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用io.SEEK_END。
def write(self, writer, base_address):
pos = writer.tell()
writer.seek(0, io.SEEK_END)
end_position = writer.tell()
if self.is_leaf:
# Write the offset back at this nodes address.
writer.seek(pos)
writer.write_uint32((end_position - base_address - 2) | 0x80000000)
writer.seek(end_position + 4)
# Write the triangle indices and terminate the list with 0xFFFF.
writer.write_uint16s(self.indices)
writer.write_uint16(0xFFFF)
else:
writer.seek(pos)
writer.write_uint32(end_position - base_address)
writer.seek(end_position + 4)
base = writer.tell()
writer.write_uint32s([0x00000000] * 8)
writer.seek(base)
for node in self.branches:
node.write(writer, base)
writer.seek(pos + 4)
def refresh_contents(self):
log_file = open(self.log_path)
log_file.seek(0, io.SEEK_END)
end_pos = log_file.tell()
if end_pos > 20000:
log_file.seek(end_pos - 20000, io.SEEK_SET)
else:
log_file.seek(0, io.SEEK_SET)
contents = log_file.read().split("\n", 1)[-1]
if contents != self.contents:
self.contents = contents
self.textBrowser_LogContent.clear()
self.textBrowser_LogContent.setPlainText(contents)
# Scrolling to bottom
cursor = self.textBrowser_LogContent.textCursor()
cursor.movePosition(QTextCursor.End)
self.textBrowser_LogContent.setTextCursor(cursor)
self.textBrowser_LogContent.ensureCursorVisible()
log_file.close()
def _downstream_thread(self, stream, downstream_boundary):
data_buff = io.BytesIO()
multipart_parser = MultipartParser(data_buff, downstream_boundary)
while not self._stop_threads_event.is_set():
if stream.data:
current_buffer_pos = data_buff.tell()
data_buff.seek(0, io.SEEK_END)
data_buff.write(b''.join(stream.data))
data_buff.seek(current_buffer_pos)
stream.data = []
message_part = multipart_parser.get_next_part()
if not message_part:
time.sleep(0.5)
continue
self._process_message_parts([message_part])
def __init__(self, file_object, start, size):
"""
Build an object that will act like a BufferedReader object,
but constrained to a predetermined part of the file_object.
:param file_object: A file opened for reading in binary mode.
:param start: Start of the part within the file_object
:param size: Ideal size of the part. This will be set
to the number of bytes remaining in the
file if there aren't enough bytes remaining.
"""
self.file = file_object
self.bytes_read = 0
self.start = start
# Seek to the end of the file to see how many bytes remain
# from the start of the part.
self.file.seek(0, io.SEEK_END)
self.size = min(self.file.tell() - self.start, size)
# Reset the pointer to the start of the part.
self.file.seek(start, io.SEEK_SET)
def seek(self, offset, whence=io.SEEK_SET):
self.__raiseIfClosed()
if not self.__seekable:
raise OSError("Seek not enabled for this stream")
if whence == io.SEEK_SET:
if offset < 0:
raise ValueError("Cannot have a negative absolute seek")
newStreamPosition = offset
elif whence == io.SEEK_CUR:
newStreamPosition = self.__streamPosition + whence
elif whence == io.SEEK_END:
if not self.__buffers:
newStreamPosition = 0
else:
newStreamPosition = self.__streamEnd + offset
self.__streamPosition = newStreamPosition
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def get_media_size(media_io):
if hasattr(media_io, "getvalue"):
size = len(media_io.getvalue())
elif hasattr(media_io, "seekable") and media_io.seekable():
media_io.seek(0, SEEK_END)
size = media_io.tell()
media_io.seek(SEEK_SET)
else:
raise DontwiMediaError
return size
def _reset_frame(self):
self._iobuf = io.BytesIO(self._iobuf.read())
self._iobuf.seek(0, 2) # io.SEEK_END == 2 (constant not present in 2.6)
self._current_frame = None
def get_cell_log_pos(self):
"""Returns the current position of the last byte in the Tor cell log."""
return self.cell_log.seek(0, SEEK_END)
def dump():
parser = argparse.ArgumentParser(description='Dump all the boxes from an MP4 file')
parser.add_argument("input_file", type=argparse.FileType("rb"), metavar="FILE", help="Path to the MP4 file to open")
args = parser.parse_args()
fd = args.input_file
fd.seek(0, io.SEEK_END)
eof = fd.tell()
fd.seek(0)
while fd.tell() < eof:
box = Box.parse_stream(fd)
print(box)
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def sendall(self, data):
self._buffer.seek(0, io.SEEK_END)
self._buffer.write(data)
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, offset, whence):
if (whence == io.SEEK_SET):
self._pos = offset
elif (whence == io.SEEK_CUR):
self._pos += offset
elif (whence == io.SEEK_END):
self._pos = self._file._filesize + offset
return self._pos
def test_small_file_seek_and_read(self):
with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
self.assertEqual(64, f.seek(64, io.SEEK_CUR))
self.assertEqual(128, f.seek(64, io.SEEK_CUR))
self.assertEqual(256, f.seek(256, io.SEEK_SET))
self.assertEqual(24610, f.seek(-256, io.SEEK_END))
self.assertEqual(
b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
f.read(16))
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def testTruncate(self):
f = _FileIO(TESTFN, 'w')
f.write(bytes(bytearray(range(10))))
self.assertEqual(f.tell(), 10)
f.truncate(5)
self.assertEqual(f.tell(), 10)
self.assertEqual(f.seek(0, io.SEEK_END), 5)
f.truncate(15)
self.assertEqual(f.tell(), 5)
self.assertEqual(f.seek(0, io.SEEK_END), 15)
f.close()
def seek(self, offset, whence=io.SEEK_SET):
"""
Change the stream position to the given byte *offset*. *offset* is
interpreted relative to the position indicated by *whence*. Values for
*whence* are:
* ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset*
should be zero or positive
* ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be
negative
* ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually
negative
Return the new absolute position.
"""
with self.lock:
if whence == io.SEEK_CUR:
offset = self._pos + offset
elif whence == io.SEEK_END:
offset = self._length + offset
if offset < 0:
raise ValueError(
'New position is before the start of the stream')
self._set_pos(offset)
return self._pos
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def test_seeking_from_end(self):
with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
self.assertEqual(fp.read(100), TEXT[:100])
seeked_pos = fp.seek(-100, io.SEEK_END)
self.assertEqual(seeked_pos, len(TEXT) - 100)
self.assertEqual(fp.read(100), TEXT[-100:])
def test_seeking_from_end_beyond_beginning(self):
with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
# Go to end to get size
size = fp.seek(0, io.SEEK_END)
# Go to beginning
self.assertNotRaises(fp.seek, -size, io.SEEK_END)
# One before beginning
self.assertRaises(IOError, fp.seek, -size - 1, io.SEEK_END)
def seek(self, offset, whence=io.SEEK_SET):
# Recalculate offset as an absolute file position.
if whence == io.SEEK_SET:
pass
elif whence == io.SEEK_CUR:
offset += self._pos
elif whence == io.SEEK_END:
if self._size < 0:
# Finish reading the file
while self.read(io.DEFAULT_BUFFER_SIZE):
pass
offset += self._size
else:
raise ValueError('Invalid value for whence: {}'.format(whence))
if offset < 0:
msg = '[Error {code}] {msg}'
raise IOError(msg.format(code=errno.EINVAL,
msg=os.strerror(errno.EINVAL)))
# Make it so that offset is the number of bytes to skip forward.
if offset < self._pos:
self._rewind()
else:
offset -= self._pos
# Read and discard data until we reach the desired position
while offset > 0:
data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
if not data:
break
offset -= len(data)
return self._pos
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def testTruncate(self):
f = _FileIO(TESTFN, 'w')
f.write(bytes(bytearray(range(10))))
self.assertEqual(f.tell(), 10)
f.truncate(5)
self.assertEqual(f.tell(), 10)
self.assertEqual(f.seek(0, io.SEEK_END), 5)
f.truncate(15)
self.assertEqual(f.tell(), 5)
self.assertEqual(f.seek(0, io.SEEK_END), 15)
f.close()
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def testTruncate(self):
f = _FileIO(TESTFN, 'w')
f.write(bytes(bytearray(range(10))))
self.assertEqual(f.tell(), 10)
f.truncate(5)
self.assertEqual(f.tell(), 10)
self.assertEqual(f.seek(0, io.SEEK_END), 5)
f.truncate(15)
self.assertEqual(f.tell(), 5)
self.assertEqual(f.seek(0, io.SEEK_END), 15)
f.close()
def recognizes(cls, file):
size = os.stat(file.path).st_size
if size < CBFS_HEADER_SIZE or size > CBFS_MAXIMUM_FILE_SIZE:
return False
with open(file.path, 'rb') as f:
# pick at the latest byte as it should contain the relative offset of the header
f.seek(-4, io.SEEK_END)
# <pgeorgi> given the hardware we support so far, it looks like
# that field is now bound to be little endian
# -- #coreboot, 2015-10-14
rel_offset = struct.unpack('<i', f.read(4))[0]
if rel_offset < 0 and -rel_offset > CBFS_HEADER_SIZE and -rel_offset < size:
f.seek(rel_offset, io.SEEK_END)
logger.debug('looking for header at offset: %x', f.tell())
if is_header_valid(f.read(CBFS_HEADER_SIZE), size):
return True
elif not file.name.endswith('.rom'):
return False
else:
logger.debug('CBFS relative offset seems wrong, scanning whole image')
f.seek(0, io.SEEK_SET)
offset = 0
buf = f.read(CBFS_HEADER_SIZE)
while len(buf) >= CBFS_HEADER_SIZE:
if is_header_valid(buf, size, offset):
return True
if len(buf) - offset <= CBFS_HEADER_SIZE:
buf = f.read(32768)
offset = 0
else:
offset += 1
return False
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, offset, whence):
"""
Seek within the part. This is similar to the standard seek, except
that io.SEEK_SET is the start of the part and io.SEEK_END is the
end of the part.
:param offset: Offset in bytes from location determined by the whence value
:param whence: io.SEEK_END and io.SEEK_SET are supported.
"""
if whence == io.SEEK_END:
self.file.seek(self.start + self.size + offset, io.SEEK_SET)
elif whence == io.SEEK_SET:
self.file.seek(self.start + offset, io.SEEK_SET)
else:
raise RuntimeError("Unhandled whence value: {}".format(whence))
def add_parts_from_file(self, file_path):
"""
Splits a file into parts and adds all parts to an internal list of parts to upload. The parts will not be uploaded to the server until upload is called.
:param string file_path: Path of file to upload in parts
"""
with io.open(file_path, mode='rb') as file_object:
file_object.seek(0, io.SEEK_END)
end = file_object.tell()
file_object.seek(0, io.SEEK_SET)
offset = 0
while file_object.tell() < end:
self.add_part_from_file(file_path, offset=offset, size=self.part_size)
offset += self.part_size
file_object.seek(offset, io.SEEK_SET)
def _reverse_read_lines(fp, buf_size=8192): # pylint: disable=invalid-name
"""
Async generator that returns the lines of a file in reverse order.
ref: https://stackoverflow.com/a/23646049/8776239
and: https://stackoverflow.com/questions/2301789/read-a-file-in-reverse-order-using-python
"""
segment = None # holds possible incomplete segment at the beginning of the buffer
offset = 0
await fp.seek(0, io.SEEK_END)
file_size = remaining_size = await fp.tell()
while remaining_size > 0:
offset = min(file_size, offset + buf_size)
await fp.seek(file_size - offset)
buffer = await fp.read(min(remaining_size, buf_size))
remaining_size -= buf_size
lines = buffer.splitlines(True)
# the first line of the buffer is probably not a complete line so
# we'll save it and append it to the last line of the next buffer
# we read
if segment is not None:
# if the previous chunk starts right from the beginning of line
# do not concat the segment to the last line of new chunk
# instead, yield the segment first
if buffer[-1] == '\n':
# print 'buffer ends with newline'
yield segment
else:
lines[-1] += segment
# print 'enlarged last line to >{}<, len {}'.format(lines[-1], len(lines))
segment = lines[0]
for index in range(len(lines) - 1, 0, -1):
l = lines[index]
if l:
yield l
# Don't yield None if the file was empty
if segment is not None:
yield segment
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def available(self):
"""
This is duplicate functionality if we have a HighPerformanceStreamIO.
But we also want to support those that aren't. TODO: Better solution?
"""
curPos = self._stream.tell()
self._stream.seek(0, SEEK_END)
endPos = self._stream.tell()
self._stream.seek(curPos)
return endPos-curPos
def _rawStreamSize(self):
curPos = self._stream.tell()
self._stream.seek(0, SEEK_END)
endPos = self._stream.tell()
self._stream.seek(curPos)
return endPos - self._prefixStart
def _rawAvailable(self):
"some underlying streams may not support 'available'"
curPos = self._stream.tell()
self._stream.seek(0, SEEK_END)
endPos = self._stream.tell()
self._stream.seek(curPos)
return endPos - curPos
def available(self):
"""
Get the available bytes, cutting off the suffix if
it's been acocunted for
"""
if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE):
return 0
curPos = self._stream.tell()
# even if the suffix hasn't been received yet, we calculate our offsets as if it had.
# why? because if it hasn't been received yet, we don't want to finish! The whole packet
# isn't framed (verified) until the final bytes are received.
self._stream.seek(-self.SUFFIX_SIZE, SEEK_END)
endPos = self._stream.tell()
self._stream.seek(curPos)
return endPos-curPos
def seek(self, offset, whence=SEEK_SET):
"Adjust seek from prefix start and, if present, from prefix"
if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE):
return
if whence == SEEK_SET:
offset += self._prefixStart + self.PREFIX_SIZE
return self._stream.seek(offset, whence)
elif whence == SEEK_CUR:
return self._stream.seek(offset, whence)
elif whence == SEEK_END:
# even if the suffix hasn't been received yet, we calculate our offsets as if it had.
# why? because if it hasn't been received yet, we don't want to finish! The whole packet
# isn't framed (verified) until the final bytes are received.
offset = offset - self.SUFFIX_SIZE
return self._stream.seek(offset, whence)
def update(self, newData):
beforeWritePos = self.tell()
self.seek(0, io.SEEK_END)
self.write(newData)
self.seek(beforeWritePos)
def available(self):
cur = self.tell()
end = self.seek(0, io.SEEK_END)
self.seek(cur)
return end-cur
def ended(self) -> bool:
try:
fileno = self._codeio.fileno()
offset = os.fstat(fileno).st_size
except io.UnsupportedOperation:
old_offset = self.offset
self._codeio.seek(0, io.SEEK_END)
offset = self._codeio.tell()
self._codeio.seek(old_offset)
return self.offset == offset
def _reset_frame(self):
self._iobuf = io.BytesIO(self._iobuf.read())
self._iobuf.seek(0, 2) # io.SEEK_END == 2 (constant not present in 2.6)
self._current_frame = None
def _play_run(self, f):
err = None
try:
# Calculate how many records are in the file; we'll use this later
# when updating the progress bar
rec_total = (f.seek(0, io.SEEK_END) - HEADER_REC.size) // DATA_REC.size
f.seek(0)
skipped = 0
for rec, data in enumerate(self._play_source(f)):
now = time()
if data.timestamp < now:
skipped += 1
continue
else:
if self._play_event.wait(data.timestamp - now):
break
self.props.application.pressure.set_values(data.pressure, data.ptemp)
self.props.application.humidity.set_values(data.humidity, data.htemp)
self.props.application.imu.set_imu_values(
(data.ax, data.ay, data.az),
(data.gx, data.gy, data.gz),
(data.cx, data.cy, data.cz),
(data.ox, data.oy, data.oz),
)
# Again, would be better to use custom signals here but
# attempting to do so just results in seemingly random
# segfaults during playback
with self._play_update_lock:
if self._play_update_id == 0:
self._play_update_id = GLib.idle_add(self._play_update_controls, rec / rec_total)
except Exception as e:
err = e
finally:
f.close()
# Must ensure that controls are only re-enabled *after* all pending
# control updates have run
with self._play_update_lock:
if self._play_update_id:
GLib.source_remove(self._play_update_id)
self._play_update_id = 0
# Get the main thread to re-enable the controls at the end of
# playback
GLib.idle_add(self._play_controls_finish, err)