Python os 模块,lseek() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.lseek()。
def _lseek(file_obj, offset, whence):
    """Seek the descriptor behind 'file_obj' to 'offset' using 'whence'.

    This is a thin wrapper around 'os.lseek' intended for the '_SEEK_DATA'
    and '_SEEK_HOLE' modes.

    Returns the resulting data or hole position, or '-1' when there is no
    more data or hole starting from 'offset' (the call fails with ENXIO).

    Raises 'ErrorNotSupp' when the call fails with EINVAL, i.e. the kernel or
    file-system does not support "SEEK_HOLE" and "SEEK_DATA". Any other
    'OSError' is propagated unchanged.
    """
    # 'os.errno' was only an undocumented alias and is missing from newer
    # Python versions, so take the constants from the 'errno' module itself.
    import errno

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == errno.ENXIO:
            return -1
        if err.errno == errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        raise
def test_fs_holes(self):
    """Check SEEK_DATA / SEEK_HOLE on a small file without holes.

    Even if the filesystem doesn't report holes, an OS that defines the
    SEEK_* constants is expected to behave consistently:
    SEEK_DATA yields the requested position and SEEK_HOLE yields a position
    at or past the end of the file.
    """
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        end = stream.tell()
        fd = stream.fileno()
        try:
            for offset in range(end):
                data_pos = os.lseek(fd, offset, os.SEEK_DATA)
                self.assertEqual(offset, data_pos)
                hole_pos = os.lseek(fd, offset, os.SEEK_HOLE)
                self.assertLessEqual(end, hole_pos)
            # Seeking from the end of the file must fail for both modes.
            for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                self.assertRaises(OSError, os.lseek, fd, end, whence)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA
            # but it is not true.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
def is_sparse(path):
    """Return True when the file at *path* contains no data at all.

    LP: #1656371 - stat().st_blocks is not reliable across file systems
    (ZFS seems to report 1 where EXT4 reports 0), so instead ask for the
    first block of data at or after offset 0 with SEEK_DATA.  If there is
    none, lseek fails with ENXIO.  See lseek(2) for details.

    Any OSError other than ENXIO is propagated.
    """
    with open(path, 'r') as handle:
        try:
            os.lseek(handle.fileno(), 0, os.SEEK_DATA)
        except OSError as exc:
            # There is no OSError subclass for ENXIO.
            if exc.errno == errno.ENXIO:
                # No data anywhere in the file: it is entirely sparse.
                return True
            raise
        else:
            # The seek succeeded, so there is data in the file.
            return False
def _lseek(file_obj, offset, whence):
    """Seek the descriptor behind 'file_obj' to 'offset' using 'whence'.

    This is a thin wrapper around 'os.lseek' intended for the '_SEEK_DATA'
    and '_SEEK_HOLE' modes.

    Returns the resulting data or hole position, or '-1' when there is no
    more data or hole starting from 'offset' (the call fails with ENXIO).

    Raises 'ErrorNotSupp' when the call fails with EINVAL, i.e. the kernel or
    file-system does not support "SEEK_HOLE" and "SEEK_DATA". Any other
    'OSError' is propagated unchanged.
    """
    # 'os.errno' was only an undocumented alias and is missing from newer
    # Python versions, so take the constants from the 'errno' module itself.
    import errno

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == errno.ENXIO:
            return -1
        if err.errno == errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        raise
def test_writev(self):
    """Write three buffers with writev() and read them back as one string."""
    fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        chunks = (b'test1', b'tt2', b't3')
        written = os.writev(fd, chunks)
        self.assertEqual(written, 10)

        os.lseek(fd, 0, os.SEEK_SET)
        self.assertEqual(b'test1tt2t3', posix.read(fd, 10))

        # Issue #20113: empty list of buffers should not crash
        try:
            size = posix.writev(fd, [])
        except OSError:
            # writev(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            pass
        else:
            self.assertEqual(size, 0)
    finally:
        os.close(fd)
def test_readv(self):
    """Read a known string back into three buffers with readv()."""
    fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(fd, b'test1tt2t3')
        os.lseek(fd, 0, os.SEEK_SET)
        buffers = [bytearray(length) for length in (5, 3, 2)]
        self.assertEqual(posix.readv(fd, buffers), 10)
        filled = [bytes(chunk) for chunk in buffers]
        self.assertEqual([b'test1', b'tt2', b't3'], filled)

        # Issue #20113: empty list of buffers should not crash
        try:
            size = posix.readv(fd, [])
        except OSError:
            # readv(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            pass
        else:
            self.assertEqual(size, 0)
    finally:
        os.close(fd)
def test_fs_holes(self):
    """Check SEEK_DATA / SEEK_HOLE on a small file without holes.

    Even if the filesystem doesn't report holes, an OS that defines the
    SEEK_* constants is expected to behave consistently:
    SEEK_DATA yields the requested position and SEEK_HOLE yields a position
    at or past the end of the file.
    """
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        end = stream.tell()
        fd = stream.fileno()
        try:
            for offset in range(end):
                data_pos = os.lseek(fd, offset, os.SEEK_DATA)
                self.assertEqual(offset, data_pos)
                hole_pos = os.lseek(fd, offset, os.SEEK_HOLE)
                self.assertLessEqual(end, hole_pos)
            # Seeking from the end of the file must fail for both modes.
            for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                self.assertRaises(OSError, os.lseek, fd, end, whence)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA
            # but it is not true.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
def test_writev(self):
    """Write three buffers with writev() and read them back as one string."""
    fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        chunks = (b'test1', b'tt2', b't3')
        written = os.writev(fd, chunks)
        self.assertEqual(written, 10)

        os.lseek(fd, 0, os.SEEK_SET)
        self.assertEqual(b'test1tt2t3', posix.read(fd, 10))

        # Issue #20113: empty list of buffers should not crash
        try:
            size = posix.writev(fd, [])
        except OSError:
            # writev(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            pass
        else:
            self.assertEqual(size, 0)
    finally:
        os.close(fd)
def test_readv(self):
    """Read a known string back into three buffers with readv()."""
    fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(fd, b'test1tt2t3')
        os.lseek(fd, 0, os.SEEK_SET)
        buffers = [bytearray(length) for length in (5, 3, 2)]
        self.assertEqual(posix.readv(fd, buffers), 10)
        filled = [bytes(chunk) for chunk in buffers]
        self.assertEqual([b'test1', b'tt2', b't3'], filled)

        # Issue #20113: empty list of buffers should not crash
        try:
            size = posix.readv(fd, [])
        except OSError:
            # readv(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            pass
        else:
            self.assertEqual(size, 0)
    finally:
        os.close(fd)
def test_fs_holes(self):
    """Check SEEK_DATA / SEEK_HOLE on a small file without holes.

    Even if the filesystem doesn't report holes, an OS that defines the
    SEEK_* constants is expected to behave consistently:
    SEEK_DATA yields the requested position and SEEK_HOLE yields a position
    at or past the end of the file.
    """
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        end = stream.tell()
        fd = stream.fileno()
        try:
            for offset in range(end):
                data_pos = os.lseek(fd, offset, os.SEEK_DATA)
                self.assertEqual(offset, data_pos)
                hole_pos = os.lseek(fd, offset, os.SEEK_HOLE)
                self.assertLessEqual(end, hole_pos)
            # Seeking from the end of the file must fail for both modes.
            for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                self.assertRaises(OSError, os.lseek, fd, end, whence)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA
            # but it is not true.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
def write_data(self, fd, offset=0, size=None, pattern=None):
    """Fill a region of the file behind descriptor *fd* with pattern data.

       fd:
           File descriptor
       offset:
           File offset where data will be written to [default: 0]
       size:
           Total number of bytes to write [default: self.filesize]
       pattern:
           Data pattern handed through to self.data_pattern [default: None]
    """
    remaining = self.filesize if size is None else size
    position = offset
    while remaining > 0:
        # Never hand more than self.wsize bytes to a single write call.
        chunk_len = min(self.wsize, remaining)
        os.lseek(fd, position, os.SEEK_SET)
        payload = self.data_pattern(position, chunk_len, pattern)
        written = os.write(fd, payload)
        # Advance by what was actually written, not by what was requested.
        remaining -= written
        position += written
def _copy_stream(self, fd, url, offset):
    """Copies remote file to local.

    The local file is truncated to `offset` bytes and the stream returned by
    `self.open_stream(url, offset)` is appended from that position.

    :param fd: the file`s descriptor
    :param url: the remote file`s url
    :param offset: the number of bytes from the beginning,
                   that will be skipped
    :return: the count of actually copied bytes
    """
    source = self.open_stream(url, offset)
    os.ftruncate(fd, offset)
    os.lseek(fd, offset, os.SEEK_SET)
    chunk_size = 16 * 1024
    size = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        # os.write may accept fewer bytes than it was given; keep writing the
        # remainder so no data is dropped and the count stays truthful.
        while chunk:
            written = os.write(fd, chunk)
            size += written
            chunk = chunk[written:]
    return size
def read(self, path, length, offset, fh):
    """Return `length` bytes of a blob starting at `offset`.

    `path` is split on '/'; its second component is used as the container
    name and its third as the blob name.  The byte range
    [offset, offset + length - 1] is requested from the blob service using
    the account name and key read from the `config` module.  `fh` is only
    reported in debug output.
    """
    if debug:
        # Single-argument print calls behave the same on Python 2 and 3.
        print("read: " + path)
        print("offset: ")
        print(offset)
        print("length: ")
        print(length)
        print(fh)
    full_path = self._full_path(path)
    if debug:
        print(full_path)

    import config as config
    account_name = config.STORAGE_ACCOUNT_NAME
    account_key = config.STORAGE_ACCOUNT_KEY

    parts = path.split('/')
    containername = parts[1]
    filename = parts[2]

    service = baseblobservice.BaseBlobService(account_name, account_key)
    # The end of the requested range is inclusive, hence the "- 1".
    blob = service.get_blob_to_bytes(containername, filename, None,
                                     offset, offset + length - 1)
    return blob.content
def write(self, path, buf, offset, fh):
    """Write `buf` to descriptor `fh` at `offset`; return the bytes written.

    `path` is only used for the optional debug message.
    """
    if debug:
        # Single-argument print call: same output on Python 2 and 3.
        print("write: " + path)
    os.lseek(fh, offset, os.SEEK_SET)
    return os.write(fh, buf)
def try_seek(fd, offset):
    """Position descriptor *fd*, ignoring descriptors that cannot seek.

    *offset* of None seeks to the end of the file, a non-negative *offset*
    is taken from the start, and a negative *offset* is taken relative to
    the end.  An OSError whose first argument is ESPIPE is swallowed; any
    other OSError is re-raised.
    """
    if offset is None:
        target, whence = 0, os.SEEK_END
    elif offset >= 0:
        target, whence = offset, os.SEEK_SET
    else:
        target, whence = offset, os.SEEK_END

    try:
        os.lseek(fd, target, whence)
    except OSError as exc:
        if exc.args[0] != errno.ESPIPE:
            raise
def readChunk(self, offset, length):
    """Seek to *offset* and read *length* bytes via the avatar's _runAsUser.

    The two steps are handed over as (callable, args) pairs; whatever
    _runAsUser returns is returned unchanged.
    """
    seek_step = (os.lseek, (self.fd, offset, 0))
    read_step = (os.read, (self.fd, length))
    return self.server.avatar._runAsUser([seek_step, read_step])
def writeChunk(self, offset, data):
    """Seek to *offset* and write *data* via the avatar's _runAsUser.

    The two steps are handed over as (callable, args) pairs; whatever
    _runAsUser returns is returned unchanged.
    """
    seek_step = (os.lseek, (self.fd, offset, 0))
    write_step = (os.write, (self.fd, data))
    return self.server.avatar._runAsUser([seek_step, write_step])
def get_volume(self, id):
    """
    Return volume information for an id or a path.

    When *id* names an existing path, a dict with that path and its size
    (found by seeking to the end of the opened file) is returned.  Otherwise
    the result of self.volume.get(id) is returned.
    """
    if not exists(id):
        return self.volume.get(id)

    # The id is actually a path
    with open(id) as handle:
        size = os.lseek(handle.fileno(), 0, os.SEEK_END)
    return {'path': id, 'size': size}
def read(self, path, length, offset, fh):
    """Read up to *length* bytes from descriptor *fh* starting at *offset*.

    *path* is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
def write(self, path, buf, offset, fh):
    """Write *buf* to descriptor *fh* at *offset*; return the bytes written.

    *path* is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    written = os.write(fh, buf)
    return written
def read(self, path, length, offset, fh):
    """Read up to *length* bytes from descriptor *fh* starting at *offset*.

    *path* is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
def write(self, path, buf, offset, fh):
    """Write *buf* to descriptor *fh* at *offset*; return the bytes written.

    *path* is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    written = os.write(fh, buf)
    return written
def read(self, path, length, offset, fh):
    """Read up to *length* bytes from descriptor *fh* starting at *offset*.

    *path* is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
def smbComWrite(connId, smbServer, SMBCommand, recvPacket):
    """Handle an SMB_COM_WRITE request for connection `connId`.

    The Fid from the request is looked up in the connection's 'OpenedFiles'.
    For a regular file handle the data is written at the requested offset;
    for the pipe pseudo-descriptor it is sent over the stored socket.

    Returns a tuple ([respSMBCommand], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown Fid and STATUS_ACCESS_DENIED when
    any exception is raised while writing (the exception is logged).
    `recvPacket` is not used.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_WRITE)
    respParameters = smb.SMBWriteResponse_Parameters()
    respData = ''

    comWriteParameters = smb.SMBWrite_Parameters(SMBCommand['Parameters'])
    comWriteData = smb.SMBWrite_Data(SMBCommand['Data'])

    # 'in' replaces dict.has_key(), which only exists on Python 2.
    if comWriteParameters['Fid'] in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][comWriteParameters['Fid']]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                # TODO: Handle big size files
                # If we're trying to write past the file end we just skip the write call (Vista does this)
                if os.lseek(fileHandle, 0, 2) >= comWriteParameters['Offset']:
                    os.lseek(fileHandle, comWriteParameters['Offset'], 0)
                    os.write(fileHandle, comWriteData['Data'])
            else:
                sock = connData['OpenedFiles'][comWriteParameters['Fid']]['Socket']
                sock.send(comWriteData['Data'])
            respParameters['Count'] = comWriteParameters['Count']
        except Exception as e:
            smbServer.log('smbComWrite: %s' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    # On any error the response carries empty parameters and data.
    if errorCode > 0:
        respParameters = ''
        respData = ''

    respSMBCommand['Parameters'] = respParameters
    respSMBCommand['Data'] = respData
    smbServer.setConnectionData(connId, connData)

    return [respSMBCommand], None, errorCode
def smbComRead(connId, smbServer, SMBCommand, recvPacket):
    """Handle an SMB_COM_READ request for connection `connId`.

    The Fid from the request is looked up in the connection's 'OpenedFiles'.
    For a regular file handle 'Count' bytes are read at the requested
    offset; for the pipe pseudo-descriptor they are received from the stored
    socket.

    Returns a tuple ([respSMBCommand], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown Fid and STATUS_ACCESS_DENIED when
    any exception is raised while reading (the exception is logged).
    `recvPacket` is not used.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_READ)
    respParameters = smb.SMBReadResponse_Parameters()
    respData = smb.SMBReadResponse_Data()

    comReadParameters = smb.SMBRead_Parameters(SMBCommand['Parameters'])

    # 'in' replaces dict.has_key(), which only exists on Python 2.
    if comReadParameters['Fid'] in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][comReadParameters['Fid']]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                # TODO: Handle big size files
                os.lseek(fileHandle, comReadParameters['Offset'], 0)
                content = os.read(fileHandle, comReadParameters['Count'])
            else:
                sock = connData['OpenedFiles'][comReadParameters['Fid']]['Socket']
                content = sock.recv(comReadParameters['Count'])
            respParameters['Count'] = len(content)
            respData['DataLength'] = len(content)
            respData['Data'] = content
        except Exception as e:
            smbServer.log('smbComRead: %s ' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    # On any error the response carries empty parameters and data.
    if errorCode > 0:
        respParameters = ''
        respData = ''

    respSMBCommand['Parameters'] = respParameters
    respSMBCommand['Data'] = respData
    smbServer.setConnectionData(connId, connData)

    return [respSMBCommand], None, errorCode
def smb2Read(connId, smbServer, recvPacket):
    """Handle an SMB2 READ request for connection `connId`.

    When the request's FileID is sixteen 0xff bytes, the FileID of the
    connection's last SMB2_CREATE request is used instead (if one is
    recorded).  For a regular file handle 'Length' bytes are read at the
    requested offset; for the pipe pseudo-descriptor they are received from
    the stored socket.

    Returns a tuple ([respSMBCommand], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown FileID and STATUS_ACCESS_DENIED
    when any exception is raised while reading (the exception is logged).
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb2.SMB2Read_Response()
    readRequest = smb2.SMB2Read(recvPacket['Data'])

    respSMBCommand['Buffer'] = '\x00'

    if str(readRequest['FileID']) == '\xff'*16:
        # Let's take the data from the lastRequest
        # ('in' replaces dict.has_key(), which only exists on Python 2.)
        if 'SMB2_CREATE' in connData['LastRequest']:
            fileID = connData['LastRequest']['SMB2_CREATE']['FileID']
        else:
            fileID = str(readRequest['FileID'])
    else:
        fileID = str(readRequest['FileID'])

    if fileID in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][fileID]['FileHandle']
        errorCode = 0
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                offset = readRequest['Offset']
                os.lseek(fileHandle, offset, 0)
                content = os.read(fileHandle, readRequest['Length'])
            else:
                sock = connData['OpenedFiles'][fileID]['Socket']
                content = sock.recv(readRequest['Length'])

            respSMBCommand['DataOffset'] = 0x50
            respSMBCommand['DataLength'] = len(content)
            respSMBCommand['DataRemaining'] = 0
            respSMBCommand['Buffer'] = content
        except Exception as e:
            smbServer.log('SMB2_READ: %s ' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    smbServer.setConnectionData(connId, connData)
    return [respSMBCommand], None, errorCode
def seekable(self):
    """Report whether the underlying descriptor supports seeking.

    The answer is probed once with a no-op lseek and then cached in
    self._seekable.
    """
    if self._seekable is not None:
        return self._seekable

    try:
        os.lseek(self._fileno, 0, os.SEEK_CUR)
    except OSError:
        self._seekable = False
    else:
        self._seekable = True
    return self._seekable
def seek(self, offset, whence=0):
    """Reposition the underlying descriptor and return the new position."""
    new_position = os.lseek(self._fileno, offset, whence)
    return new_position
def seekable(self):
    """Report whether the underlying descriptor supports seeking.

    The answer is probed once with a no-op lseek and then cached in
    self._seekable.
    """
    if self._seekable is not None:
        return self._seekable

    try:
        os.lseek(self._fileno, 0, os.SEEK_CUR)
    except OSError:
        self._seekable = False
    else:
        self._seekable = True
    return self._seekable
def seek(self, offset, whence=0):
    """Reposition the underlying descriptor and return the new position."""
    new_position = os.lseek(self._fileno, offset, whence)
    return new_position
def _probe_seek_hole(self):
    """
    Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
    Unfortunately, there seems to be no clean way for detecting this,
    because often the system just fakes them by just assuming that all
    files are fully mapped, so 'SEEK_HOLE' always returns EOF and
    'SEEK_DATA' always returns the requested offset.

    I could not invent a better way of detecting the fake 'SEEK_HOLE'
    implementation than just to create a temporary file in the same
    directory where the image file resides. It would be nice to change this
    to something better.

    Raises 'ErrorNotSupp' when the temporary file cannot be created or
    truncated, or when 'SEEK_HOLE' at offset 0 of the freshly truncated
    file does not return 0. Returns nothing on success.
    """
    directory = os.path.dirname(self._image_path)

    try:
        tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
    except IOError as err:
        raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                           % (directory, err))

    # Close the temporary file on every path, including the error ones.
    try:
        try:
            os.ftruncate(tmp_obj.fileno(), self.block_size)
        except OSError as err:
            raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                               % (directory, err))

        offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
        if offs != 0:
            # We are dealing with the stub 'SEEK_HOLE' implementation which
            # always returns EOF.
            self._log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
            raise ErrorNotSupp("the file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                               "provides a stub implementation")
    finally:
        tmp_obj.close()
def test_read(self):
    """os.read() returns the bytes previously written to the test file."""
    payload = b"spam"
    with open(support.TESTFN, "w+b") as stream:
        stream.write(payload)
        stream.flush()
        descriptor = stream.fileno()
        os.lseek(descriptor, 0, 0)
        received = os.read(descriptor, 4)
        self.assertEqual(type(received), bytes)
        self.assertEqual(received, b"spam")
def test_lseek(self):
if hasattr(os, "lseek"):
self.check(os.lseek, 0, 0)
def test_stdin_filedes(self):
# stdin is set to open file descriptor
tf = tempfile.TemporaryFile()
self.addCleanup(tf.close)
d = tf.fileno()
os.write(d, b"pear")
os.lseek(d, 0, 0)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
stdin=d)
p.wait()
self.assertEqual(p.returncode, 1)
def test_stdout_filedes(self):
# stdout is set to open file descriptor
tf = tempfile.TemporaryFile()
self.addCleanup(tf.close)
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=d)
p.wait()
os.lseek(d, 0, 0)
self.assertEqual(os.read(d, 1024), b"orange")
def test_stderr_filedes(self):
# stderr is set to open file descriptor
tf = tempfile.TemporaryFile()
self.addCleanup(tf.close)
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'],
stderr=d)
p.wait()
os.lseek(d, 0, 0)
self.assertStderrEqual(os.read(d, 1024), b"strawberry")
def test_textmode(self):
    """_mkstemp_inner can create files in text mode."""
    if not has_textmode:
        return    # ugh, can't use SkipTest.

    # A text file is truncated at the first Ctrl+Z byte
    handle = self.do_create(bin=0)
    for payload in (b"blat\x1a", b"extra\n"):
        handle.write(payload)
    os.lseek(handle.fd, 0, os.SEEK_SET)
    self.assertEqual(os.read(handle.fd, 20), b"blat")
def test_lseek(self):
    """Exercise os.lseek() with every whence mode on the built largefile."""
    if verbose:
        print('play around with os.lseek() with the built largefile')
    # (offset, whence, expected position); the order matters because the
    # whence=1 steps are relative to the position left by the previous one.
    steps = [
        (0, 0, 0),
        (42, 0, 42),
        (42, 1, 84),
        (0, 1, 84),
        (0, 2, size+1+0),
        (-10, 2, size+1-10),
        (-size-1, 2, 0),
        (size, 0, size),
    ]
    with self.open(TESTFN, 'rb') as f:
        for offset, whence, expected in steps:
            self.assertEqual(os.lseek(f.fileno(), offset, whence), expected)
        # the 'a' that was written at the end of file above
        self.assertEqual(f.read(1), b'a')
def _write(self, piece):
    """Store *piece*'s data at the file offset that belongs to its index.

    The offset is the piece index multiplied by the torrent's piece length.
    """
    offset = self.torrent.info.piece_length * piece.index
    os.lseek(self.fd, offset, os.SEEK_SET)
    os.write(self.fd, piece.data)
def read(self, begin, index, length):
    """Read *length* bytes starting *begin* bytes into piece *index*.

    The file offset is the piece index multiplied by the torrent's piece
    length, plus the *begin* offset within that piece.
    """
    # Include 'begin'; without it every request would return the start of
    # the piece regardless of which block was asked for.
    pos = index * self.torrent.info.piece_length + begin
    os.lseek(self.fd, pos, os.SEEK_SET)
    return os.read(self.fd, length)
def test_stdin_filedes(self):
# stdin is set to open file descriptor
tf = tempfile.TemporaryFile()
d = tf.fileno()
os.write(d, "pear")
os.lseek(d, 0, 0)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
stdin=d)
p.wait()
self.assertEqual(p.returncode, 1)
def test_stdout_filedes(self):
# stdout is set to open file descriptor
tf = tempfile.TemporaryFile()
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=d)
p.wait()
os.lseek(d, 0, 0)
self.assertEqual(os.read(d, 1024), "orange")
def test_stderr_filedes(self):
# stderr is set to open file descriptor
tf = tempfile.TemporaryFile()
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'],
stderr=d)
p.wait()
os.lseek(d, 0, 0)
self.assertStderrEqual(os.read(d, 1024), "strawberry")
def test_lseek(self):
self.check(os.lseek, 0, 0)
def test_stdin_filedes(self):
# stdin is set to open file descriptor
tf = tempfile.TemporaryFile()
d = tf.fileno()
os.write(d, "pear")
os.lseek(d, 0, 0)
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
stdin=d)
p.wait()
self.assertEqual(p.returncode, 1)
def test_stdout_filedes(self):
# stdout is set to open file descriptor
tf = tempfile.TemporaryFile()
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=d)
p.wait()
os.lseek(d, 0, 0)
self.assertEqual(os.read(d, 1024), "orange")
def test_stderr_filedes(self):
# stderr is set to open file descriptor
tf = tempfile.TemporaryFile()
d = tf.fileno()
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'],
stderr=d)
p.wait()
os.lseek(d, 0, 0)
self.assertStderrEqual(os.read(d, 1024), "strawberry")
def __init__(self, buf = None, pos = 0, filename = None, fp = None):
    """Set up the buffer, position and line/column bookkeeping.

    When `fp` is given it takes precedence over `buf` and `pos`: the file is
    memory-mapped read-only if possible (with `pos` set to the descriptor's
    current offset), and otherwise read completely into memory with `pos`
    reset to 0.  An empty file yields an empty buffer.  If `filename` is not
    given it is taken from `fp.name` when that attribute exists.
    """
    if fp is not None:
        try:
            fileno = fp.fileno()
            length = os.path.getsize(fp.name)
            import mmap
        except Exception:
            # Anything that prevents mapping (no descriptor, no name, no
            # mmap module) falls back to reading the whole file into
            # memory.  KeyboardInterrupt and SystemExit are deliberately
            # not swallowed here.
            buf = fp.read()
            pos = 0
        else:
            # map the whole file into memory
            if length:
                # length must not be zero
                buf = mmap.mmap(fileno, length, access = mmap.ACCESS_READ)
                pos = os.lseek(fileno, 0, 1)
            else:
                buf = ''
                pos = 0

        if filename is None:
            try:
                filename = fp.name
            except AttributeError:
                filename = None

    self.buf = buf
    self.pos = pos
    self.line = 1
    self.col = 1
    self.filename = filename
def _save_chunk(self, n, data):
    """Write chunk number *n* to the part file and persist the segment map.

    If the part file is not open yet, self._open_files() is called first;
    when that reports failure nothing is written and False is returned.
    Otherwise *data* is written at offset chunk_size * n, self._segments is
    written to the segments file, and True is returned.
    """
    if self._part_fd < 0:
        opened = self._open_files()
        if not opened:
            return False
        assert self._part_fd >= 0

    os.lseek(self._part_fd, self.chunk_size * n, os.SEEK_SET)
    os.write(self._part_fd, data)
    self._segments_file.write(self._segments)
    return True