Python os 模块，truncate() 实例源码
我们从 Python 开源项目中，提取了以下 19 个代码示例，用于说明如何使用 os.truncate()。
def test_does_not_fit(self):
    """A structure whose contents exceed the available space fails the build.

    The build is run through ``load_gadget_yaml``, the gadget's
    ``pc-boot.img`` is then resized to 512 bytes, and the resumed build is
    expected to exit with code 1 after logging a "do not fit" error.
    """
    workdir = self._resources.enter_context(TemporaryDirectory())
    # See LP: #1666580
    first_pass = (
        'snap', '--workdir', workdir,
        '--thru', 'load_gadget_yaml',
        self.model_assertion,
    )
    main(first_pass)
    # Make the gadget's mbr contents too big.
    boot_image = os.path.join(workdir, 'unpack', 'gadget', 'pc-boot.img')
    os.truncate(boot_image, 512)
    # Capture what the resumed run reports through the error logger.
    error_log = self._resources.enter_context(
        patch('ubuntu_image.__main__._logger.error'))
    exit_code = main(('snap', '--workdir', workdir, '--resume'))
    self.assertEqual(exit_code, 1)
    last_error = error_log.call_args_list[-1]
    expected = call(
        'Volume contents do not fit (72B over): '
        'volumes:<pc>:structure:<mbr> [#0]')
    self.assertEqual(last_error, expected)
def test_truncate_fixed(sub_open):
    """Check truncation through a file object and through ``os.truncate``.

    :param sub_open: Directory in which ``new_file`` is created.

    Every handle is managed by a ``with`` block so that a failing
    assertion cannot leave a file open.
    """
    fname = join(sub_open, 'new_file')
    # Keep the writer open while a second handle truncates the same file.
    with open(fname, 'w') as writer:
        writer.write('hello\n')
        writer.flush()
        with open(fname, 'a') as appender:
            appender.truncate(1)
        with open(fname, 'r') as reader:
            assert reader.read() == 'h'
    # With all handles closed, truncate by path down to nothing.
    os.truncate(fname, 0)
    with open(fname, 'r') as reader:
        assert reader.read() == ''
def test_ftruncate(self):
    """Run ``self.check`` on both truncate calls when ``os.ftruncate`` exists."""
    # Nothing is checked when the os module has no ftruncate attribute.
    if not hasattr(os, "ftruncate"):
        return
    self.check(os.truncate, 0)
    self.check(os.ftruncate, 0)
def create_internal_disk(self):
    """Create an internal-image*.wic in the resultdir that runqemu can use."""
    arch = self.image_arch
    # Copy the installer image's qemuboot configuration under the
    # internal-image name.
    installer_conf = os.path.join(
        self.image_dir, 'refkit-installer-image-%s.qemuboot.conf' % arch)
    internal_conf = os.path.join(
        self.resultdir, 'internal-image-%s.qemuboot.conf' % arch)
    copyfile(installer_conf, internal_conf)
    # Link every ovmf* entry of the image directory into the result directory.
    for firmware in glob('%s/ovmf*' % self.image_dir):
        link_name = os.path.join(self.resultdir, os.path.basename(firmware))
        os.symlink(firmware, link_name)
    wic_path = os.path.join(self.resultdir, 'internal-image-%s.wic' % arch)
    with open(wic_path, 'w') as image:
        # Grow the freshly created, empty file to 8GB without writing data.
        os.truncate(image.fileno(), 8 * 1024 ** 3)
def get_default_sector_size():
    """Return the ``sectorSize`` that ``Device`` reports for a temporary file.

    The temporary file is resized to ``MiB(1)`` before it is handed to
    ``Device``.
    """
    with NamedTemporaryFile() as scratch:
        scratch_path = scratch.name
        # Truncate to zero, so that extending the size in the next call
        # will cause all the bytes to read as zero.  Stevens $4.13
        os.truncate(scratch_path, 0)
        target_size = MiB(1)
        os.truncate(scratch_path, target_size)
        # Query the device while the temporary file still exists.
        return Device(scratch_path).sectorSize
def __init__(self, path, size, schema=None):
    """Initialize an image file to a given size in bytes.

    :param path: Path to image file on the file system.
    :type path: str
    :param size: Size in bytes to set the image file to.
    :type size: int
    :param schema: The partitioning schema of the volume.  When it is
        None, no parted objects are created and a sector size of 512 is
        recorded.
    :type schema: VolumeSchema

    Public attributes:

    * path - Path to the image file.
    """
    self.path = path
    # os.truncate() does not create files (unlike truncate(1) --size 0),
    # so make sure an empty file exists first.
    open(path, 'wb').close()
    # Truncate to zero, so that extending the size in the next call
    # will cause all the bytes to read as zero.  Stevens $4.13
    os.truncate(path, 0)
    os.truncate(path, size)
    if schema is not None:
        # Prepare the device and disk objects for parted to be used for
        # all future partition() calls.
        self.device = parted.Device(self.path)
        label = 'gpt' if schema is not VolumeSchema.mbr else 'msdos'
        self.schema = schema
        self.disk = parted.freshDisk(self.device, label)
        self.sector_size = self.device.sectorSize
        return
    # Nobody cares about the partition table: skip parted entirely.
    self.sector_size = 512
    self.device = None
    self.disk = None
    self.schema = None
def test_sparse_copy(self):
    """A file copied with sparse_copy() is reported sparse by is_sparse()."""
    with TemporaryDirectory() as tmpdir:
        source = os.path.join(tmpdir, 'sparse.dat')
        with open(source, 'w') as fp:
            # Extend the empty file to 1000000 bytes without writing data.
            os.truncate(fp.fileno(), 1000000)
            # This file is sparse.
            self.assertTrue(is_sparse(source))
            destination = os.path.join(tmpdir, 'copied.dat')
            sparse_copy(source, destination)
            self.assertTrue(is_sparse(destination))
def test_copy_symlink(self):
    """sparse_copy() with follow_symlinks=False produces a symlink."""
    with TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, 'sparse.dat')
        with open(target, 'w') as fp:
            os.truncate(fp.fileno(), 1000000)
            # This file is sparse.
            self.assertTrue(is_sparse(target))
            # Create a symlink to the sparse file.
            link = os.path.join(tmpdir, 'linked.dat')
            os.symlink(target, link)
            self.assertTrue(os.path.islink(link))
            # Copy the link without following it; the copy must be a link.
            link_copy = os.path.join(tmpdir, 'copied.dat')
            sparse_copy(link, link_copy, follow_symlinks=False)
            self.assertTrue(os.path.islink(link_copy))
def test_ftruncate(self):
    """Run ``self.check`` with ``os.truncate`` and then ``os.ftruncate``."""
    # Each function is looked up on os right before its check, in order.
    for func_name in ("truncate", "ftruncate"):
        self.check(getattr(os, func_name), 0)
def _open_log_file():
    """Open a Porcupine log file.

    Usually this opens and overwrites log.txt. If that file cannot be
    locked (another Porcupine process has it currently opened), this
    opens log2.txt instead, then log3.txt and so on.

    Returns the writable file object of the first candidate that
    ``_lock`` accepts.
    """
    # candidate names: 'log.txt', 'log2.txt', 'log3.txt', ...
    candidates = (
        'log.txt' if number == 1 else 'log{}.txt'.format(number)
        for number in itertools.count(start=1)
    )
    for name in candidates:
        # unfortunately there's not a mode that would open in write but
        # not truncate like 'w' or seek to end like 'a'
        fd = os.open(os.path.join(dirs.cachedir, name),
                     os.O_WRONLY | os.O_CREAT, 0o644)
        if not _lock(fd):
            # could not lock this one, release it and try the next name
            os.close(fd)
            continue
        # now we can delete the old content, can't use os.truncate
        # here because it doesn't exist on windows
        log_file = open(fd, 'w')
        log_file.truncate(0)
        return log_file
# FileHandler doesn't take already opened files and StreamHandler
# doesn't close the file :(
def truncate(self, length):
    """Resize the in-memory contents to ``length`` bytes.

    The current contents are loaded with ``self.get_data()``.  A shorter
    ``length`` cuts the data, a longer one pads it with NUL bytes, and
    ``self.overwrite`` is set afterwards in every case.

    :param length: Requested size in bytes.
    """
    self.data = self.get_data()
    if length == 0:
        self.data = b''
    elif length <= len(self.data):
        self.data = self.data[:length]
    else:
        # Pad relative to the bytes actually held.  ``self.stat['st_size']``
        # can disagree with the loaded data, which would produce a result
        # that is not ``length`` bytes long.
        self.data = self.data + b'\0' * (length - len(self.data))
    self.overwrite = True
def truncate(self, length):
    """Truncate the file to ``length`` bytes.

    An open ``self._handle`` is rewound, truncated and flushed; without a
    handle the file at ``self.full_path`` is truncated by path.

    :param length: New size in bytes.
    """
    handle = self._handle
    if handle is not None:
        # Rewind first, then cut the open file and push the change out.
        handle.seek(0)
        handle.truncate(length)
        handle.flush()
        return
    os.truncate(self.full_path, length)
def truncate(self, length):
    """Resize ``self.data`` to ``length`` bytes and mark the object dirty.

    Shorter lengths cut the data, longer ones append NUL bytes.  The
    ``st_atime`` and ``st_mtime`` entries of ``self.stat`` are refreshed
    and ``self.dirty`` is set in every case.

    :param length: Requested size in bytes.
    """
    previous = self.data
    if length == 0:
        resized = b''
    elif length > len(previous):
        # Grow: pad the existing bytes with NULs up to the new length.
        resized = previous + b'\0' * (length - len(previous))
    else:
        resized = previous[:length]
    self.data = resized
    # Access time first, then modification time, each with its own reading.
    for stamp in ('st_atime', 'st_mtime'):
        self.stat[stamp] = time()
    self.dirty = True
def truncate(self, path, length, fh=None):
    """Truncate a file to ``length`` while holding ``self._lock``.

    :param path: Path handed to ``self.get_file`` when ``fh`` is None.
    :param length: New size; a negative value is rejected.
    :param fh: Optional key into ``self._open_files``.
    :raises FuseOSError: ``EINVAL`` for a negative length; ``EPERM`` when
        ``self.fixed`` is set and the file is neither a ``TempFile`` nor
        a ``SpecialFile``.
    """
    with self._lock:
        if length < 0:  # pragma: no cover
            raise FuseOSError(EINVAL)
        if fh is not None:  # pragma: no cover
            target = self._open_files[fh]
        else:
            target = self.get_file(path, expect_type=SingleFile)
        # With ``fixed`` set only temp and special files may be resized.
        if self.fixed and not isinstance(target, (TempFile, SpecialFile)):
            raise FuseOSError(EPERM)
        target.truncate(length)
def test_ftruncate(self):
    """Pass ``os.truncate`` and ``os.ftruncate`` to ``self.check`` in turn."""
    check = self.check
    # Both calls pass 0 as the second argument.
    check(os.truncate, 0)
    check(os.ftruncate, 0)
def _consistent(self, cached):
    """Verify that every open handle of ``cached`` is known to the cache.

    Each handle found in the value lists of ``cached.open_handle`` must be
    present in ``self._shelfcache`` under the key ``(None, handle)``.

    :param cached: Object whose ``open_handle`` mapping holds lists of
        handles.
    :raises TmfsOSError: with ``errno.EBADFD`` when a handle is missing or
        the check itself fails; the problem is logged first.
    """
    try:
        all_fh = [
            fh
            for vlist in cached.open_handle.values()
            for fh in vlist
        ]
        for v_fh in all_fh:
            # Raise explicitly instead of using ``assert`` so the check
            # still runs when Python is started with -O.
            if (None, v_fh) not in self._shelfcache:
                raise AssertionError('Inconsistent list members')
    except Exception as e:
        self.logger.error('Shadow cache is corrupt: %s' % str(e))
        if self.verbose > 3:
            # High verbosity additionally calls set_trace() before raising.
            set_trace()
        # Keep the original failure attached as the cause.
        raise TmfsOSError(errno.EBADFD) from e
# Most uses send an open handle fh (integer) as key. truncate by name
# is the exception. An update needs to be reflected for all keys.
def truncate(self, shelf, length, fh):
    """Store ``shelf`` in the cache under ``(shelf.id, None)``.

    :param shelf: Shelf object providing ``id`` and ``name``.
    :param length: Not used by this method.
    :param fh: Open handle, or None when truncating by name.  A handle
        must already be present in the cache as ``(None, fh)``.
    :returns: Always 0.
    """
    by_handle = fh is not None
    if by_handle:
        # This is an update, but there's no good way to flag that to
        # __setitem__, so sanity-check that the handle is known.
        assert (None, fh) in self, 'VFS thinks %s is open but LFS does not, shelf_id: %s' % (
            shelf.name, shelf.id)
    name_key = (shelf.id, None)
    self[name_key] = shelf
    return 0
def truncate(self, shelf, length, fd):   # shadow_dir, yes an fd
    """Truncate the backing file of ``shelf`` and record the new size.

    :param shelf: Shelf object; its ``_fd`` is used when non-negative,
        otherwise the path from ``self.shadowpath(shelf.name)``.
    :param length: New size in bytes.
    :param fd: Truthy when the shelf is open; it must then equal the
        ``_fd`` of the entry cached under ``shelf.name``.
    :returns: 0 on success.
    :raises TmfsOSError: carrying the errno of any ``OSError``.
    """
    try:
        if fd:  # It's an open shelf
            assert self[shelf.name]._fd == fd, 'fd mismatch on truncate'
        own_fd = shelf._fd
        target = own_fd if own_fd >= 0 else self.shadowpath(shelf.name)
        os.truncate(target, length)
        shelf.size_bytes = length
        if shelf.open_handle is not None:
            return 0
        # No open handle: also update the entry cached under the shelf id.
        cached = self[shelf.id]
        if cached is not None:
            cached.size_bytes = length
        return 0
    except OSError as e:
        raise TmfsOSError(e.errno)