Python os 模块,os.sep 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用 os.sep(它是表示路径分隔符的字符串属性,而不是函数)。
def zipdir(archivename, basedir):
    """Zip the files found under ``basedir`` into ``archivename``.

    Adapted from J.F. Sebastian, http://stackoverflow.com/

    Every file reached by walking ``basedir`` is stored under its path
    relative to ``basedir``.  Files whose name ends in ``.zip`` are
    skipped and empty directories are not recorded.

    :param archivename: path of the zip archive to create (opened in
        ``"w"`` mode, so an existing file is replaced).
    :param basedir: existing directory whose contents are archived.
    :raises AssertionError: if ``basedir`` is not a directory.
    """
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, _dirs, files in os.walk(basedir):
            # NOTE: empty directories are ignored
            for fn in files:
                if fn.endswith('.zip'):
                    continue
                absfn = os.path.join(root, fn)
                # relpath, rather than slicing off len(basedir) + 1
                # characters, stays correct when basedir already ends
                # with a separator.
                z.write(absfn, os.path.relpath(absfn, basedir))
# ================ Inventory input data and create data structure =================
def create_bird_config(pod_n, router_n, config):
    """Render ``bird.conf.j2`` for one router and write it to disk.

    The output goes to ``<CONFIG_ROOT>/pod_<pod_n>/bird_<router_n>.conf``.

    :param pod_n: pod number; used for the AS number and the directory.
    :param router_n: router number; used in the output file name.
    :param config: passed through to ``compile_peering_details``.
    """
    print("[PocketInternet][Pod {}] Configuring bird_{}".format(pod_n, router_n))

    bird_template = j2env.get_template('bird.conf.j2')
    context = {
        "our_as": generate_as_number(pod_n),
        "peerings": compile_peering_details(pod_n, router_n, "ipv4", config),
    }
    rendered = bird_template.render(**context)

    target_dir = os.path.join(CONFIG_ROOT, "pod_{}".format(pod_n))
    conf_name = "bird_{}.conf".format(router_n)
    with open(os.path.join(target_dir, conf_name), 'w') as out:
        print("[PocketInternet][Pod {}] Writing Config File: {}{}{}".format(
            pod_n, target_dir, os.sep, conf_name))
        out.write(rendered)
def _glob_to_re(self, pattern):
    """Convert a shell-style glob pattern into a regular-expression string.

    Starts from ``fnmatch.translate()`` and then narrows every unescaped
    ``.`` so that it matches any character except the path separator
    (``os.sep``).  As a result ``?`` and ``*`` do not match across
    directory boundaries.
    """
    translated = fnmatch.translate(pattern)

    # The separator is placed inside a replacement template that itself
    # produces a regex, so a backslash separator has to be escaped twice.
    excluded = r'\\\\' if os.sep == '\\' else os.sep
    replacement = r'\1[^%s]' % excluded

    # Group 1 is an even-length run of backslashes not preceded by another
    # backslash, which means the dot that follows it is unescaped.
    unescaped_dot = r'((?<!\\)(\\\\)*)\.'
    return re.sub(unescaped_dot, replacement, translated)
def create_bird6_config(pod_n, router_n, config):
    """Render ``bird6.conf.j2`` for one router and write it to disk.

    The output goes to ``<CONFIG_ROOT>/pod_<pod_n>/bird6_<router_n>.conf``.

    :param pod_n: pod number; used for the AS number and the directory.
    :param router_n: router number; used in the output file name.
    :param config: passed through to ``compile_peering_details``.
    """
    print("[PocketInternet][Pod {}] Configuring bird6_{}".format(pod_n, router_n))

    bird6_template = j2env.get_template('bird6.conf.j2')
    context = {
        "our_as": generate_as_number(pod_n),
        "peerings": compile_peering_details(pod_n, router_n, "ipv6", config),
    }
    rendered = bird6_template.render(**context)

    target_dir = os.path.join(CONFIG_ROOT, "pod_{}".format(pod_n))
    conf_name = "bird6_{}.conf".format(router_n)
    with open(os.path.join(target_dir, conf_name), 'w') as out:
        print("[PocketInternet][Pod {}] Writing Config File: {}{}{}".format(
            pod_n, target_dir, os.sep, conf_name))
        out.write(rendered)
def del_file(self, location, file, dir=None, timeout=None):
    """Must be used with 'yield' as
    'loc = yield scheduler.del_file(location, "file1")'.

    Ask the peer at 'location' to delete 'file'. 'dir' must be the same
    as the one used for 'send_file'. The generator finishes with the
    peer's reply, or with -1 when 'dir' is an absolute path or when no
    reply is received.
    """
    if isinstance(dir, basestring) and dir:
        dir = dir.strip()
        # an absolute 'dir' is refused: joining it onto the root
        # separator leaves it unchanged only when it is absolute
        if os.path.join(os.sep, dir) == dir:
            raise StopIteration(-1)
    request = _NetRequest(
        'del_file',
        kwargs={'file': os.path.basename(file), 'dir': dir},
        dst=location,
        timeout=timeout,
    )
    response = yield _Peer._sync_reply(request)
    raise StopIteration(-1 if response is None else response)
def del_file(self, location, file, dir=None, timeout=None):
    """Must be used with 'yield' as
    'loc = yield scheduler.del_file(location, "file1")'.

    Delete 'file' from peer at 'location'. 'dir' must be same as that used
    for 'send_file'.

    The generator's result (delivered to the scheduler as the value of
    the terminating StopIteration) is the peer's reply, or -1 when 'dir'
    is an absolute path or when no reply is received.
    """
    if isinstance(dir, str) and dir:
        dir = dir.strip()
        # reject absolute path for dir
        if os.path.join(os.sep, dir) == dir:
            # 'return value' is how a generator hands back its result; an
            # explicit 'raise StopIteration' inside a generator is turned
            # into RuntimeError since Python 3.7 (PEP 479).
            return -1
    kwargs = {'file': os.path.basename(file), 'dir': dir}
    req = _NetRequest('del_file', kwargs=kwargs, dst=location, timeout=timeout)
    reply = yield _Peer._sync_reply(req)
    if reply is None:
        reply = -1
    return reply
def _get_records(self):
    """
    Get the list of installed files for the distribution

    :return: A list of tuples of path, hash and size. Rows with fewer
             than three fields are padded, so hash and size might be
             ``None`` for some entries. The path is exactly as stored
             in the file (which is as in PEP 376); it is not made
             absolute here.
    """
    records = []
    resource = self.get_distinfo_resource('RECORD')
    with contextlib.closing(resource.as_stream()) as stream:
        with CSVReader(stream=stream) as reader:
            for row in reader:
                # Pad short rows so they always unpack into three fields.
                padding = [None] * (3 - len(row))
                path, checksum, size = row + padding
                records.append((path, checksum, size))
    return records
def get_resources_dests(resources_root, rules):
    """Find destinations for resources files.

    ``rules`` is an iterable of ``(base, suffix, dest)`` triples.  ``base``
    is globbed relative to ``resources_root`` and ``suffix`` is globbed
    inside every match.  Each file found is keyed by its '/'-separated
    path relative to ``resources_root`` and mapped to ``dest`` plus its
    path relative to the matched base.  A ``dest`` of ``None`` removes
    the file from the result instead.
    """
    def _relative(parent, child):
        # '/'-separated remainder of child once the parent prefix is cut
        parent = parent.replace(os.path.sep, '/')
        child = child.replace(os.path.sep, '/')
        assert child.startswith(parent)
        return child[len(parent):].lstrip('/')

    destinations = {}
    for base, suffix, dest in rules:
        base_pattern = os.path.join(resources_root, base)
        for abs_base in iglob(base_pattern):
            for abs_path in iglob(os.path.join(abs_base, suffix)):
                key = _relative(resources_root, abs_path)
                if dest is None:
                    # a None destination drops any earlier entry
                    destinations.pop(key, None)
                    continue
                tail = _relative(abs_base, abs_path)
                target_dir = dest.replace(os.path.sep, '/').rstrip('/')
                destinations[key] = target_dir + '/' + tail
    return destinations
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem.

    'pathname' is expected in Unix style ('/'-separated).  When the native
    separator is already '/', or 'pathname' is empty, it is returned
    unchanged.  Otherwise it is split on '/', every current-directory
    component is dropped, and the rest is joined with the native
    separator.  Raises ValueError on non-Unix-ish systems if 'pathname'
    either starts or ends with a slash.
    """
    if os.sep == '/' or not pathname:
        return pathname
    if pathname.startswith('/'):
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname.endswith('/'):
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    parts = [part for part in pathname.split('/') if part != os.curdir]
    if not parts:
        return os.curdir
    return os.path.join(*parts)
def _find(self, path):
    """Report whether ``path`` is a known file or a prefix of an index entry.

    ``path`` is first cut down by ``self.prefix_len`` characters.  It is
    found if it is in ``self._files``; otherwise a trailing separator is
    added (when missing) and the entry at the bisect position in
    ``self.index`` is checked for starting with it.  The outcome is
    logged at debug level.
    """
    path = path[self.prefix_len:]
    found = path in self._files
    if not found:
        if path and path[-1] != os.sep:
            path = path + os.sep
        pos = bisect.bisect(self.index, path)
        try:
            found = self.index[pos].startswith(path)
        except IndexError:
            # bisect position is past the end of the index
            found = False
    if found:
        logger.debug('_find worked: %r %r', path, self.loader.prefix)
    else:
        logger.debug('_find failed: %r %r', path, self.loader.prefix)
    return found
def build(cls, path):
    """
    Build a dictionary similar to the zipimport directory
    caches, except instead of tuples, store ZipInfo objects.

    Keys are the archive member names with '/' replaced by the
    platform-specific path separator (os.sep), for compatibility
    with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        return {
            member.replace('/', os.sep): archive.getinfo(member)
            for member in archive.namelist()
        }
def _index(self):
    """Return the directory index, building and caching it on first use.

    The index maps each parent path to the list of names directly below
    it, derived from the paths in ``self.zipinfo``; top-level names are
    listed under the empty string.  The result is stored on
    ``self._dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    index = {}
    for entry in self.zipinfo:
        pieces = entry.split(os.sep)
        while pieces:
            parent = os.sep.join(pieces[:-1])
            if parent in index:
                # parent is already registered, and so are its ancestors
                index[parent].append(pieces[-1])
                break
            index[parent] = [pieces.pop()]
    self._dirindex = index
    return index
def build(cls, path):
    """
    Build a dictionary similar to the zipimport directory
    caches, except instead of tuples, store ZipInfo objects.

    Keys are the archive member names with '/' replaced by the
    platform-specific path separator (os.sep), for compatibility
    with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        return {
            member.replace('/', os.sep): archive.getinfo(member)
            for member in archive.namelist()
        }
def _index(self):
    """Return the directory index, building and caching it on first use.

    The index maps each parent path to the list of names directly below
    it, derived from the paths in ``self.zipinfo``; top-level names are
    listed under the empty string.  The result is stored on
    ``self._dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    index = {}
    for entry in self.zipinfo:
        pieces = entry.split(os.sep)
        while pieces:
            parent = os.sep.join(pieces[:-1])
            if parent in index:
                # parent is already registered, and so are its ancestors
                index[parent].append(pieces[-1])
                break
            index[parent] = [pieces.pop()]
    self._dirindex = index
    return index
def init_work_dir(self):
    """Ensure a ``tmp`` directory exists under the current working directory.

    The directory is created when missing and its path is stored in
    ``self.store_dir``.  Progress messages are printed to stdout.  The
    current working directory itself is not changed.
    """
    retval = os.getcwd()
    # print(...) with a single argument behaves the same on Python 2 and 3
    print('#current dir is : ' + retval)
    # Directory that downloaded images are stored in.  os.path.join avoids
    # a doubled separator when the working directory is the root.
    store_dir = os.path.join(retval, 'tmp')
    print('#all imgs are going to be stored in dir :' + store_dir)
    if not os.path.exists(store_dir):
        print('#tmp dir does not exist, attemp to mkdir')
        os.mkdir(store_dir)
        print('#mkdir sucessfully')
    else:
        print('#tmp dir is already exist')
    self.store_dir = store_dir
def saveFile(self, url, page, idx):
    """Download ``url`` and store its content in ``self.store_dir``.

    The file name is built as
    ``<now_date>_p_<page>_<idx zero-padded to 2 digits>_<file_ext>``,
    where ``file_ext`` is whatever ``self.file_extension(url)`` returns.
    Failures are reported on stdout and are not raised to the caller.

    :param url: address of the file to fetch.
    :param page: page number, used in the file name.
    :param idx: index within the page, used in the file name.
    """
    # str.zfill works on Python 2 and 3 (string.zfill is Python 2 only)
    user_define_name = self.now_date() + '_p_' + str(page) + '_' + str(idx).zfill(2)
    file_ext = self.file_extension(url)
    save_file_name = user_define_name + "_" + file_ext
    url = self.CheckUrlValidate(url)
    try:
        pic = requests.get(url, timeout=30)
        # the context manager closes the file even when the write fails
        with open(self.store_dir + os.sep + save_file_name, 'wb') as f:
            f.write(pic.content)
        print('\ndone save file ' + save_file_name)
    except ReadTimeout:
        print('save file %s failed. cause by timeout(30)' % (save_file_name))
    except Exception as e:
        print('this python version does not support https.')
        print(e)
#??url????http:??
def init_work_dir(self):
    """Ensure a ``tmp`` directory exists under the current working directory.

    The directory is created when missing and its path is stored in
    ``self.store_dir``.  Progress messages are printed to stdout.  The
    current working directory itself is not changed.
    """
    retval = os.getcwd()
    # print(...) with a single argument behaves the same on Python 2 and 3
    print('#current dir is : ' + retval)
    # Directory that downloaded images are stored in.  os.path.join avoids
    # a doubled separator when the working directory is the root.
    store_dir = os.path.join(retval, 'tmp')
    print('#all imgs are going to be stored in dir :' + store_dir)
    if not os.path.exists(store_dir):
        print('#tmp dir does not exist, attemp to mkdir')
        os.mkdir(store_dir)
        print('#mkdir sucessfully')
    else:
        print('#tmp dir is already exist')
    self.store_dir = store_dir
def __getitem__(self, index):
    """Load the image/label pair at ``index`` of the current split.

    :param index: position in ``self.files[self.split]``.
    :return: ``(img, lbl)`` after the optional augmentations and
        transform have been applied.
    """
    img_path = self.files[self.split][index].rstrip()
    # The label path is annotations_base/<image's parent directory>/<name>,
    # where <name> is the image file name with its last 15 characters
    # replaced by the label suffix.
    parent_dir = img_path.split(os.sep)[-2]
    label_name = os.path.basename(img_path)[:-15] + 'gtFine_labelIds.png'
    lbl_path = os.path.join(self.annotations_base, parent_dir, label_name)

    img = np.array(m.imread(img_path), dtype=np.uint8)
    lbl = self.encode_segmap(np.array(m.imread(lbl_path), dtype=np.uint8))

    if self.augmentations is not None:
        img, lbl = self.augmentations(img, lbl)
    if self.is_transform:
        img, lbl = self.transform(img, lbl)
    return img, lbl
def _get_records(self):
    """
    Get the list of installed files for the distribution

    :return: A list of tuples of path, hash and size. Rows with fewer
             than three fields are padded, so hash and size might be
             ``None`` for some entries. The path is exactly as stored
             in the file (which is as in PEP 376); it is not made
             absolute here.
    """
    records = []
    resource = self.get_distinfo_resource('RECORD')
    with contextlib.closing(resource.as_stream()) as stream:
        with CSVReader(stream=stream) as reader:
            for row in reader:
                # Pad short rows so they always unpack into three fields.
                padding = [None] * (3 - len(row))
                path, checksum, size = row + padding
                records.append((path, checksum, size))
    return records
def get_resources_dests(resources_root, rules):
    """Find destinations for resources files.

    ``rules`` is an iterable of ``(base, suffix, dest)`` triples.  ``base``
    is globbed relative to ``resources_root`` and ``suffix`` is globbed
    inside every match.  Each file found is keyed by its '/'-separated
    path relative to ``resources_root`` and mapped to ``dest`` plus its
    path relative to the matched base.  A ``dest`` of ``None`` removes
    the file from the result instead.
    """
    def _relative(parent, child):
        # '/'-separated remainder of child once the parent prefix is cut
        parent = parent.replace(os.path.sep, '/')
        child = child.replace(os.path.sep, '/')
        assert child.startswith(parent)
        return child[len(parent):].lstrip('/')

    destinations = {}
    for base, suffix, dest in rules:
        base_pattern = os.path.join(resources_root, base)
        for abs_base in iglob(base_pattern):
            for abs_path in iglob(os.path.join(abs_base, suffix)):
                key = _relative(resources_root, abs_path)
                if dest is None:
                    # a None destination drops any earlier entry
                    destinations.pop(key, None)
                    continue
                tail = _relative(abs_base, abs_path)
                target_dir = dest.replace(os.path.sep, '/').rstrip('/')
                destinations[key] = target_dir + '/' + tail
    return destinations
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem.

    'pathname' is expected in Unix style ('/'-separated).  When the native
    separator is already '/', or 'pathname' is empty, it is returned
    unchanged.  Otherwise it is split on '/', every current-directory
    component is dropped, and the rest is joined with the native
    separator.  Raises ValueError on non-Unix-ish systems if 'pathname'
    either starts or ends with a slash.
    """
    if os.sep == '/' or not pathname:
        return pathname
    if pathname.startswith('/'):
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname.endswith('/'):
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    parts = [part for part in pathname.split('/') if part != os.curdir]
    if not parts:
        return os.curdir
    return os.path.join(*parts)
def _find(self, path):
    """Report whether ``path`` is a known file or a prefix of an index entry.

    ``path`` is first cut down by ``self.prefix_len`` characters.  It is
    found if it is in ``self._files``; otherwise a trailing separator is
    added (when missing) and the entry at the bisect position in
    ``self.index`` is checked for starting with it.  The outcome is
    logged at debug level.
    """
    path = path[self.prefix_len:]
    found = path in self._files
    if not found:
        if path and path[-1] != os.sep:
            path = path + os.sep
        pos = bisect.bisect(self.index, path)
        try:
            found = self.index[pos].startswith(path)
        except IndexError:
            # bisect position is past the end of the index
            found = False
    if found:
        logger.debug('_find worked: %r %r', path, self.loader.prefix)
    else:
        logger.debug('_find failed: %r %r', path, self.loader.prefix)
    return found
def build(cls, path):
    """
    Build a dictionary similar to the zipimport directory
    caches, except instead of tuples, store ZipInfo objects.

    Keys are the archive member names with '/' replaced by the
    platform-specific path separator (os.sep), for compatibility
    with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        return {
            member.replace('/', os.sep): archive.getinfo(member)
            for member in archive.namelist()
        }
def _index(self):
    """Return the directory index, building and caching it on first use.

    The index maps each parent path to the list of names directly below
    it, derived from the paths in ``self.zipinfo``; top-level names are
    listed under the empty string.  The result is stored on
    ``self._dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    index = {}
    for entry in self.zipinfo:
        pieces = entry.split(os.sep)
        while pieces:
            parent = os.sep.join(pieces[:-1])
            if parent in index:
                # parent is already registered, and so are its ancestors
                index[parent].append(pieces[-1])
                break
            index[parent] = [pieces.pop()]
    self._dirindex = index
    return index
def build(cls, path):
    """
    Build a dictionary similar to the zipimport directory
    caches, except instead of tuples, store ZipInfo objects.

    Keys are the archive member names with '/' replaced by the
    platform-specific path separator (os.sep), for compatibility
    with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        return {
            member.replace('/', os.sep): archive.getinfo(member)
            for member in archive.namelist()
        }
def _index(self):
    """Return the directory index, building and caching it on first use.

    The index maps each parent path to the list of names directly below
    it, derived from the paths in ``self.zipinfo``; top-level names are
    listed under the empty string.  The result is stored on
    ``self._dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    index = {}
    for entry in self.zipinfo:
        pieces = entry.split(os.sep)
        while pieces:
            parent = os.sep.join(pieces[:-1])
            if parent in index:
                # parent is already registered, and so are its ancestors
                index[parent].append(pieces[-1])
                break
            index[parent] = [pieces.pop()]
    self._dirindex = index
    return index
def _resolve_setup_path(egg_base, install_dir, egg_path):
    """
    Generate a path from egg_base back to '.' where the
    setup script resides and ensure that path points to the
    setup path from $install_dir/$egg_path.

    Raises DistutilsOptionError when the resolved location is not the
    normalized current directory.
    """
    relative = egg_base.replace(os.sep, '/').rstrip('/')
    if relative != os.curdir:
        # one '../' for every path component of egg_base
        relative = '../' * (relative.count('/') + 1)

    resolved = normalize_path(os.path.join(install_dir, egg_path, relative))
    here = normalize_path(os.curdir)
    if resolved != here:
        raise DistutilsOptionError(
            "Can't get a consistent path to setup script from"
            " installation directory", resolved, here)
    return relative
def get_problem_root(problem_name, absolute=False):
    """
    Installation location for a given problem.

    Args:
        problem_name: the problem name.
        absolute: should return an absolute path.
    Returns:
        The tentative installation location: PROBLEM_ROOT joined with
        the sanitized name, with the leading separator removed unless
        ``absolute`` is true.
    """
    root = join(PROBLEM_ROOT, sanitize_name(problem_name))
    assert root.startswith(sep)
    return root if absolute else root[len(sep):]
def get_bundle_root(bundle_name, absolute=False):
    """
    Installation location for a given bundle.

    Args:
        bundle_name: the bundle name.
        absolute: should return an absolute path.
    Returns:
        The tentative installation location: BUNDLE_ROOT joined with
        the sanitized name, with the leading separator removed unless
        ``absolute`` is true.
    """
    root = join(BUNDLE_ROOT, sanitize_name(bundle_name))
    assert root.startswith(sep)
    return root if absolute else root[len(sep):]
def _metadata_unit(unit):
    """Given the name of a unit (e.g. apache2/0), get the unit charm's
    metadata.yaml. Very similar to metadata() but allows us to inspect
    other units. Unit needs to be co-located, such as a subordinate or
    principal/primary.

    The file is looked up two levels above charm_dir(), under
    ``unit-<name with separators turned into dashes>/charm``.

    :returns: metadata.yaml as a python object, or None if the file
        does not exist.
    """
    charm_parts = charm_dir().split(os.sep)
    units_root = os.sep.join(charm_parts[:-2])
    unit_dirname = 'unit-{}'.format(unit.replace(os.sep, '-'))
    metadata_path = os.path.join(units_root, unit_dirname, 'charm', 'metadata.yaml')
    if os.path.exists(metadata_path):
        with open(metadata_path) as md:
            return yaml.safe_load(md)
    return None
def _metadata_unit(unit):
    """Given the name of a unit (e.g. apache2/0), get the unit charm's
    metadata.yaml. Very similar to metadata() but allows us to inspect
    other units. Unit needs to be co-located, such as a subordinate or
    principal/primary.

    The file is looked up two levels above charm_dir(), under
    ``unit-<name with separators turned into dashes>/charm``.

    :returns: metadata.yaml as a python object, or None if the file
        does not exist.
    """
    charm_parts = charm_dir().split(os.sep)
    units_root = os.sep.join(charm_parts[:-2])
    unit_dirname = 'unit-{}'.format(unit.replace(os.sep, '-'))
    metadata_path = os.path.join(units_root, unit_dirname, 'charm', 'metadata.yaml')
    if os.path.exists(metadata_path):
        with open(metadata_path) as md:
            return yaml.safe_load(md)
    return None
def setUp(self):
    """Create the path helper under test and the paths it is compared with."""
    path = "temp"
    plugin_name = "plugin_test"
    self.helper = SQLitePluginPathHelper(path, plugin_name, "db")

    plugin_file = plugin_name + ".py"

    def build(*parts):
        # every expected path is rooted at `path` and joined with os.sep
        return os.sep.join((path,) + parts)

    self.formatter = build("plaso", "formatters", plugin_file)
    self.formatter_test = build("tests", "formatters", plugin_file)
    self.parser = build("plaso", "parsers", "sqlite_plugins", plugin_file)
    self.parser_test = build("tests", "parsers", "sqlite_plugins", plugin_file)
    self.database = build("test_data", plugin_name + ".db")
    self.parser_init = build("plaso", "parsers", "sqlite_plugins", "__init__.py")
    self.formatter_init = build("plaso", "formatters", "__init__.py")
def byte_compile(self, files):
    """Byte-compile ``files`` according to the command's options.

    Does nothing except warn when ``sys.dont_write_bytecode`` is set.
    Otherwise compiles without optimization when ``self.compile`` is
    true, and again at level ``self.optimize`` when that is positive.
    ``self.build_lib`` (with a trailing separator) is passed as the
    ``prefix`` argument.
    """
    if sys.dont_write_bytecode:
        self.warn('byte-compiling is disabled, skipping.')
        return

    from distutils.util import byte_compile as compile_files

    prefix = self.build_lib
    if prefix[-1] != os.sep:
        prefix += os.sep

    # XXX this is essentially the same as the byte_compile() method of the
    # "install_lib" command, except for how 'prefix' is determined.
    shared = dict(force=self.force, prefix=prefix, dry_run=self.dry_run)
    if self.compile:
        compile_files(files, optimize=0, **shared)
    if self.optimize > 0:
        compile_files(files, optimize=self.optimize, **shared)
def done(title, dest, downloaded):
    """Record a download result and show it when appropriate.

    A line with the last path component of ``dest`` and a coloured
    success/failure note is appended to the 'GEN-DOWNLOADED' property of
    window 10000.  If the download failed, or nothing is playing, the
    accumulated text is shown in a dialog titled ``title`` and the
    property is cleared.
    """
    playing = xbmc.Player().isPlaying()
    text = xbmcgui.Window(10000).getProperty('GEN-DOWNLOADED')
    if len(text) > 0:
        text += '[CR]'

    if downloaded:
        outcome = '[COLOR forestgreen]Download succeeded[/COLOR]'
    else:
        outcome = '[COLOR red]Download failed[/COLOR]'
    text += '%s : %s' % (dest.rsplit(os.sep)[-1], outcome)

    xbmcgui.Window(10000).setProperty('GEN-DOWNLOADED', text)
    if not (downloaded and playing):
        xbmcgui.Dialog().ok(title, text)
        xbmcgui.Window(10000).clearProperty('GEN-DOWNLOADED')
def extract_features(dir_audio, dir_feat):
"""Collect audio files below ``dir_audio`` and run the extractors on each.

Entries of ``dir_audio`` that are regular files are taken as they are;
any other entry is listed and only the names containing "ld.wav" are
kept. Every collected file is passed to ``utils.yaafe`` and then to
``essentia`` together with ``dir_feat``.
"""
# NOTE(review): paths below are built by plain string concatenation, so
# abs_path_dir presumably returns a path ending in a separator -- confirm.
dir_audio = utils.abs_path_dir(dir_audio)
dir_feat = utils.abs_path_dir(dir_feat)
filelist = []
for elem in os.listdir(dir_audio):
if os.path.isfile(dir_audio + elem):
filelist.append(dir_audio + elem)
else:
# not a regular file: treated as a directory and scanned one level deep
for filename in os.listdir(dir_audio + elem):
if "ld.wav" in filename:
filelist.append(dir_audio + elem + "/" + filename)
# marsyas(dir_feat, filelist)
for index, filen in enumerate(filelist):
# progress label is "<position>/<total> <last path component>"
utils.print_progress_start(str(index+1) + "/" + str(len(filelist)) + " " + filen.split(os.sep)[-1])
utils.yaafe(filen)
essentia(dir_feat, filen)
# NOTE(review): indentation is lost in this listing; presumably this runs
# once after the loop rather than per file -- confirm.
utils.print_progress_end()
def __process(l, inFile, stateDir):
    """Evaluate one spec line, dispatching on its first character.

    * ``=`` -- the rest is hex and is returned as bytes.
    * ``<`` -- the rest is a file name whose binary content is returned.
    * ``{`` -- the rest names a hashlib algorithm handed to
      ``__processBlock`` together with ``inFile`` and ``stateDir``.
    * ``#`` -- the rest is a path handed to ``hashPath``; when
      ``stateDir`` is set, a state file inside it (path separators
      replaced by ``_``) is passed as well.
    * ``g`` -- the rest is handed to ``GitScm.processLiveBuildIdSpec``
      and the hex result is returned as bytes.

    Anything else prints an error to stderr and exits with status 1.
    """
    payload = l[1:]
    if l.startswith("="):
        return bytes.fromhex(payload)
    if l.startswith("<"):
        with open(payload, "rb") as f:
            return f.read()
    if l.startswith("{"):
        import hashlib
        return __processBlock(hashlib.new(payload), inFile, stateDir)
    if l.startswith("#"):
        import os.path
        stateFile = None
        if stateDir:
            stateFile = os.path.join(stateDir, payload.replace(os.sep, "_"))
        return hashPath(payload, stateFile)
    if l.startswith("g"):
        from .scm.git import GitScm
        return bytes.fromhex(GitScm.processLiveBuildIdSpec(payload))

    print("Malformed spec:", l, file=sys.stderr)
    sys.exit(1)
def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers

    Reading standard dtype improves speed, but timestamps need to be
    reconstructed

    The data after the first 16384 bytes is viewed as rows of 261
    little-endian uint32 words.  Words 0 and 1 of each row are combined
    into one timestamp (word 0 + word 1 * 2**32); words 2 to 4 are
    returned unchanged.

    :param filename: file name relative to ``self.sessiondir``.
    :return: ``(timestamps, header_u4)``, or None when the file is not
        larger than 16384 bytes.
    """
    path = self.sessiondir + sep + filename
    filesize = getsize(path)  # in byte
    if filesize <= 16384:
        return None

    # Floor division keeps the row count an integer; true division yields
    # a float on Python 3, which is not accepted as an array dimension.
    n_records = (filesize - 16384) // 4 // 261
    data = np.memmap(path,
                     dtype='<u4',
                     shape=(n_records, 261),
                     mode='r', offset=16384)
    ts = data[:, 0:2]
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                      axis=0)
    timestamps = np.sum(ts * multi, axis=1)
    header_u4 = data[:, 2:5]
    return timestamps, header_u4
def __mmap_nev_file(self, filename):
    """ Memory map the Neuralynx .nev file

    Records start after the first 16384 bytes of the file and are read
    with the structured dtype defined below.

    :param filename: file name relative to ``self.sessiondir``.
    :return: a read-only ``np.memmap`` of the records, or None when the
        file is not larger than 16384 bytes.
    """
    nev_dtype = np.dtype([
        ('reserved', '<i2'),
        ('system_id', '<i2'),
        ('data_size', '<i2'),
        ('timestamp', '<u8'),
        ('event_id', '<i2'),
        ('ttl_input', '<i2'),
        ('crc_check', '<i2'),
        ('dummy1', '<i2'),
        ('dummy2', '<i2'),
        ('extra', '<i4', (8,)),
        # 'S128' is the same 128-byte string type as the 'a128' spelling,
        # whose 'a' type code is a deprecated alias in NumPy 2.0.
        ('event_string', 'S128'),
    ])
    path = self.sessiondir + sep + filename
    if getsize(path) > 16384:
        return np.memmap(path, dtype=nev_dtype, mode='r', offset=16384)
    return None
def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers

    Reading standard dtype improves speed, but timestamps need to be
    reconstructed

    The data after the first 16384 bytes is viewed as rows of 261
    little-endian uint32 words.  Words 0 and 1 of each row are combined
    into one timestamp (word 0 + word 1 * 2**32); words 2 to 4 are
    returned unchanged.

    :param filename: file name relative to ``self.sessiondir``.
    :return: ``(timestamps, header_u4)``, or None when the file is not
        larger than 16384 bytes.
    """
    path = self.sessiondir + sep + filename
    filesize = getsize(path)  # in byte
    if filesize <= 16384:
        return None

    # Floor division keeps the row count an integer; true division yields
    # a float on Python 3, which is not accepted as an array dimension.
    n_records = (filesize - 16384) // 4 // 261
    data = np.memmap(path,
                     dtype='<u4',
                     shape=(n_records, 261),
                     mode='r', offset=16384)
    ts = data[:, 0:2]
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                      axis=0)
    timestamps = np.sum(ts * multi, axis=1)
    header_u4 = data[:, 2:5]
    return timestamps, header_u4
def __mmap_ncs_packet_timestamps(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet timestamps

    The data after the first 16384 bytes is viewed as rows of 261
    little-endian uint32 words; words 0 and 1 of each row are combined
    into one timestamp (word 0 + word 1 * 2**32).

    :param filename: file name relative to ``self.sessiondir``.
    :return: the timestamps, or None when the file is not larger than
        16384 bytes.
    """
    ncs_path = self.sessiondir + sep + filename
    filesize = getsize(ncs_path)  # in byte
    if not filesize > 16384:
        return None

    n_rows = int((filesize - 16384) / 4 / 261)
    packets = np.memmap(ncs_path, dtype='<u4', shape=(n_rows, 261),
                        mode='r', offset=16384)
    words = packets[:, 0:2]
    # one [1, 2**32] weight row per packet
    weights = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(packets), axis=0)
    return np.sum(words * weights, axis=1)
def __mmap_nev_file(self, filename):
    """ Memory map the Neuralynx .nev file

    Records start after the first 16384 bytes of the file and are read
    with the structured dtype defined below.

    :param filename: file name relative to ``self.sessiondir``.
    :return: a read-only ``np.memmap`` of the records, or None when the
        file is not larger than 16384 bytes.
    """
    nev_dtype = np.dtype([
        ('reserved', '<i2'),
        ('system_id', '<i2'),
        ('data_size', '<i2'),
        ('timestamp', '<u8'),
        ('event_id', '<i2'),
        ('ttl_input', '<i2'),
        ('crc_check', '<i2'),
        ('dummy1', '<i2'),
        ('dummy2', '<i2'),
        ('extra', '<i4', (8,)),
        # 'S128' is the same 128-byte string type as the 'a128' spelling,
        # whose 'a' type code is a deprecated alias in NumPy 2.0.
        ('event_string', 'S128'),
    ])
    path = self.sessiondir + sep + filename
    if getsize(path) > 16384:
        return np.memmap(path, dtype=nev_dtype, mode='r', offset=16384)
    return None
def _index(self):
    """Return the directory index, building and caching it on first use.

    The index maps each parent path to the list of names directly below
    it, derived from the paths in ``self.zipinfo``; top-level names are
    listed under the empty string.  The result is stored on
    ``self._dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    index = {}
    for entry in self.zipinfo:
        pieces = entry.split(os.sep)
        while pieces:
            parent = os.sep.join(pieces[:-1])
            if parent in index:
                # parent is already registered, and so are its ancestors
                index[parent].append(pieces[-1])
                break
            index[parent] = [pieces.pop()]
    self._dirindex = index
    return index
def _index(self):
    """Return the directory index, building and caching it on first use.

    The index maps each parent path to the list of names directly below
    it, derived from the paths in ``self.zipinfo``; top-level names are
    listed under the empty string.  The result is stored on
    ``self._dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    index = {}
    for entry in self.zipinfo:
        pieces = entry.split(os.sep)
        while pieces:
            parent = os.sep.join(pieces[:-1])
            if parent in index:
                # parent is already registered, and so are its ancestors
                index[parent].append(pieces[-1])
                break
            index[parent] = [pieces.pop()]
    self._dirindex = index
    return index
def _get_records(self):
    """
    Get the list of installed files for the distribution

    :return: A list of tuples of path, hash and size. Rows with fewer
             than three fields are padded, so hash and size might be
             ``None`` for some entries. The path is exactly as stored
             in the file (which is as in PEP 376); it is not made
             absolute here.
    """
    records = []
    resource = self.get_distinfo_resource('RECORD')
    with contextlib.closing(resource.as_stream()) as stream:
        with CSVReader(stream=stream) as reader:
            for row in reader:
                # Pad short rows so they always unpack into three fields.
                padding = [None] * (3 - len(row))
                path, checksum, size = row + padding
                records.append((path, checksum, size))
    return records
def _glob_to_re(self, pattern):
    """Convert a shell-style glob pattern into a regular-expression string.

    Starts from ``fnmatch.translate()`` and then narrows every unescaped
    ``.`` so that it matches any character except the path separator
    (``os.sep``).  As a result ``?`` and ``*`` do not match across
    directory boundaries.
    """
    translated = fnmatch.translate(pattern)

    # The separator is placed inside a replacement template that itself
    # produces a regex, so a backslash separator has to be escaped twice.
    excluded = r'\\\\' if os.sep == '\\' else os.sep
    replacement = r'\1[^%s]' % excluded

    # Group 1 is an even-length run of backslashes not preceded by another
    # backslash, which means the dot that follows it is unescaped.
    unescaped_dot = r'((?<!\\)(\\\\)*)\.'
    return re.sub(unescaped_dot, replacement, translated)
def get_resources_dests(resources_root, rules):
    """Find destinations for resources files.

    ``rules`` is an iterable of ``(base, suffix, dest)`` triples.  ``base``
    is globbed relative to ``resources_root`` and ``suffix`` is globbed
    inside every match.  Each file found is keyed by its '/'-separated
    path relative to ``resources_root`` and mapped to ``dest`` plus its
    path relative to the matched base.  A ``dest`` of ``None`` removes
    the file from the result instead.
    """
    def _relative(parent, child):
        # '/'-separated remainder of child once the parent prefix is cut
        parent = parent.replace(os.path.sep, '/')
        child = child.replace(os.path.sep, '/')
        assert child.startswith(parent)
        return child[len(parent):].lstrip('/')

    destinations = {}
    for base, suffix, dest in rules:
        base_pattern = os.path.join(resources_root, base)
        for abs_base in iglob(base_pattern):
            for abs_path in iglob(os.path.join(abs_base, suffix)):
                key = _relative(resources_root, abs_path)
                if dest is None:
                    # a None destination drops any earlier entry
                    destinations.pop(key, None)
                    continue
                tail = _relative(abs_base, abs_path)
                target_dir = dest.replace(os.path.sep, '/').rstrip('/')
                destinations[key] = target_dir + '/' + tail
    return destinations
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem.

    'pathname' is expected in Unix style ('/'-separated).  When the native
    separator is already '/', or 'pathname' is empty, it is returned
    unchanged.  Otherwise it is split on '/', every current-directory
    component is dropped, and the rest is joined with the native
    separator.  Raises ValueError on non-Unix-ish systems if 'pathname'
    either starts or ends with a slash.
    """
    if os.sep == '/' or not pathname:
        return pathname
    if pathname.startswith('/'):
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname.endswith('/'):
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    parts = [part for part in pathname.split('/') if part != os.curdir]
    if not parts:
        return os.curdir
    return os.path.join(*parts)
def _find(self, path):
    """Report whether ``path`` is a known file or a prefix of an index entry.

    ``path`` is first cut down by ``self.prefix_len`` characters.  It is
    found if it is in ``self._files``; otherwise a trailing separator is
    added (when missing) and the entry at the bisect position in
    ``self.index`` is checked for starting with it.  The outcome is
    logged at debug level.
    """
    path = path[self.prefix_len:]
    found = path in self._files
    if not found:
        if path and path[-1] != os.sep:
            path = path + os.sep
        pos = bisect.bisect(self.index, path)
        try:
            found = self.index[pos].startswith(path)
        except IndexError:
            # bisect position is past the end of the index
            found = False
    if found:
        logger.debug('_find worked: %r %r', path, self.loader.prefix)
    else:
        logger.debug('_find failed: %r %r', path, self.loader.prefix)
    return found
def do_install_data(self):
    """Run 'install_data' with data files re-rooted for the bdist directory.

    Two-element tuple entries whose absolute directory is the
    site-packages directory, or lies below it, get that directory
    rewritten relative to site-packages.  The original ``data_files``
    list is put back once the command has run.
    """
    # Hack for packages that install data to install's --install-lib
    self.get_finalized_command('install').install_lib = self.bdist_dir

    site_packages = os.path.normcase(os.path.realpath(_get_purelib()))
    original_files = self.distribution.data_files
    self.distribution.data_files = []

    site_prefix = site_packages + os.sep
    for entry in original_files:
        is_pair = isinstance(entry, tuple) and len(entry) == 2
        if is_pair and os.path.isabs(entry[0]):
            real = os.path.realpath(entry[0])
            norm = os.path.normcase(real)
            if norm == site_packages or norm.startswith(site_prefix):
                entry = real[len(site_packages) + 1:], entry[1]
            # XXX else: raise ???
        self.distribution.data_files.append(entry)

    try:
        log.info("installing package data to %s" % self.bdist_dir)
        self.call_command('install_data', force=0, root=None)
    finally:
        self.distribution.data_files = original_files
def write_manifest(self):
    """Write the file list in 'self.filelist' (presumably as filled in
    by 'add_defaults()' and 'read_template()') to the manifest file
    named by 'self.manifest'.

    On Python 3, names that cannot be encoded as UTF-8 are dropped from
    'self.filelist.files' with a warning.  Names are written with '/'
    as the separator.
    """
    # The manifest must be UTF-8 encodable. See #303.
    if sys.version_info >= (3,):
        encodable = []
        for name in self.filelist.files:
            try:
                name.encode("utf-8")
            except UnicodeEncodeError:
                log.warn("'%s' not UTF-8 encodable -- skipping" % name)
                continue
            encodable.append(name)
        self.filelist.files = encodable

    files = self.filelist.files
    if os.sep != '/':
        files = [entry.replace(os.sep, '/') for entry in files]
    description = "writing manifest file '%s'" % self.manifest
    self.execute(write_file, (self.manifest, files), description)