Python os 模块,stat() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用os.stat()。
def mounts(self, detectdev=False):
    """Collect usage figures for the mounted on-disk filesystems.

    ``/proc/mounts`` is scanned and only entries whose filesystem type is in
    the supported list and whose mount point is a directory are kept.  Each
    result is a dict with ``dev``, ``path`` and ``fstype`` plus ``total``,
    ``free`` and ``used`` (byte counts passed through ``b2h``) and
    ``used_rate`` (from ``div_percent``).  When `detectdev` is true the
    ``major`` and ``minor`` device numbers of the mount point are added.
    """
    supported = ('ext2', 'ext3', 'ext4', 'xfs',
                 'jfs', 'reiserfs', 'btrfs',
                 'simfs')  # simfs: filesystem in OpenVZ
    entries = []
    with open('/proc/mounts', 'r') as proc_mounts:
        for row in proc_mounts:
            dev, path, fstype = row.split()[0:3]
            if fstype not in supported:
                continue
            if not os.path.isdir(path):
                continue
            entries.append({'dev': dev, 'path': path, 'fstype': fstype})

    for entry in entries:
        vfs = os.statvfs(entry['path'])
        block_size = vfs.f_bsize
        total = vfs.f_blocks * block_size
        free = vfs.f_bfree * block_size
        used = (vfs.f_blocks - vfs.f_bfree) * block_size
        entry['total'] = b2h(total)
        entry['free'] = b2h(free)
        entry['used'] = b2h(used)
        entry['used_rate'] = div_percent(used, total)
        if detectdev:
            devno = os.stat(entry['path']).st_dev
            entry['major'], entry['minor'] = os.major(devno), os.minor(devno)
    return entries
def _warn_unsafe_extraction_path(path):
    """
    Emit a ``UserWarning`` when `path` is writable by group or others.

    An extraction directory that other users can write to (for example
    /tmp) lets an attacker swap an extracted file for their own payload.
    On Windows the check is skipped unless `path` lies under ``%windir%``.

    See Distribute #375 for more details.
    """
    on_windows = os.name == 'nt'
    if on_windows and not path.startswith(os.environ['windir']):
        # Windows permissions are restrictive by default and temp
        # directories are not writable by other users: nothing to warn about.
        return
    shared_write_bits = stat.S_IWOTH | stat.S_IWGRP
    if os.stat(path).st_mode & shared_write_bits:
        template = ("%s is writable by group/others and vulnerable to attack "
                    "when "
                    "used with get_resource_filename. Consider a more secure "
                    "location (set with .set_extraction_path or the "
                    "PYTHON_EGG_CACHE environment variable).")
        warnings.warn(template % path, UserWarning)
def copy_from_host(module):
    """Read the file named by the ``src`` parameter and report it via `module`.

    Fails through ``module.fail_json`` when the file is missing or not
    readable.  Otherwise exits through ``module.exit_json`` with the
    base64-encoded content (zlib-compressed first when the ``compress``
    parameter is truthy), the SHA-1 of the raw bytes, the permission bits
    as an octal string and the source path.
    """
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    if compress:
        data = zlib.compress(raw_data)
    else:
        data = raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
                     source=src)
def check_is_readonly(self, fname):
    """
    Tell whether the owner lacks write permission on @fname.

    @fname
        Path to a file.

    Returns `True` when the owner-write bit is cleared, `False` when it is
    set, and `None` when @fname is empty or the file does not exist.
    """
    if not fname:
        return

    try:
        info = os.stat(fname)
    except FileNotFoundError:
        return

    permissions = stat.S_IMODE(info.st_mode)
    return not (permissions & stat.S_IWUSR)
def place_data_on_block_device(blk_device, data_src_dst):
    """Move the contents of `data_src_dst` onto `blk_device`, then mount it there.

    The device is staged on ``/mnt`` while the files are copied, unmounted,
    and finally mounted over `data_src_dst` with the directory's original
    owner and group restored.
    """
    staging_point = '/mnt'

    # Stage the device, copy the data across and release the staging mount.
    mount(blk_device, staging_point)
    copy_files(data_src_dst, staging_point)
    umount(staging_point)

    # Remember who owned the source directory before it gets covered.
    owner = os.stat(data_src_dst)

    # TODO: persist is currently a NO-OP in core.host
    mount(blk_device, data_src_dst, persist=True)

    # The fresh mount must carry the original ownership.
    os.chown(data_src_dst, owner.st_uid, owner.st_gid)
def copyfile(src, dst):
    """Copy the bytes of `src` into `dst`.

    Raises ``Error`` when both names refer to the same file and
    ``SpecialFileError`` when either existing path is a named pipe.
    """
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))

    for candidate in (src, dst):
        try:
            mode = os.stat(candidate).st_mode
        except OSError:
            # File most likely does not exist
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(mode):
            raise SpecialFileError("`%s` is a named pipe" % candidate)

    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst)
def newer(self, source, target):
    """Tell whether `source` is newer than `target`.

    Returns True when `source` has a later modification time than
    `target`, or when `target` does not exist.  Returns False when both
    exist and `target` is the same age or younger.  Raises
    DistlibException when `source` does not exist.

    Modification times are compared as reported by ``os.stat``, so files
    written within the same timestamp granularity compare as equally old.
    """
    if not os.path.exists(source):
        raise DistlibException("file '%r' does not exist" %
                               os.path.abspath(source))
    if os.path.exists(target):
        source_mtime = os.stat(source).st_mtime
        target_mtime = os.stat(target).st_mtime
        return source_mtime > target_mtime
    return True
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to the freshly extracted `tempname`.

    On POSIX the file gains read and execute permission for everyone;
    other platforms are left alone.  `filename` is the name the caller
    will rename `tempname` to afterwards and is not used here.

    Resource providers should call this ONLY after successfully
    extracting a compressed resource, never on resources that already
    live in the filesystem.
    """
    if os.name != 'posix':
        return
    # Make the resource executable
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def _warn_unsafe_extraction_path(path):
    """
    Emit a ``UserWarning`` when `path` is writable by group or others.

    An extraction directory that other users can write to (for example
    /tmp) lets an attacker swap an extracted file for their own payload.
    On Windows the check is skipped unless `path` lies under ``%windir%``.

    See Distribute #375 for more details.
    """
    on_windows = os.name == 'nt'
    if on_windows and not path.startswith(os.environ['windir']):
        # Windows permissions are restrictive by default and temp
        # directories are not writable by other users: nothing to warn about.
        return
    shared_write_bits = stat.S_IWOTH | stat.S_IWGRP
    if os.stat(path).st_mode & shared_write_bits:
        template = ("%s is writable by group/others and vulnerable to attack "
                    "when "
                    "used with get_resource_filename. Consider a more secure "
                    "location (set with .set_extraction_path or the "
                    "PYTHON_EGG_CACHE environment variable).")
        warnings.warn(template % path, UserWarning)
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to the freshly extracted `tempname`.

    On POSIX the file gains read and execute permission for everyone;
    other platforms are left alone.  `filename` is the name the caller
    will rename `tempname` to afterwards and is not used here.

    Resource providers should call this ONLY after successfully
    extracting a compressed resource, never on resources that already
    live in the filesystem.
    """
    if os.name != 'posix':
        return
    # Make the resource executable
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def unpack_and_compile(self, egg_path, destination):
    """Unpack the egg at `egg_path` into `destination` and byte-compile it.

    While unpacking, ``.py`` files outside ``EGG-INFO/`` are queued for
    byte compilation and ``.dll``/``.so`` files are queued for a
    permission fix.  In a dry run nothing is extracted and no permissions
    are changed.
    """
    python_sources = []
    shared_libs = []

    def pf(src, dst):
        # Progress filter handed to unpack_archive: classify each member
        # and return the target path (or None to skip extraction).
        if dst.endswith('.py') and not src.startswith('EGG-INFO/'):
            python_sources.append(dst)
        elif dst.endswith(('.dll', '.so')):
            shared_libs.append(dst)
        self.unpack_progress(src, dst)
        if not self.dry_run and dst:
            return dst
        return None

    unpack_archive(egg_path, destination, pf)
    self.byte_compile(python_sources)
    if self.dry_run:
        return
    for lib in shared_libs:
        # Add read/execute for everyone, then drop group/other write bits.
        lib_mode = (os.stat(lib).st_mode | 0o555) & 0o7755
        chmod(lib, lib_mode)
def commonprefix(m):
    "Return the longest leading component shared by every pathname in the list"
    if not m:
        return ''
    # A list of already-split path parts (lists/tuples) is taken as-is:
    # callers doing that are working OS-agnostically on purpose and are
    # unlikely to be passing os.PathLike objects in the sublists.
    if not isinstance(m[0], (list, tuple)):
        m = tuple(map(os.fspath, m))
    # The smallest and largest items bracket all the others, so whatever
    # prefix those two share is shared by the whole list.
    smallest, largest = min(m), max(m)
    for position, (left, right) in enumerate(zip(smallest, largest)):
        if left != right:
            return smallest[:position]
    return smallest
# Are two stat buffers (obtained from stat, fstat or lstat)
# describing the same file?
def serve(self, request, path):
    """Serve the private media file `path`, honouring If-Modified-Since.

    `path` is resolved below ``settings.PRIVATE_MEDIA_ROOT``.  Raises
    ``Http404`` when the resolved path escapes that root or does not
    exist.  Returns ``HttpResponseNotModified`` when the client's cached
    copy is current, otherwise an ``HttpResponse`` with the file content
    and a ``Last-Modified`` header.
    """
    # the following code is largely borrowed from `django.views.static.serve`
    # and django-filetransfers: filetransfers.backends.default
    root = os.path.abspath(settings.PRIVATE_MEDIA_ROOT)
    fullpath = os.path.abspath(os.path.join(root, path))
    # `path` comes from the request: refuse anything (``..`` segments,
    # absolute paths) that normalises to a location outside the root.
    if fullpath != root and not fullpath.startswith(os.path.join(root, '')):
        raise Http404('"{0}" does not exist'.format(fullpath))
    if not os.path.exists(fullpath):
        raise Http404('"{0}" does not exist'.format(fullpath))
    # Respect the If-Modified-Since header.
    statobj = os.stat(fullpath)
    content_type = mimetypes.guess_type(fullpath)[0] or 'application/octet-stream'
    if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'),
                              statobj[stat.ST_MTIME], statobj[stat.ST_SIZE]):
        return HttpResponseNotModified(content_type=content_type)
    # Read through a context manager so the descriptor is released at once
    # instead of whenever the file object happens to be collected.
    with open(fullpath, 'rb') as media_file:
        payload = media_file.read()
    response = HttpResponse(payload, content_type=content_type)
    response["Last-Modified"] = http_date(statobj[stat.ST_MTIME])
    # filename = os.path.basename(path)
    # response['Content-Disposition'] = smart_str(u'attachment; filename={0}'.format(filename))
    return response
def copyfile(src, dst):
    """Copy the bytes of `src` into `dst`.

    Raises ``Error`` when both names refer to the same file and
    ``SpecialFileError`` when either existing path is a named pipe.
    """
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))

    for candidate in (src, dst):
        try:
            mode = os.stat(candidate).st_mode
        except OSError:
            # File most likely does not exist
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(mode):
            raise SpecialFileError("`%s` is a named pipe" % candidate)

    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst)
def _warn_unsafe_extraction_path(path):
    """
    Emit a ``UserWarning`` when `path` is writable by group or others.

    An extraction directory that other users can write to (for example
    /tmp) lets an attacker swap an extracted file for their own payload.
    On Windows the check is skipped unless `path` lies under ``%windir%``.

    See Distribute #375 for more details.
    """
    on_windows = os.name == 'nt'
    if on_windows and not path.startswith(os.environ['windir']):
        # Windows permissions are restrictive by default and temp
        # directories are not writable by other users: nothing to warn about.
        return
    shared_write_bits = stat.S_IWOTH | stat.S_IWGRP
    if os.stat(path).st_mode & shared_write_bits:
        template = ("%s is writable by group/others and vulnerable to attack "
                    "when "
                    "used with get_resource_filename. Consider a more secure "
                    "location (set with .set_extraction_path or the "
                    "PYTHON_EGG_CACHE environment variable).")
        warnings.warn(template % path, UserWarning)
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to the freshly extracted `tempname`.

    On POSIX the file gains read and execute permission for everyone;
    other platforms are left alone.  `filename` is the name the caller
    will rename `tempname` to afterwards and is not used here.

    Resource providers should call this ONLY after successfully
    extracting a compressed resource, never on resources that already
    live in the filesystem.
    """
    if os.name != 'posix':
        return
    # Make the resource executable
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to the freshly extracted `tempname`.

    On POSIX the file gains read and execute permission for everyone;
    other platforms are left alone.  `filename` is the name the caller
    will rename `tempname` to afterwards and is not used here.

    Resource providers should call this ONLY after successfully
    extracting a compressed resource, never on resources that already
    live in the filesystem.
    """
    if os.name != 'posix':
        return
    # Make the resource executable
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def unpack_and_compile(self, egg_path, destination):
    """Unpack the egg at `egg_path` into `destination` and byte-compile it.

    While unpacking, ``.py`` files outside ``EGG-INFO/`` are queued for
    byte compilation and ``.dll``/``.so`` files are queued for a
    permission fix.  In a dry run nothing is extracted and no permissions
    are changed.
    """
    python_sources = []
    shared_libs = []

    def pf(src, dst):
        # Progress filter handed to unpack_archive: classify each member
        # and return the target path (or None to skip extraction).
        if dst.endswith('.py') and not src.startswith('EGG-INFO/'):
            python_sources.append(dst)
        elif dst.endswith(('.dll', '.so')):
            shared_libs.append(dst)
        self.unpack_progress(src, dst)
        if not self.dry_run and dst:
            return dst
        return None

    unpack_archive(egg_path, destination, pf)
    self.byte_compile(python_sources)
    if self.dry_run:
        return
    for lib in shared_libs:
        # Add read/execute for everyone, then drop group/other write bits.
        lib_mode = (os.stat(lib).st_mode | 0o555) & 0o7755
        chmod(lib, lib_mode)
def place_data_on_block_device(blk_device, data_src_dst):
    """Move the contents of `data_src_dst` onto `blk_device`, then mount it there.

    The device is staged on ``/mnt`` while the files are copied, unmounted,
    and finally mounted over `data_src_dst` with the directory's original
    owner and group restored.
    """
    staging_point = '/mnt'

    # Stage the device, copy the data across and release the staging mount.
    mount(blk_device, staging_point)
    copy_files(data_src_dst, staging_point)
    umount(staging_point)

    # Remember who owned the source directory before it gets covered.
    owner = os.stat(data_src_dst)

    # TODO: persist is currently a NO-OP in core.host
    mount(blk_device, data_src_dst, persist=True)

    # The fresh mount must carry the original ownership.
    os.chown(data_src_dst, owner.st_uid, owner.st_gid)
def place_data_on_block_device(blk_device, data_src_dst):
    """Move the contents of `data_src_dst` onto `blk_device`, then mount it there.

    The device is staged on ``/mnt`` while the files are copied, unmounted,
    and finally mounted over `data_src_dst` with the directory's original
    owner and group restored.
    """
    staging_point = '/mnt'

    # Stage the device, copy the data across and release the staging mount.
    mount(blk_device, staging_point)
    copy_files(data_src_dst, staging_point)
    umount(staging_point)

    # Remember who owned the source directory before it gets covered.
    owner = os.stat(data_src_dst)

    # TODO: persist is currently a NO-OP in core.host
    mount(blk_device, data_src_dst, persist=True)

    # The fresh mount must carry the original ownership.
    os.chown(data_src_dst, owner.st_uid, owner.st_gid)
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    An archive is generated from the charm directory into a temporary
    file, which is then uploaded through ``self.add_local_charm`` on the
    connector loop's default executor.

    This is a coroutine: the body awaits the executor, which is only
    valid inside ``async def``.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    :returns: the value produced by ``self.add_local_charm``
    """
    # Create the temporary file inside the ``with`` so it is closed (and
    # removed) even when archive generation fails.
    with tempfile.NamedTemporaryFile() as fh:
        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
        func = partial(
            self.add_local_charm, fh, series, os.stat(fh.name).st_size)
        charm_url = await self._connector.loop.run_in_executor(None, func)

    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
def _write_symlink(self, zf, link_target, link_path):
    """Store a symlink named `link_path` pointing at `link_target` in `zf`.

    The archive member's content is the link target and its metadata marks
    it as a symbolic link created on a Unix system.
    """
    entry = zipfile.ZipInfo()
    entry.filename = link_path
    # 3 is the zip "created on Unix" host code.
    entry.create_system = 3
    # Hard-coded for py2/3 compatibility:
    # 2716663808 == (stat.S_IFLNK | 0o755) << 16
    entry.external_attr = 2716663808
    zf.writestr(entry, link_target)
def Send_File_Client():
    """Send the module-level `filename` to the server at `ADDR` over TCP.

    A header packed as ``'IdI'`` (the constant 1, the current time and the
    file size) is sent first, followed by the file content in chunks of
    `BUFSIZE` bytes.  The socket and the file are always closed, including
    when connecting or sending fails.
    """
    sendSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sendSock.connect(ADDR)
        fhead = struct.pack('IdI', 1, float(time.time()),
                            os.stat(filename).st_size)
        print(fhead)
        # sendall() keeps writing until the whole buffer is out; a plain
        # send() may transmit only part of it and silently drop the rest.
        sendSock.sendall(fhead)
        with open(filename, 'rb') as fp:
            while True:
                filedata = fp.read(BUFSIZE)
                if not filedata:
                    break
                sendSock.sendall(filedata)
    finally:
        sendSock.close()
def maybe_download_and_extract():
    """Download the tarball at ``DATA_URL`` if needed and extract it.

    The archive is stored in ``FLAGS.data_dir`` (created when missing)
    under the last path component of ``DATA_URL``.  It is downloaded only
    when no file of that name exists yet, and it is extracted into the
    same directory on every call.
    """
    dest_directory = FLAGS.data_dir
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dest_directory, exist_ok=True)
    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(dest_directory, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # Rewrite the same console line with the completed percentage.
            sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
                float(count * block_size) / float(total_size) * 100.0))
            sys.stdout.flush()
        filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath,
                                                 reporthook=_progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    # Close the archive deterministically instead of leaking the handle.
    # NOTE(review): extractall() trusts member paths in the archive; only
    # use this with a DATA_URL whose content is trusted.
    with tarfile.open(filepath, 'r:gz') as archive:
        archive.extractall(dest_directory)
def maybe_download_and_extract():
    """Download the tarball at ``DATA_URL`` if needed and extract it.

    The archive is stored in ``FLAGS.data_dir`` (created when missing)
    under the last path component of ``DATA_URL``.  It is downloaded only
    when no file of that name exists yet, and it is extracted into the
    same directory on every call.
    """
    dest_directory = FLAGS.data_dir
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dest_directory, exist_ok=True)
    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(dest_directory, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # Rewrite the same console line with the completed percentage.
            sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
                float(count * block_size) / float(total_size) * 100.0))
            sys.stdout.flush()
        filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath,
                                                 reporthook=_progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    # Close the archive deterministically instead of leaking the handle.
    # NOTE(review): extractall() trusts member paths in the archive; only
    # use this with a DATA_URL whose content is trusted.
    with tarfile.open(filepath, 'r:gz') as archive:
        archive.extractall(dest_directory)
def checkcache(filename=None):
    """Discard cache entries that are out of date.

    With no argument every entry of the module-level ``cache`` is checked;
    with `filename` only that entry is checked (and nothing happens when
    it is not cached).  An entry is dropped when its file can no longer be
    stat'ed or when the file's size or modification time differs from the
    cached values.  Entries whose cached mtime is ``None`` are kept.

    (This is not checked upon each call!)
    """
    if filename is None:
        # Snapshot the keys: entries are deleted while looping, and
        # mutating a dict during iteration over its live key view raises
        # RuntimeError on Python 3.
        filenames = list(cache.keys())
    elif filename in cache:
        filenames = [filename]
    else:
        return

    for filename in filenames:
        size, mtime, lines, fullname = cache[filename]
        if mtime is None:
            continue   # no-op for files loaded via a __loader__
        try:
            file_info = os.stat(fullname)
        except os.error:
            del cache[filename]
            continue
        if size != file_info.st_size or mtime != file_info.st_mtime:
            del cache[filename]
def newer(source, target):
    """Tell whether `source` is newer than `target`.

    Return true if 'source' exists and is more recently modified than
    'target', or if 'source' exists and 'target' doesn't.  Return false if
    both exist and 'target' is the same age or younger than 'source'.
    Raise DistutilsFileError if 'source' does not exist.

    Modification times are compared as reported by ``os.stat``, so files
    written within the same timestamp granularity compare as equally old.
    """
    if not os.path.exists(source):
        raise DistutilsFileError("file '%s' does not exist" %
                                 os.path.abspath(source))
    if os.path.exists(target):
        source_mtime = os.stat(source).st_mtime
        target_mtime = os.stat(target).st_mtime
        return source_mtime > target_mtime
    return True
def load_stats(self, arg):
    """Populate ``self.stats`` from `arg`.

    `arg` may be a falsy value (``self.stats`` becomes empty), the name of
    a marshalled stats file, or an object with a ``create_stats`` method
    whose ``stats`` attribute is taken over and then reset.  For a file
    name, ``self.files`` is set to a one-item list holding the name
    prefixed with the file's modification time when that can be read.

    Raises TypeError when ``self.stats`` ends up empty.
    """
    try:
        string_types = basestring  # Python 2
    except NameError:
        string_types = str  # Python 3 has no basestring

    if not arg:
        self.stats = {}
    elif isinstance(arg, string_types):
        # Context manager: the file is closed even if unmarshalling fails.
        with open(arg, 'rb') as f:
            self.stats = marshal.load(f)
        try:
            file_stats = os.stat(arg)
            arg = time.ctime(file_stats.st_mtime) + " " + arg
        except Exception:  # in case this is not unix
            pass
        self.files = [arg]
    elif hasattr(arg, 'create_stats'):
        arg.create_stats()
        self.stats = arg.stats
        arg.stats = {}
    if not self.stats:
        # Call-style raise is valid on both Python 2 and Python 3.
        raise TypeError("Cannot create or construct a %r object from '%r''" % (
            self.__class__, arg))
    return
def synopsis(filename, cache={}):
    """Get the one-line summary out of a module file.

    Results are memoised in `cache` (deliberately a shared default dict)
    keyed by file name and refreshed when the file's mtime is newer than
    the cached one.  Returns None when the file cannot be opened, when a
    binary module fails to import, or when a binary module has no
    docstring.
    """
    mtime = os.stat(filename).st_mtime
    lastupdate, result = cache.get(filename, (0, None))
    if lastupdate < mtime:
        info = inspect.getmoduleinfo(filename)
        try:
            fileobj = open(filename)
        except IOError:
            # module can't be opened, so skip it
            return None
        try:
            if info and 'b' in info[2]:  # binary modules have to be imported
                try:
                    module = imp.load_module('__temp__', fileobj, filename,
                                             info[1:])
                except BaseException:
                    # Deliberately broad: importing an arbitrary module may
                    # raise anything, and a failure only means "no synopsis".
                    return None
                doc_lines = (module.__doc__ or '').splitlines()
                # An empty docstring has no first line to return.
                result = doc_lines[0] if doc_lines else None
                del sys.modules['__temp__']
            else:  # text modules can be directly examined
                result = source_synopsis(fileobj)
        finally:
            # Close on every path, including the binary and error branches.
            fileobj.close()
        cache[filename] = (mtime, result)
    return result
def listsubfolders(self, name):
    """Return the names of the subfolders in a given folder
    (prefixed with the given folder name).

    The folder's hard-link count is used as a shortcut where it is
    meaningful: a count of exactly 2 means there are no subfolders, and
    scanning stops once ``count - 2`` subfolders were found.  A count
    below 2 means the filesystem does not maintain the classic Unix
    directory link count, so the whole folder is scanned instead.
    """
    fullname = os.path.join(self.path, name)
    nlinks = os.stat(fullname).st_nlink
    # Only counts >= 2 follow the "2 + number of subdirectories" rule;
    # some filesystems report 1 for every directory.
    count_is_reliable = nlinks >= 2
    if count_is_reliable and nlinks == 2:
        return []
    remaining = nlinks - 2
    subfolders = []
    for subname in os.listdir(fullname):
        fullsubname = os.path.join(fullname, subname)
        if not os.path.isdir(fullsubname):
            continue
        subfolders.append(os.path.join(name, subname))
        if count_is_reliable:
            # Stop looking for subfolders when we've seen them all
            remaining -= 1
            if remaining <= 0:
                break
    subfolders.sort()
    return subfolders
def copyfile(src, dst):
    """Copy the bytes of `src` into `dst`.

    Raises ``Error`` when both names refer to the same file and
    ``SpecialFileError`` when either existing path is a named pipe.
    """
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))

    for candidate in (src, dst):
        try:
            mode = os.stat(candidate).st_mode
        except OSError:
            # File most likely does not exist
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(mode):
            raise SpecialFileError("`%s` is a named pipe" % candidate)

    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst)
def get_latest_data_subdir(pattern=None, take=-1):
    """Pick a subdirectory of ``BASE_DATA_DIR`` by modification time.

    The immediate subdirectories (optionally only those whose name
    contains `pattern`) are ordered from oldest to newest and the one at
    index `take` is returned, so the default ``-1`` yields the most
    recently modified.  Returns None when the base directory cannot be
    walked or no subdirectory matches.
    """
    try:
        _, candidates, _ = next(os.walk(BASE_DATA_DIR))
    except StopIteration:
        return None

    if pattern is not None:
        candidates = [name for name in candidates if pattern in name]

    def modified_at(name):
        return os.stat(os.path.join(BASE_DATA_DIR, name)).st_mtime

    ordered = sorted(candidates, key=modified_at)
    if not ordered:
        return None
    return ordered[take]
def filemetadata(filename):
    # type: (str) -> Optional[FileMeta]
    """Build a ``FileMeta`` record for the executable `filename`.

    The name is resolved with ``which``; None is returned when it cannot
    be found.  The record holds the resolved path, its size and mtime,
    the result of ``filesha`` and a version string: the stdout of running
    the executable with ``--version``, or ``sys.version`` when the
    executable is the running Python interpreter.
    """
    resolved = which(filename)
    if resolved is None:
        return None
    info = os.stat(resolved)
    if resolved == sys.executable:
        # filename is the Python interpreter itself
        versionstring = bytestr(sys.version)
    else:
        versionstring = run_executable(resolved, ['--version']).stdout
    return FileMeta(resolved, info.st_size, info.st_mtime, filesha(resolved), versionstring)
# ----------------------------------------------------------------------
def handle(self, *args, **options):
    """Install the commit-msg hook snippet into ``self.HOOK_PATH``.

    The existing ``commit-msg`` file is read, or a bash shebang is used as
    the starting content when there is none.  When the content does not
    yet mention ``ZERODOWNTIME_COMMIT_MSG_HOOK``, ``COMMIT_MSG_HOOK`` is
    appended, the file is written back and made executable for its owner.
    """
    hook_file = os.path.join(self.HOOK_PATH, 'commit-msg')

    if os.path.exists(hook_file):
        with open(hook_file, 'r') as existing:
            content = existing.read()
    else:
        content = '#!/usr/bin/env bash\n\n'

    if 'ZERODOWNTIME_COMMIT_MSG_HOOK' in content:
        # Hook already installed: leave the file untouched.
        return

    with open(hook_file, 'w') as target:
        target.write(content + COMMIT_MSG_HOOK)

    current_mode = os.stat(hook_file).st_mode
    os.chmod(hook_file, current_mode | stat.S_IEXEC)
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
    """Finish a self-upgrade: swap in the new binary or discard the download.

    When `binary_ok` is false the downloaded file at `temp_path` is
    removed.  Otherwise the current binary is moved aside to
    ``path + "/old"``, the download takes its place with the previous
    permissions plus owner-execute, the old copy is deleted and the
    process re-executes the new binary with the original arguments.
    """
    if not binary_ok:
        os.remove(temp_path)
        print("couldn't download the latest binary")
        return

    import stat

    # Remember the permissions of the binary being replaced.
    previous_mode = os.stat(binary_path).st_mode
    backup_path = path + "/old"

    # Move the current binary out of the way and put the new one in place.
    os.rename(binary_path, backup_path)
    os.rename(temp_path, binary_path)

    # Carry over the previous permissions and make sure it is executable.
    os.chmod(binary_path, previous_mode | stat.S_IEXEC)

    os.remove(backup_path)
    print("mongoaudit updated, restarting...")
    os.execl(binary_path, binary_path, *sys.argv)
def handle_ref_error(self):
    """Validate the reference file ``self.ref`` and report problems in a dialog.

    The first line must start with ``>`` and the second line may contain
    only the letters A, T, G, C and N (case-insensitive).  Returns the
    result of the message box shown for an invalid reference, an empty
    file, or any failure while reading it; returns None when the first
    two lines look valid.
    """
    try:
        if os.stat(self.ref).st_size > 0:
            with open(self.ref) as f:
                # next(f) works on Python 2.6+ and Python 3; the file
                # method f.next() exists on Python 2 only.
                header = next(f).strip()
                if header[0] != '>':
                    return QtGui.QMessageBox.question(self, 'Error !', 'Please check your input reference !',
                                                      QtGui.QMessageBox.Ok)
                sequence = next(f).strip()
                if re.search("[^ATGCN]", sequence.upper()):
                    return QtGui.QMessageBox.question(self, 'Error !', 'Please check your input reference !',
                                                      QtGui.QMessageBox.Ok)
        else:
            return QtGui.QMessageBox.question(self, 'Warning !', 'The selected reference file is empty, please check !',
                                              QtGui.QMessageBox.Ok)
    except Exception:
        # Any read/parse failure (missing path, fewer than two lines, blank
        # header, ...) is reported the same way.  KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return QtGui.QMessageBox.question(self, 'Error !', 'Please input a reference file !',
                                          QtGui.QMessageBox.Ok)
def find_INSTALL(Makefile, flags):
    '''
    See the doc-string for find_prefix as well.

    Sets Makefile['INSTALL'] if needed.

    $(INSTALL) is normally "install", but on Solaris it needs to be
    "/usr/ucb/install".

    The first candidate path that exists is used (and announced on stdout
    when flags['v'] is true); otherwise plain "install" is used.  An
    existing Makefile['INSTALL'] is left alone.  Always returns False.
    '''
    if 'INSTALL' not in Makefile:
        trywith = [
            '/usr/ucb/install'
        ]
        for install in trywith:
            try:
                os.stat(install)
            except OSError:
                # Candidate does not exist or cannot be inspected.
                continue
            if flags['v']:
                # The period belongs before the newline, not after it.
                sys.stdout.write('Using "' + install + '" as `install`.\n')
            Makefile['INSTALL'] = install
            return False
        Makefile['INSTALL'] = 'install'
    return False
def __hashEntry(self, prefix, entry, s):
    """Return the digest for one directory entry, chosen by its file type.

    `s` is the stat result of the entry.  Regular files and symlinks go
    through the index check, directories are hashed recursively, block and
    character devices hash to their packed device number, and FIFOs and
    unrecognised types (the latter with a warning) hash to empty bytes.
    """
    mode = s.st_mode
    if stat.S_ISREG(mode):
        return self.__index.check(prefix, entry, s, hashFile)
    if stat.S_ISDIR(mode):
        return self.__hashDir(prefix, entry)
    if stat.S_ISLNK(mode):
        return self.__index.check(prefix, entry, s, DirHasher.__hashLink)
    if stat.S_ISBLK(mode) or stat.S_ISCHR(mode):
        return struct.pack("<L", s.st_rdev)
    if not stat.S_ISFIFO(mode):
        logging.getLogger(__name__).warning("Unknown file: %s", entry)
    return b''
def download(url, filename):
    """Fetch ``i2pseeds.su3`` from below `url` into `filename`.

    Returns True when the request succeeded and the saved file is not
    empty, False when the saved file is empty or the request raised
    ``URLError``.
    """
    user_agent = "Wget/1.11.4"
    target = "{}i2pseeds.su3".format(url)
    request = urllib.request.Request(target, headers={"User-Agent": user_agent})
    try:
        with urllib.request.urlopen(request) as resp, open(filename, 'wb') as out:
            out.write(resp.read())
        return os.stat(filename).st_size > 0
    except URLError:
        return False
def assert_file_contents_equal(a, b):
    '''
    Assert that the files at `a` and `b` have the same digest.

    On failure the assertion message says whether the files differ in
    size or only in content.
    '''
    def generate_error_message(a, b):
        '''
        Build the message used when the assertion fails.
        '''
        sizes = (os.stat(a).st_size, os.stat(b).st_size)
        if sizes[0] != sizes[1]:
            return "Files have different sizes: a:%d b: %d" % sizes
        return "Files have the same size but different contents"

    assert file_digest(a) == file_digest(b), generate_error_message(a, b)
def assert_file_contents_equal(a, b):
    '''
    Assert that the files at `a` and `b` have the same digest.

    On failure the assertion message says whether the files differ in
    size or only in content.
    '''
    def generate_error_message(a, b):
        '''
        Build the message used when the assertion fails.
        '''
        sizes = (os.stat(a).st_size, os.stat(b).st_size)
        if sizes[0] != sizes[1]:
            return "Files have different sizes: a:%d b: %d" % sizes
        return "Files have the same size but different contents"

    assert file_digest(a) == file_digest(b), generate_error_message(a, b)
def cached_data_age(self, name):
    """Return how long ago the data cached under `name` was last modified.

    :param name: name of datastore
    :type name: ``unicode``
    :returns: seconds elapsed since the cache file's mtime, or 0 when the
        cache file does not exist
    """
    cache_filename = '%s.%s' % (name, self.cache_serializer)
    cache_path = self.cachefile(cache_filename)

    if os.path.exists(cache_path):
        modified_at = os.stat(cache_path).st_mtime
        return time.time() - modified_at

    return 0
def cached_data_age(self, name):
    """Return how long ago the data cached under `name` was last modified.

    :param name: name of datastore
    :type name: ``unicode``
    :returns: seconds elapsed since the cache file's mtime, or 0 when the
        cache file does not exist
    """
    cache_filename = '%s.%s' % (name, self.cache_serializer)
    cache_path = self.cachefile(cache_filename)

    if os.path.exists(cache_path):
        modified_at = os.stat(cache_path).st_mtime
        return time.time() - modified_at

    return 0
def _warn_unsafe_extraction_path(path):
    """
    Emit a ``UserWarning`` when `path` is writable by group or others.

    An extraction directory that other users can write to (for example
    /tmp) lets an attacker swap an extracted file for their own payload.
    On Windows the check is skipped unless `path` lies under ``%windir%``.

    See Distribute #375 for more details.
    """
    on_windows = os.name == 'nt'
    if on_windows and not path.startswith(os.environ['windir']):
        # Windows permissions are restrictive by default and temp
        # directories are not writable by other users: nothing to warn about.
        return
    shared_write_bits = stat.S_IWOTH | stat.S_IWGRP
    if os.stat(path).st_mode & shared_write_bits:
        template = ("%s is writable by group/others and vulnerable to attack "
                    "when "
                    "used with get_resource_filename. Consider a more secure "
                    "location (set with .set_extraction_path or the "
                    "PYTHON_EGG_CACHE environment variable).")
        warnings.warn(template % path, UserWarning)
def seek(self, offset, whence=os.SEEK_SET):
    """Move the overlapped-I/O offset, in the manner of a file descriptor's
    'seek'; works only for regular files.

    ``SEEK_SET`` stores `offset` as is and ``SEEK_CUR`` adds it to the
    current offset.  ``SEEK_END`` measures from the size of the file at
    ``self._path`` when that is a string, and otherwise just stores
    `offset`.
    """
    overlap = self._overlap
    if whence == os.SEEK_SET:
        overlap.Offset = offset
        return
    if whence == os.SEEK_CUR:
        overlap.Offset += offset
        return

    assert whence == os.SEEK_END
    if not isinstance(self._path, str):
        overlap.Offset = offset
        return
    file_size = os.stat(self._path).st_size
    overlap.Offset = file_size + offset
def seek(self, offset, whence=os.SEEK_SET):
    """Move the overlapped-I/O offset, in the manner of a file descriptor's
    'seek'; works only for regular files.

    ``SEEK_SET`` stores `offset` as is and ``SEEK_CUR`` adds it to the
    current offset.  ``SEEK_END`` measures from the size of the file at
    ``self._path`` when that is a string, and otherwise just stores
    `offset`.
    """
    overlap = self._overlap
    if whence == os.SEEK_SET:
        overlap.Offset = offset
        return
    if whence == os.SEEK_CUR:
        overlap.Offset += offset
        return

    assert whence == os.SEEK_END
    if not isinstance(self._path, str):
        overlap.Offset = offset
        return
    file_size = os.stat(self._path).st_size
    overlap.Offset = file_size + offset
def is_block_device(path):
    '''
    Check whether `path` exists and refers to a block device node.

    :returns: boolean: True if path is a block device, False if not.
    '''
    if os.path.exists(path):
        mode = os.stat(path).st_mode
        return S_ISBLK(mode)
    return False
def maybe_download(filename, work_directory):
    """Fetch `filename` from ``SOURCE_URL`` into `work_directory` unless a
    file of that name is already there, and return its path.

    The working directory is created when it does not exist.
    """
    if not os.path.exists(work_directory):
        os.mkdir(work_directory)

    filepath = os.path.join(work_directory, filename)
    if os.path.exists(filepath):
        return filepath

    filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath)
    size = os.stat(filepath).st_size
    print('Succesfully downloaded', filename, size, 'bytes.')
    return filepath
def maybe_download(filename, work_directory):
    """Fetch `filename` from ``SOURCE_URL`` into `work_directory` unless a
    file of that name is already there, and return its path.

    The working directory is created when it does not exist.
    """
    if not os.path.exists(work_directory):
        os.mkdir(work_directory)

    filepath = os.path.join(work_directory, filename)
    if os.path.exists(filepath):
        return filepath

    filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath)
    size = os.stat(filepath).st_size
    print('Succesfully downloaded', filename, size, 'bytes.')
    return filepath