Python os.path module, getsize() example source code
The following 50 code examples, extracted from open source Python projects, illustrate how to use os.path.getsize().
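Before the project examples, here is a minimal sketch of the basic call itself; the file name below is just a placeholder.

from os import path

# path.getsize() returns the size of a file in bytes; it raises OSError
# (e.g. FileNotFoundError) if the path does not exist or is inaccessible.
size_in_bytes = path.getsize('example.txt')  # 'example.txt' is a hypothetical file
print('%d bytes' % size_in_bytes)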
def dirsize(src):
    """
    Takes a source directory and returns the entire size of all of its
    contents in bytes.

    The function returns None if the size can't be properly calculated.
    """
    if not isdir(src):
        # Nothing to return
        return 0
    try:
        with pushd(src, create_if_missing=False):
            size = sum(getsize(f) for f in listdir('.') if isfile(f))
    except (OSError, IOError):
        return None
    # Return our total size
    return size
def open(self):
    """
    Setup the internal structure.
    NB : Call this function before
    extracting data from a file.
    """
    if self.file:
        self.file.close()
    try:
        self.file = open(self.path, 'rb')
    except Exception as e:
        raise Exception("python couldn't open file %s : %s" % (self.path, e))
    self.file_size = path.getsize(self.file.name)
    self.creation_date = datetime.fromtimestamp(path.getctime(self.file.name))
    self.modification_date = datetime.fromtimestamp(path.getmtime(self.file.name))
    self.nomenclature = self.get_nomenclature()
    self.factory = self.get_factory()
    self.layout = self.create_layout()
def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers
    Reading standard dtype improves speed, but timestamps need to be
    reconstructed
    """
    filesize = getsize(self.sessiondir + sep + filename)  # in byte
    if filesize > 16384:
        data = np.memmap(self.sessiondir + sep + filename,
                         dtype='<u4',
                         shape=(int((filesize - 16384) / 4 / 261), 261),
                         mode='r', offset=16384)
        ts = data[:, 0:2]
        multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                          axis=0)
        timestamps = np.sum(ts * multi, axis=1)
        # timestamps = data[:, 0] + (data[:, 1] * 2 ** 32)
        header_u4 = data[:, 2:5]
        return timestamps, header_u4
    else:
        return None
def __mmap_ncs_packet_timestamps(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers
    Reading standard dtype improves speed, but timestamps need to be
    reconstructed
    """
    filesize = getsize(self.sessiondir + sep + filename)  # in byte
    if filesize > 16384:
        data = np.memmap(self.sessiondir + sep + filename,
                         dtype='<u4',
                         shape=(int((filesize - 16384) / 4 / 261), 261),
                         mode='r', offset=16384)
        ts = data[:, 0:2]
        multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                          axis=0)
        timestamps = np.sum(ts * multi, axis=1)
        # timestamps = data[:, 0] + data[:, 1] * 2 ** 32
        return timestamps
    else:
        return None
def __mmap_nev_file(self, filename):
    """ Memory map the Neuralynx .nev file """
    nev_dtype = np.dtype([
        ('reserved', '<i2'),
        ('system_id', '<i2'),
        ('data_size', '<i2'),
        ('timestamp', '<u8'),
        ('event_id', '<i2'),
        ('ttl_input', '<i2'),
        ('crc_check', '<i2'),
        ('dummy1', '<i2'),
        ('dummy2', '<i2'),
        ('extra', '<i4', (8,)),
        ('event_string', 'a128'),
    ])
    if getsize(self.sessiondir + sep + filename) > 16384:
        return np.memmap(self.sessiondir + sep + filename,
                         dtype=nev_dtype, mode='r', offset=16384)
    else:
        return None
def rinexobs(obsfn, writeh5=None, maxtimes=None):
    stem, ext = splitext(expanduser(obsfn))
    if ext[-1].lower() == 'o':  # raw text file
        with open(obsfn, 'r') as f:
            t = time.time()
            lines = f.read().splitlines(True)
            lines.append('')
            header, version, headlines, obstimes, sats, svset = scan(lines)
            print('{} is a RINEX {} file, {} kB.'.format(obsfn, version, getsize(obsfn) / 1000.0))
            data = processBlocks(lines, header, obstimes, svset, headlines, sats)
            print("finished in {0:.2f} seconds".format(time.time() - t))
        #%% save to disk (optional)
        if writeh5:
            h5fn = stem + '.h5'
            print('saving OBS data to {}'.format(h5fn))
            data.to_hdf(h5fn, key='OBS', mode='a', complevel=6, append=False)
    elif ext.lower() == '.h5':
        data = read_hdf(obsfn, key='OBS')
        print('loaded OBS data from {} to {}'.format(data.items[0], data.items[-1]))
    return data

# this will scan the document for the header info and for the line on
# which each block starts
def test_write_gff_file(self, seqprop_with_i, tmpdir):
    """Test writing the features, and that features are now loaded from a file"""
    outpath = tmpdir.join('test_seqprop_with_i_write_gff_file.gff').strpath
    seqprop_with_i.write_gff_file(outfile=outpath, force_rerun=True)

    # Test that the file was written
    assert op.exists(outpath)
    assert op.getsize(outpath) > 0

    # Test that file paths are correct
    assert seqprop_with_i.feature_path == outpath
    assert seqprop_with_i.feature_file == 'test_seqprop_with_i_write_gff_file.gff'
    assert seqprop_with_i.feature_dir == tmpdir

    # Test that features cannot be changed
    with pytest.raises(ValueError):
        seqprop_with_i.features = ['NOFEATURES']
def clear_cache(force=False):
    """
    If the folder exists, and has more than 5MB of icons in the cache, delete
    it to clear all the icons then recreate it.
    """
    from os.path import getsize, join, isfile, exists
    from os import makedirs, listdir
    from sublime import cache_path
    from shutil import rmtree

    # The icon cache path
    icon_path = join(cache_path(), "GutterColor")

    # The maximum amount of space to take up
    limit = 5242880  # 5 MB

    if exists(icon_path):
        size = sum(getsize(join(icon_path, f)) for f in listdir(icon_path) if isfile(join(icon_path, f)))
        if force or (size > limit): rmtree(icon_path)

    if not exists(icon_path): makedirs(icon_path)
def test_open(self):
    store = self.create_ssh_store()
    target_filename = 'sample_text_file1.txt'
    with open(self.sample_text_file1, 'rb') as f:
        length = store.put(target_filename, f)
    self.assertEqual(length, getsize(self.sample_text_file1))
    self.assertTrue(exists(join(self.temp_path, target_filename)))

    # Reading
    with store.open(target_filename, mode='rb') as stored_file, \
            open(self.sample_text_file1, mode='rb') as original_file:
        self.assertEqual(stored_file.read(), original_file.read())

    # Writing
    new_content = b'Some Binary Data'
    with store.open(target_filename, mode='wb') as stored_file:
        stored_file.write(new_content)
    with store.open(target_filename, mode='rb') as stored_file:
        self.assertEqual(stored_file.read(), new_content)
def test_open(self):
    store = FileSystemStore(self.temp_path, self.base_url)
    target_filename = 'test_open/sample_text_file1.txt'
    with open(self.sample_text_file1, 'rb') as f:
        length = store.put(target_filename, f)
    self.assertEqual(length, getsize(self.sample_text_file1))
    self.assertTrue(exists(join(self.temp_path, target_filename)))

    # Reading
    with store.open(target_filename, mode='rb') as stored_file, \
            open(self.sample_text_file1, mode='rb') as original_file:
        self.assertEqual(stored_file.read(), original_file.read())

    # Writing
    new_content = b'Some Binary Data'
    with store.open(target_filename, mode='wb') as stored_file:
        stored_file.write(new_content)
    with store.open(target_filename, mode='rb') as stored_file:
        self.assertEqual(stored_file.read(), new_content)
def addService(self, service):
    from os import path
    from enigma import eServiceCenter, iServiceInformation
    from ServiceReference import ServiceReference
    from time import localtime, time
    self.source = service
    serviceHandler = eServiceCenter.getInstance()
    info = serviceHandler.info(service)
    sDescr = info and info.getInfoString(service, iServiceInformation.sDescription) or ""
    self.DVBdescr = sDescr
    sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
    if sTimeCreate > 1:
        self.timeCreate = localtime(sTimeCreate)
    serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))
    name = info and info.getName(service) or "Title" + sDescr
    self.DVBname = name
    self.DVBchannel = serviceref.getServiceName()
    self.inputfile = service.getPath()
    self.filesize = path.getsize(self.inputfile)
    self.estimatedDiskspace = self.filesize
    self.length = info.getLength(service)
def shall_skip(module, opts):
    # type: (unicode, Any) -> bool
    """Check if we want to skip this module."""
    # skip if the file doesn't exist and not using implicit namespaces
    if not opts.implicit_namespaces and not path.exists(module):
        return True
    # skip it if there is nothing (or just \n or \r\n) in the file
    if path.exists(module) and path.getsize(module) <= 2:
        return True
    # skip if it has a "private" name and this is selected
    filename = path.basename(module)
    if filename != '__init__.py' and filename.startswith('_') and \
            not opts.includeprivate:
        return True
    return False
def get_sha1_by_slice(file_name, slice_size):
    """ Get SHA array based on Qcloud Slice Upload Interface
    :param file_name: local file path
    :param slice_size: slice size in bytes
    :return: sha array like [{"offset": 0, "datalen": 1024, "datasha": "aaa"}, {}, {}]
    """
    from os import path
    with open(file_name, 'rb') as f:
        result = []
        file_size = path.getsize(file_name)
        sha1_obj = Sha1Hash()
        for current_offset in range(0, file_size, slice_size):
            data_length = min(slice_size, file_size - current_offset)
            sha1_obj.update(f.read(data_length))
            sha1_val = sha1_obj.inner_digest()
            result.append({"offset": current_offset, "datalen": data_length, "datasha": sha1_val})
        result[-1]['datasha'] = sha1_obj.hexdigest()
        return result
def tqdm_open(filename, encoding='utf8'):
    """
    Open a file for reading, wrapping line iteration in a tqdm progress bar.
    """
    total = getsize(filename)

    def wrapped_line_iterator(fd):
        with tqdm(total=total, unit="B", unit_scale=True, desc=basename(filename), miniters=1) as pb:
            processed_bytes = 0
            for line in fd:
                processed_bytes += len(line)
                if processed_bytes >= 1024 * 1024:
                    pb.update(processed_bytes)
                    processed_bytes = 0
                yield line
            pb.update(processed_bytes)

    with open(filename, encoding=encoding) as fd:
        yield wrapped_line_iterator(fd)
def hashsize(path):
    '''
    Generate SHA-1 hash + file size string for the given
    filename path. Used to check integrity of downloads.
    Resulting string is space separated 'hash size':

    >>> hashsize('locally.py')
    'fbb498a1d3a3a47c8c1ad5425deb46b635fac2eb 2006'
    '''
    size = getsize(path)
    h = sha1()
    with open(path, 'rb') as source:
        while True:
            # read in 64k blocks, because some files are too big
            # and free memory is not enough
            c = source.read(64 * 1024)
            if not c:
                break
            h.update(c)
    return '%s %s' % (h.hexdigest(), size)
def timeit(parser_class, file_name, message, **kwargs):
    print(message, 'File:', file_name)
    file_name = join(dirname(__file__), 'test_inputs', file_name)
    file_size = getsize(file_name)
    print('File size: {:.2f}'.format(file_size / 1000), 'KB')

    this_folder = dirname(__file__)
    g = Grammar.from_file(join(this_folder, 'rhapsody.pg'))
    parser = parser_class(g, **kwargs)

    t_start = time.time()
    with open(file_name) as f:
        parser.parse(f.read())
    t_end = time.time()

    print('Elapsed time: {:.2f}'.format(t_end - t_start), 'sec')
    print('Speed = {:.2f}'.format(file_size / 1000 / (t_end - t_start)),
          'KB/sec\n')
def judge_twice_url(self, save_path, save_type):
    '''
    Decide from the saved main.html whether the page needs to be
    downloaded a second time. Re-download when either:
    1. the title is "Page has moved", or
    2. the saved html is no larger than 3 KB.
    '''
    page_path = pjoin(save_path, 'main.html')
    url_path = pjoin(save_path, 'url_file')
    with open(page_path, 'r') as f:
        content = f.read()
    title = get_title(content)
    if title == 'Page has moved' or getsize(page_path) <= 3072:
        with open(url_path, 'r') as f:
            url = f.read()
        redict_url = self.get_twice_page_url(content)
        # re-download using the redirected url
        if redict_url:
            if redict_url.find('http') != -1:
                logger['logger_file_debug'].debug(
                    "redict_url: %s" % (redict_url, ))
                self.re_download_twice_web(
                    redict_url, save_path, save_type)
def as_contigset(fasta_file, xml_file):
    if fasta_file == xml_file or xml_file is None:
        if not op.isfile(fasta_file) or op.getsize(fasta_file) == 0:
            return ContigSet()
        return ContigSet(fasta_file)
    file_size = op.getsize(fasta_file)
    fai_file = fasta_file + ".fai"
    if op.exists(fai_file):
        os.remove(fai_file)
    ds = ContigSet(fasta_file, generateIndices=True)
    ds.write(xml_file)
    if not file_size > 0:
        with open(fai_file, "w") as fai:
            fai.write("")
    return ds
def main():
    with open('dlink_ftp.dlink.eu_filelist.csv', 'w') as fout:
        cw = csv.writer(fout, dialect='excel')
        cw.writerow(['ftp_url', 'file_size', 'file_date', 'model', 'file_sha1', 'file_md5'])
        with open('dlink_ftp.dlink.eu_filelist.txt', 'r') as fin:
            for line in fin:
                line = line.strip()
                if not line:
                    continue
                ftpurl, fsize, fdate = line.split('\t', 2)
                fdate = datetime.fromtimestamp(float(fdate))
                fname = 'output/D-Link/ftp.dlink.eu/' + ftpurl.split('/')[-1]
                sha1 = getFileSha1(fname)
                md5 = getFileMd5(fname)
                fsize = path.getsize(fname)
                model = get_model_from_ftp_url(ftpurl)
                cw.writerow([ftpurl, fsize, fdate, model, sha1, md5])
                print('%s,%s,%s,%s' % (ftpurl, fsize, fdate, model))
def filedata(self):
    filedata = {
        resource.descriptor['path']: {
            'name': resource.descriptor['name'],
            'length': getsize(resource.source),
            'md5': compute_hash(resource.source),
            'type': resource.descriptor.get('mediatype', 'text/' + resource.descriptor['path'].split('.')[-1]),
        } for resource in self
    }
    descriptor_file = {
        basename(self.filepath): {
            'name': self.name,
            'length': getsize(self.filepath),
            'md5': compute_hash(self.filepath),
            'type': 'application/octet-stream',
        }
    }
    filedata.update(descriptor_file)
    return {
        'filedata': filedata,
        'metadata': {
            'owner': self.user.id,
            'name': self.name
        }
    }
def TestDownload(self):
    try:
        a = datetime.now()
        info('Executing regular download test for network speed')
        url = self.config.cloudConfig.DownloadSpeedTestUrl if 'DownloadSpeedTestUrl' in self.config.cloudConfig else defaultUrl
        debug(url + ' ' + download_path)
        request.urlretrieve(url, download_path)
        request.urlcleanup()
        b = datetime.now()
        c = b - a
        if path.exists(download_path):
            size = path.getsize(download_path) / mb
            self.downloadSpeed = size / c.total_seconds()
            remove(download_path)
            return True
    except socket_error as serr:
        error('TestDownload:' + str(serr))
        ret = False
        Daemon.OnFailure('cloud', serr.errno)
        return
    except:
        exception('TestDownload Failed')
        return False
def rotator(source, dest):
    try:
        # print('Log rotator, pid:' + str(getpid()))
        size = path.getsize(source)
        if size > 100000000:
            # files larger than 100MB will be deleted
            remove(source)
        else:
            tar = tarfile.open(dest, "w:bz2")
            tar.add(source)
            tar.close()
            remove(source)
        # Remove old myDevices.log backups if they are older than a week. This code can be removed
        # in later versions if myDevices.log files have been replaced with cayenne.log.
        for old_file in iglob('/var/log/myDevices/myDevices.log*'):
            if path.getmtime(old_file) + 604800 < time.time():
                remove(old_file)
    except Exception as ex:
        print('Log rotator failed with: ' + str(ex))
def addService(self, service):
    from os import path
    from enigma import eServiceCenter, iServiceInformation
    from ServiceReference import ServiceReference
    from time import localtime
    self.source = service
    serviceHandler = eServiceCenter.getInstance()
    info = serviceHandler.info(service)
    sDescr = info and info.getInfoString(service, iServiceInformation.sDescription) or ""
    self.DVBdescr = sDescr
    sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
    if sTimeCreate > 1:
        self.timeCreate = localtime(sTimeCreate)
    serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))
    name = info and info.getName(service) or "Title" + sDescr
    self.DVBname = name
    self.DVBchannel = serviceref.getServiceName()
    self.inputfile = service.getPath()
    self.filesize = path.getsize(self.inputfile)
    self.estimatedDiskspace = self.filesize
    self.length = info.getLength(service)
def locate(self, spin, request):
    path = join(spin.app.app_dir, spin.app.static_dir, basename(request.path))
    if not isfile(path):
        return
    # Where we are going to serve files.
    # I might spawn an event like FILE_NOT_FOUND.
    # So, users could use it to send appropriate answers.
    type_file, encoding = guess_type(path)
    default_type = 'application/octet-stream'
    spin.add_header(('Content-Type', type_file if type_file else default_type),
                    ('Content-Length', getsize(path)))
    spin.send_headers()
    xmap(spin, OPEN_FILE_ERR, lambda con, err: lose(con))
    drop(spin, path)
def upload_image(self, filename, album_uri, caption=None, title=None, keywords=None):
    headers = {
        'User-Agent': self.__user_agent,
        'X-Smug-ResponseType': 'JSON',
        'X-Smug-Version': 'v2',
        'Content-Type': guess_type(filename)[0],
        'X-Smug-AlbumUri': album_uri,
        'X-Smug-FileName': filename,
        'Content-Length': path.getsize(filename),
    }
    if caption is not None:
        headers['X-Smug-Caption'] = caption
    if title is not None:
        headers['X-Smug-Title'] = title
    if keywords is not None:
        headers['X-Smug-Keywords'] = keywords
    with open(filename, "rb") as f:
        data = f.read()
    return self.raw_post(self.UPLOAD_URL, data=data, headers=headers)
def traverse_dir(path):
    file_dict = {}
    dir_dict = {}
    count = 1
    for root, dirs, files in walk(path):
        for d in dirs:
            abs_p = join(root, d)
            dir_dict[abs_p] = 0
            print(abs_p)
            count += 1
            if count % 200 == 0:
                print('%s files scanned' % count)
        for f in files:
            abs_p = join(root, f)
            file_dict[abs_p] = getsize(abs_p)
            print(abs_p)
            count += 1
            if count % 200 == 0:
                print('%s files scanned' % count)
    return file_dict, dir_dict
def do_show_bulletin_attachment(request, bulletin_id, localfile, remotefile):
    #pylint: disable=unused-argument
    if not cavedb.perms.is_bulletin_allowed(bulletin_id):
        raise Http404
    if not isfile(localfile):
        raise Http404
    mimetype = guess_type(localfile)[0]
    if mimetype is None:
        mimetype = "application/octet-stream"
    try:
        wrapper = FileWrapper(open(localfile, 'rb'))
        response = FileResponse(wrapper, content_type=mimetype)
        if remotefile and (mimetype is None or not mimetype.startswith('image')):
            response['Content-Disposition'] = 'attachment; filename=' + remotefile
        response['Content-Length'] = getsize(localfile)
    except IOError:
        print('Cannot find %s\n' % (localfile), file=sys.stderr)
        raise Http404
    return response
def getfilesize(filename, ratio=None):
    rawsize = op.getsize(filename)
    if not filename.endswith(".gz"):
        return rawsize

    import struct
    fo = open(filename, 'rb')
    fo.seek(-4, 2)
    r = fo.read()
    fo.close()
    size = struct.unpack('<I', r)[0]
    # This is only ISIZE, which is the UNCOMPRESSED size modulo 2 ** 32
    if ratio is None:
        return size

    # Heuristic
    heuristicsize = rawsize / ratio
    while size < heuristicsize:
        size += 2 ** 32
    if size > 2 ** 32:
        logging.warn(
            "Gzip file estimated uncompressed size: {0}.".format(size))
    return size
def file_manager(request):
    def slugify(text):
        import re
        return re.sub(r'[ /"#!:]+', '_', text)

    if not is_admin_or_root(request.user):
        raise PermissionDenied
    if request.method == 'POST':
        try:
            file = request.FILES['file']
            save_uploaded_file_to(file, settings.UPLOAD_DIR, filename=slugify(file.name))
        except Exception as e:
            raise PermissionDenied(repr(e))
    return render(request, 'filemanager.jinja2', context={
        'file_list': list(map(lambda x: {
            'name': x,
            'modified_time': datetime.fromtimestamp(path.getmtime(path.join(settings.UPLOAD_DIR, x))).
                strftime(settings.DATETIME_FORMAT_TEMPLATE),
            'size': str(path.getsize(path.join(settings.UPLOAD_DIR, x)) // 1024) + "K"
        }, filter(lambda x: path.isfile(path.join(settings.UPLOAD_DIR, x)), listdir(settings.UPLOAD_DIR))))
    })
def dir_size(dir):
    """ calculate the size of files under a dir
    based on the os module example"""
    # It's really hard to give an approximate value for a package's
    # installed size. Getting a sum of all files' sizes is far from
    # being true. Using the 'du' command (like Debian does) can be a
    # better solution :(.
    # Not really, du calculates size on disk, this is much better -- exa
    from os.path import getsize, islink, isdir, exists
    join = join_path

    if exists(dir) and (not isdir(dir) and not islink(dir)):
        # so, this is not a directory but a file..
        return getsize(dir)

    if islink(dir):
        return long(len(os.readlink(dir)))

    def sizes():
        for root, dirs, files in os.walk(dir):
            yield sum([getsize(join(root, name)) for name in files if not islink(join(root, name))])
            yield sum([long(len(os.readlink(join(root, name)))) for name in files if islink(join(root, name))])
    return sum(sizes())
def __init__(self, inputCsv, resolver):
    if not op.isfile(inputCsv):
        raise ValueError("Missing input file: %s" % inputCsv)
    nbytes = min(32, op.getsize(inputCsv))
    raw = open(inputCsv, 'rb').read(nbytes)
    if raw.startswith(codecs.BOM_UTF8):
        raise TableValidationError("Input CSV file is in UTF-8 format. Please convert to ASCII or remove Byte Order Mark (BOM)")
    try:
        with open(inputCsv) as f:
            cr = csv.reader(f)
            allRows = list(cr)
            columnNames, rows = allRows[0], allRows[1:]
            self.tbl = eztable.Table(columnNames, rows)
    except:
        raise TableValidationError("Input CSV file can't be read/parsed: " + str(sys.exc_info()[0]))
    self._validateTable()
    self._resolveInputs(resolver)
def _download_wrapped_file(download):
    download_path = download.abspath
    # We do not allow symlinks as downloads for security reasons
    if not path.exists(download_path) or path.islink(download_path):
        return HttpResponse("Download not found", status=HTTP_NOT_FOUND)
    wrapper = FileWrapper(open(download_path, "rb"))
    response = HttpResponse(wrapper, content_type='application/force-download')
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(
        DOWNLOAD_FNAME_TEMLATE.format(
            filename=path.basename(download_path),
            download_pk=download.pk,
            problem_slug=download.problem.slug
        )
    )
    response['Content-Length'] = path.getsize(download_path)
    return response
def compile(self, nb_procs):
    if nb_procs > 1:
        target = "lsa.mpi"
    else:
        target = "lsa"
    cmd_comp = "make -f %sMakefile -C %s %s 1>/dev/null" % (
        self.getTempDirectory(),
        self.getTempDirectory(),
        target)
    res_comp = call(cmd_comp,
                    stdout=open("%sout_optim_comp" % self.getTempDirectory(), "w"),
                    stderr=open("%serr_optim_comp" % self.getTempDirectory(), "w"),
                    shell=True, preexec_fn=setpgrp, close_fds=True)
    if res_comp != 0 or getsize(self.getTempDirectory() + "err_optim_comp") > 0:
        return self.OPTIM_FAILURE
    else:
        return self.OPTIM_SUCCESS
def Cleanup(delete_dir, delete_threshold, freeup_amount):
    free_space = FreeSpaceMB(delete_dir)
    if free_space < delete_threshold:
        files = [f for f in map(lambda x: join(delete_dir, x), listdir(delete_dir))
                 if isfile(f) and not islink(f)]
        # Sort files ascending based on their modification time.
        files.sort(key=lambda f: getmtime(f))
        freed = 0.0
        # Delete enough files to free up enough space that matches freeup_amount
        for f in files:
            # Size of file in MB
            f_size = getsize(f) / 1024 / 1024
            remove(f)
            print "Deleted ", f
            freed = freed + f_size
            if freed >= freeup_amount:
                break
def __len__(self):
    """
    Returns the length of the content
    """
    if not self.filepath:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        if self.stream:
            # Remember our current position
            ptr = self.stream.tell()
            # Advance to the end of the file and get our length
            length = self.stream.seek(0L, SEEK_END)
            if length != ptr:
                # Restore our pointer
                self.stream.seek(ptr, SEEK_SET)
        else:
            # No Stream or Filepath; nothing has been initialized
            # yet at all so just return 0
            length = 0
    else:
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False
        # Get the size
        length = getsize(self.filepath)
    return length
def __init__(self, filename, counter):
    self.filename = path.abspath(filename)
    self.queue = Queue()
    self.check_chain = CheckerChain(self.queue, counter)
    self.observer = Observer()
    self.fd = None
    self.offset = 0
    if path.isfile(self.filename):
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)

def on_moved(self, event):
    if path.abspath(event.src_path) == self.filename:
        self.fd.close()
    if path.abspath(event.dest_path) == self.filename and path.isfile(self.filename):
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)

def on_created(self, event):
    if path.abspath(event.src_path) == self.filename and path.isfile(self.filename):
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)
def valid_file_exists(file):
    """Determines whether a file exists and is "valid"
    (i.e., the file size is greater than 0; if it's 0, it probably failed due to an RPC error)"""
    return path.isfile(file) and path.getsize(file) > 0
#endregion
def needs_download(url, filepath):
    if not op.exists(filepath):
        return True
    else:
        response = requests.head(url)
        remote_size = int(response.headers['Content-Length'])
        local_size = op.getsize(filepath)
        if remote_size > local_size:
            return True
        else:
            return False
def __mmap_nse_packets(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers
    Reading standard dtype improves speed, but timestamps need to be
    reconstructed
    """
    filesize = getsize(self.sessiondir + sep + filename)  # in byte
    if filesize > 16384:
        data = np.memmap(self.sessiondir + sep + filename,
                         dtype='<u2',
                         shape=(int((filesize - 16384) / 2 / 56), 56),
                         mode='r', offset=16384)
        # reconstructing original data
        # first 4 ints -> timestamp in microsec
        timestamps = (data[:, 0] + data[:, 1] * 2 ** 16 +
                      data[:, 2] * 2 ** 32 + data[:, 3] * 2 ** 48)
        channel_id = data[:, 4] + data[:, 5] * 2 ** 16
        cell_number = data[:, 6] + data[:, 7] * 2 ** 16
        features = [data[:, p] + data[:, p + 1] * 2 ** 16 for p in
                    range(8, 23, 2)]
        features = np.array(features, dtype='i4')
        data_points = data[:, 24:56].astype('i2')
        del data
        return timestamps, channel_id, cell_number, features, data_points
    else:
        return None
def __mmap_ncs_data(self, filename):
    """ Memory map of the Neuralynx .ncs file optimized for data
    extraction"""
    if getsize(self.sessiondir + sep + filename) > 16384:
        data = np.memmap(self.sessiondir + sep + filename,
                         dtype=np.dtype(('i2', (522))), mode='r',
                         offset=16384)
        # removing data packet headers and flattening data
        return data[:, 10:]
    else:
        return None
def __mmap_ntt_file(self, filename):
    """ Memory map the Neuralynx .nse file """
    nse_dtype = np.dtype([
        ('timestamp', '<u8'),
        ('sc_number', '<u4'),
        ('cell_number', '<u4'),
        ('params', '<u4', (8,)),
        ('data', '<i2', (32, 4)),
    ])
    if getsize(self.sessiondir + sep + filename) > 16384:
        return np.memmap(self.sessiondir + sep + filename,
                         dtype=nse_dtype, mode='r', offset=16384)
    else:
        return None