Python shutil module: unpack_archive() example source code
We extracted the following 14 code examples from open-source Python projects to illustrate how to use shutil.unpack_archive(). The snippets are shown as extracted and assume their projects' surrounding imports (os, shutil, and so on).
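Before the project examples, here is a minimal sketch of the call itself: unpack_archive() infers the archive format from the filename extension unless format is passed explicitly, and extracts into extract_dir (the current working directory by default). The file names below are made up for illustration.

import shutil

# The format ("gztar") is inferred from the ".tar.gz" extension.
shutil.unpack_archive('archive.tar.gz', extract_dir='output')

# The format can also be given explicitly, e.g. for a file whose
# name does not carry a conventional extension.
shutil.unpack_archive('snapshot.bin', extract_dir='output', format='zip')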
def wishbone_my_setup(setup_dir):
    # install GSEA, diffusion components, and download data.
    tools_dir = setup_dir + '/tools'
    if not os.path.exists(tools_dir + '/DiffusionGeometry/'):
        shutil.unpack_archive(tools_dir + '/DiffusionGeometry.zip',
                              tools_dir + '/DiffusionGeometry/')
    if not os.path.exists(tools_dir + '/mouse/'):
        shutil.unpack_archive(tools_dir + '/mouse_gene_sets.tar.gz', tools_dir)
    if not os.path.exists(tools_dir + '/human/'):
        shutil.unpack_archive(tools_dir + '/human_gene_sets.tar.gz', tools_dir)
    if not os.path.exists(setup_dir + '/data/GSE72857_umitab.txt.gz'):
        # download the mouse UMI table from GSE72857, "Transcriptional heterogeneity
        # and lineage commitment in myeloid progenitors [single cell RNA-seq]"
        os.system("wget -m -nH -nd -P " + setup_dir + "/data/ "
                  "ftp://ftp.ncbi.nlm.nih.gov/geo/series/GSE72nnn/GSE72857/suppl/GSE72857%5Fumitab%2Etxt%2Egz")
    x = pd.read_csv(setup_dir + '/data/GSE72857_umitab.txt.gz', sep='\t', compression="gzip")
    y = pd.read_csv(setup_dir + '/data/sample_scseq_data.csv', index_col=[0])
    scdata_raw = x.T.loc[y.index]
    scdata_raw = wishbone.wb.SCData(scdata_raw.astype('float'), data_type='sc-seq')
    return scdata_raw
def _restore_from_backup(self, version):
    logger.debug('Restoring from backup for {}'.format(version))
    # stash the files that must survive the restore in the temporary directory
    for file_path in self.files_to_preserve:
        try:
            shutil.copy2(os.path.join(self.backup_target, file_path),
                         os.path.join(self.tmp_dir, file_path))
        except IOError as e:
            logger.warning('Copying {} failed due to {}'.format(file_path, e))
    # unpack the backup over the target, then put the preserved files back
    shutil.unpack_archive(self._backup_name_ext(version),
                          self.backup_target, self.backup_format)
    for file_path in self.files_to_preserve:
        try:
            shutil.copy2(os.path.join(self.tmp_dir, file_path),
                         os.path.join(self.backup_target, file_path))
        except IOError as e:
            logger.warning('Copying {} failed due to {}'.format(file_path, e))
    shutil.rmtree(self.tmp_dir, ignore_errors=True)
def execute(self, context):
    name = os.path.basename(self.filepath)

    # TODO: put on another file and module
    def checkProjectFile(path):
        if os.path.basename(path) != "project.json":
            return False
        if not os.path.isfile(path):
            return False
        utils.loadProjectFile(path)
        if 'bge-project' not in utils.project_data:
            return False
        return True

    def getMainBlend(path):
        path = os.path.dirname(path) + os.sep + "project" + os.sep + "main.blend"
        if os.path.isfile(path):
            return path

    # if we have an archive, extract it to a temporary directory first
    if os.path.isfile(self.filepath):
        for x in [".zip", ".tar", ".gztar", ".bztar", ".xztar"]:
            if self.filepath.endswith(x):
                tpath = tempfile.gettempdir() + os.sep + os.path.basename(self.filepath)[:-len(x)] + os.sep
                shutil.unpack_archive(self.filepath, tpath)
                self.filepath = tpath
                print("Extracted to: ", tpath)

    # once we have a folder, look for project.json in the likely locations
    candidates = [self.filepath,
                  self.filepath + "project.json",
                  os.path.dirname(self.filepath) + os.sep + "project.json",
                  os.path.dirname(self.filepath) + os.sep + "../" + "project.json"]
    endpath = None
    for path in candidates:
        if checkProjectFile(path):
            endpath = getMainBlend(path)
    if endpath is None:
        self.report({'ERROR_INVALID_INPUT'}, "Error loading project, project file not found.")
        return {'CANCELLED'}
    bpy.ops.wm.open_mainfile(filepath=endpath)
    return {'FINISHED'}
def untar(path, fname, deleteTar=True):
    """Unpacks the given archive file to the same directory, then (by default)
    deletes the archive file.
    """
    print('unpacking ' + fname)
    fullpath = os.path.join(path, fname)
    shutil.unpack_archive(fullpath, path)
    if deleteTar:
        os.remove(fullpath)
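For illustration, a call to this helper might look like the following (the path and file name are hypothetical):

# Unpack /tmp/datasets/corpus.tar.gz into /tmp/datasets, then delete
# the archive (deleteTar defaults to True).
untar('/tmp/datasets', 'corpus.tar.gz')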
def extract_neo4j(database_name, archive_path):
    database_directory = os.path.join(settings.POLYGLOT_DATA_DIRECTORY, database_name)
    neo4j_directory = os.path.join(database_directory, 'neo4j')
    if os.path.exists(neo4j_directory):
        return False
    shutil.unpack_archive(archive_path, database_directory)
    # the archive unpacks to a versioned directory name; rename it to a stable 'neo4j' path
    for d in os.listdir(database_directory):
        if d.startswith('neo4j'):
            os.rename(os.path.join(database_directory, d), neo4j_directory)
    return True
def extract_influxdb(database_name, archive_path):
    database_directory = os.path.join(settings.POLYGLOT_DATA_DIRECTORY, database_name)
    influxdb_directory = os.path.join(database_directory, 'influxdb')
    if os.path.exists(influxdb_directory):
        return False
    shutil.unpack_archive(archive_path, database_directory)
    for d in os.listdir(database_directory):
        if d.startswith('influxdb'):
            os.rename(os.path.join(database_directory, d), influxdb_directory)
    return True
def zipfile_unzip(file_path=None, extract_dir=None, **filedialog_kwargs):
    if file_path is None:
        file_path = QtGui.QFileDialog.getOpenFileName(**filedialog_kwargs)[0]
    if extract_dir is None:
        extract_dir = os.path.dirname(file_path)
    shutil.unpack_archive(file_path, extract_dir=extract_dir)
def unpack_archive(file, extract_dir, format_, msg):
    try:
        shutil.unpack_archive(file, extract_dir=extract_dir, format=format_)
    except (ValueError, OSError) as err:
        logging.debug(traceback.format_exc())
        logging.critical(err)
        logging.critical(msg)
        raise SystemExit
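A ValueError from unpack_archive() usually means no unpacker is registered for the file's extension (or that the explicit format is unknown). The registered formats can be inspected with shutil.get_unpack_formats(), as in this small sketch:

import shutil

# Each entry is a (name, extensions, description) tuple,
# e.g. ('gztar', ['.tar.gz', '.tgz'], "gzip'ed tar-file").
for name, extensions, description in shutil.get_unpack_formats():
    print(name, extensions, description)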
def _get_url_unpacked_path_or_null(url):
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme == "file" and parsed.path.endswith(".whl"):
        return Path("/dev/null")
    try:
        cache_dir, packed_path = _get_url_impl(url)
    except CalledProcessError:
        return Path("/dev/null")
    if packed_path.is_file():  # pip://
        shutil.unpack_archive(str(packed_path), cache_dir.name)
    unpacked_path, = (
        path for path in Path(cache_dir.name).iterdir() if path.is_dir())
    return unpacked_path
def untar(fname):
    print('unpacking ' + fname)
    fullpath = os.path.join(fname)
    shutil.unpack_archive(fullpath)
    os.remove(fullpath)
def untar(path, fname):
    print('unpacking ' + fname)
    fullpath = os.path.join(path, fname)
    shutil.unpack_archive(fullpath, path)
    os.remove(fullpath)
def maybe_download_and_extract(data_root: str, url: str) -> None:
    """
    Maybe download the specified file to ``data_root`` and try to unpack it
    with ``shutil.unpack_archive``.

    :param data_root: data root to download the files to
    :param url: url to download from
    """
    # make sure data_root exists
    os.makedirs(data_root, exist_ok=True)
    filename = os.path.basename(url)
    # check whether the archive already exists
    filepath = os.path.join(data_root, filename)
    if os.path.exists(filepath):
        logging.info('\t`%s` already exists; skipping', filepath)
        return
    # download with progressbar
    logging.info('\tdownloading %s', filepath)
    req = requests.get(url, stream=True)
    expected_size = int(req.headers.get('content-length'))
    chunk_size = 1024
    with open(filepath, 'wb') as f_out, \
            click.progressbar(req.iter_content(chunk_size=chunk_size),
                              length=expected_size // chunk_size) as bar:
        for chunk in bar:
            if chunk:
                f_out.write(chunk)
                f_out.flush()
    # extract
    try:
        shutil.unpack_archive(filepath, data_root)
    except (shutil.ReadError, ValueError):
        logging.info('File `%s` could not be extracted by `shutil.unpack_archive`. '
                     'Please process it manually.', filepath)
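A call might look like the following; the data root and URL are invented for illustration:

# Download https://example.com/datasets/corpus.tar.gz into ./data
# (if not already present) and unpack it there.
maybe_download_and_extract('data', 'https://example.com/datasets/corpus.tar.gz')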
def execute(self, context):
    import html.parser
    import urllib.request

    remote_platforms = []
    ps = context.scene.ge_publish_settings

    # create lib folder if not already available
    lib_path = bpy.path.abspath(ps.lib_path)
    if not os.path.exists(lib_path):
        os.makedirs(lib_path)

    print("Retrieving list of platforms from blender.org...", end=" ", flush=True)

    class AnchorParser(html.parser.HTMLParser):
        def handle_starttag(self, tag, attrs):
            if tag == 'a':
                for key, value in attrs:
                    if key == 'href' and value.startswith('blender'):
                        remote_platforms.append(value)

    url = 'http://download.blender.org/release/Blender' + bpy.app.version_string.split()[0]
    parser = AnchorParser()
    data = urllib.request.urlopen(url).read()
    parser.feed(str(data))
    print("done", flush=True)

    print("Downloading files (this will take a while depending on your internet connection speed).", flush=True)
    for i in remote_platforms:
        src = '/'.join((url, i))
        dst = os.path.join(lib_path, i)
        dst_dir = '.'.join([part for part in dst.split('.') if part not in {'zip', 'tar', 'bz2'}])

        if not os.path.exists(dst) and not os.path.exists(dst.split('.')[0]):
            print("Downloading " + src + "...", end=" ", flush=True)
            urllib.request.urlretrieve(src, dst)
            print("done", flush=True)
        else:
            print("Reusing existing file: " + dst, flush=True)

        print("Unpacking " + dst + "...", end=" ", flush=True)
        if os.path.exists(dst_dir):
            shutil.rmtree(dst_dir)
        shutil.unpack_archive(dst, dst_dir)
        print("done", flush=True)

    print("Creating platform from libs...", flush=True)
    bpy.ops.scene.publish_auto_platforms()
    return {'FINISHED'}
def fetch(source, destination, allow_file=False):
    """
    Fetch a group of files either from a file archive or version control
    repository.

    Supported URL schemes:
      - file
      - ftp
      - ftps
      - git
      - git+http
      - git+https
      - git+ssh
      - http
      - https

    :param str source: The source URL to retrieve.
    :param str destination: The directory into which the files should be placed.
    :param bool allow_file: Whether or not to permit the file:// URL for processing local resources.
    :return: The destination directory that was used.
    :rtype: str
    """
    source = source.strip()
    if os.path.exists(destination):
        raise ValueError('destination must not be an existing directory')
    parsed_url = urllib.parse.urlparse(source, scheme='file')
    if parsed_url.username is not None:
        # if the username is not None, then the password will be a string
        creds = Creds(parsed_url.username, parsed_url.password or '')
    else:
        creds = Creds(None, None)
    parsed_url = collections.OrderedDict(zip(
        ('scheme', 'netloc', 'path', 'params', 'query', 'fragment'), parsed_url))
    parsed_url['netloc'] = parsed_url['netloc'].split('@', 1)[-1]
    parsed_url['scheme'] = parsed_url['scheme'].lower()
    if parsed_url['scheme'] == 'file':
        if not allow_file:
            raise RuntimeError('file: URLs are not allowed to be processed')
        tmp_path = parsed_url['path']
        if os.path.isdir(tmp_path):
            shutil.copytree(tmp_path, destination, symlinks=True)
        elif os.path.isfile(tmp_path):
            shutil.unpack_archive(tmp_path, destination)
    else:
        tmp_fd, tmp_path = tempfile.mkstemp(suffix='_' + os.path.basename(parsed_url['path']))
        os.close(tmp_fd)
        tmp_file = open(tmp_path, 'wb')
        try:
            _fetch_remote(source, destination, parsed_url, creds, tmp_file, tmp_path)
            if os.stat(tmp_path).st_size:
                shutil.unpack_archive(tmp_path, destination)
        finally:
            tmp_file.close()
            os.remove(tmp_path)
    return destination
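Hypothetical usage of fetch(), with the URL and destination invented for illustration:

# Download a remote archive and unpack it into a fresh directory;
# fetch() raises ValueError if the destination already exists.
dest = fetch('https://example.com/project.tar.gz', '/tmp/project_files')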