Python shutil 模块,copy2() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用shutil.copy2()。
def copy_results(datanames, result_dir, output_dir, verbose):
    """Copy the ``*.predict`` result files of every dataset to output_dir.

    For each name in datanames the files matching
    ``<result_dir>/<name>*_test*.predict`` are copied first, then those
    matching ``<result_dir>/<name>*_valid*.predict``.

    Returns:
        1 when every dataset had both kinds of result files and all of
        them were copied; 0 as soon as a kind is missing or a copy fails.
        Files copied before the failure are left in place.
    """
    for basename in datanames:
        try:
            for phase in ("test", "valid"):
                pattern = result_dir + "/" + basename + "*_" + phase + "*.predict"
                result_files = ls(pattern)
                if len(result_files) == 0:
                    vprint(verbose, "[-] Missing '" + phase + "' result files for " + basename)
                    return 0
                for f in result_files:
                    copy2(f, output_dir)
            vprint(verbose, "[+] " + basename.capitalize() + " copied")
        except Exception:
            # Deliberately broad (any listing/copy problem counts as a
            # failure), but no longer traps KeyboardInterrupt/SystemExit
            # the way the previous bare ``except:`` did.
            vprint(verbose, "[-] Missing result files")
            return 0
    return 1
# ================ Display directory structure and code version (for debug purposes) =================
def install_user_service(service_file, socket_file):
    """
    Copy a service definition into the xinetd service directory and
    restart xinetd.

    Nothing is done when service_file is None.

    Args:
        service_file: Path of the service file to install, or None.
        socket_file: Accepted for interface compatibility; this function
            never reads it.
    """
    if service_file is not None:
        name = os.path.basename(service_file)
        logger.debug("...Installing user service '%s'.", name)
        # The file keeps its base name inside the service directory.
        destination = os.path.join(XINETD_SERVICE_PATH, name)
        shutil.copy2(service_file, destination)
        execute(["service", "xinetd", "restart"], timeout=60)
def copymessage(self, n, tofolder, ton):
    """Copy message n of this folder onto message number ton of tofolder.

    A destination message that already exists is first renamed to its
    ',<ton>' backup name.  If the copy fails, any partially written
    destination file is removed before the exception propagates.
    """
    source = self.getmessagefilename(n)
    # Opening (and immediately closing) the source makes a missing
    # message fail here, before the destination is touched.
    open(source).close()
    destination = tofolder.getmessagefilename(ton)
    backup = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(destination, backup)
    except os.error:
        # Best effort: a failed backup rename is ignored.
        pass
    copied = False
    try:
        tofolder.setlast(None)
        shutil.copy2(source, destination)
        copied = True
    finally:
        if not copied:
            try:
                os.unlink(destination)
            except os.error:
                pass
def copytree(src, dst, symlinks = False, ignore = None):
    """Recursively copy directory src into dst; dst may already exist.

    Args:
        src: Directory to copy from.
        dst: Directory to copy into.  It is created (and given src's
            stat information) when it does not exist yet.
        symlinks: When true, symbolic links in src are recreated as
            links in dst (replacing whatever is there) instead of being
            followed.
        ignore: Optional callable ``ignore(src, names)`` returning the
            names to skip, e.g. one built by shutil.ignore_patterns.
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    names = os.listdir(src)
    if ignore:
        excluded = ignore(src, names)
        names = [name for name in names if name not in excluded]
    for name in names:
        s = os.path.join(src, name)
        d = os.path.join(dst, name)
        if symlinks and os.path.islink(s):
            if os.path.lexists(d):
                os.remove(d)
            os.symlink(os.readlink(s), d)
            try:
                mode = stat.S_IMODE(os.lstat(s).st_mode)
                os.lchmod(d, mode)
            except (AttributeError, NotImplementedError, OSError):
                # os.lchmod is missing on many platforms; copying the
                # link's mode is best effort only.  Narrowed from a bare
                # ``except:`` so Ctrl-C is no longer swallowed.
                pass
        elif os.path.isdir(s):
            copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
def install():
    """Install hook: prepare package sources, install packages, copy the
    charm's bundled files to /usr/bin and open the API ports.

    Every entry of the charm's ``files`` directory (when that directory
    exists) is copied to /usr/bin; each value of API_PORTS is opened.
    """
    status_set('maintenance', 'Executing pre-install')
    execd_preinstall()
    origin = config('openstack-origin')
    configure_installation_source(origin)

    status_set('maintenance', 'Installing apt packages')
    apt_update()
    packages = determine_packages()
    apt_install(packages, fatal=True)

    files_dir = os.path.join(charm_dir(), 'files')
    bundled = os.listdir(files_dir) if os.path.isdir(files_dir) else []
    for name in bundled:
        path = os.path.join(files_dir, name)
        log('Installing {} to /usr/bin'.format(path))
        shutil.copy2(path, '/usr/bin')

    for port in API_PORTS.values():
        open_port(port)
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy(args, movelist, fromdir, todir):
    """
    Copy the files named in movelist from fromdir to todir.

    movelist maps a relative directory to an iterable of file names in
    that directory; directories and files are processed in sorted order.
    Each destination directory is created before its files are copied.
    Source files that do not exist are reported with logging.error and
    skipped.  When args.dryrun is true nothing is created or copied and
    only the log messages are produced.
    """
    for subdir in sorted(movelist):
        source_dir = os.path.join(fromdir, subdir)
        target_dir = os.path.join(todir, subdir)
        logging.debug("mkdir %s" % target_dir)
        if not args.dryrun:
            try:
                os.makedirs(target_dir, exist_ok=True)
            except FileExistsError:
                # Still raised when the path exists but is not a directory.
                pass
        logging.debug("copy from '%s' to '%s':" % (source_dir, target_dir))
        for filename in sorted(movelist[subdir]):
            source_file = os.path.join(source_dir, filename)
            if not os.path.exists(source_file):
                logging.error("%s can't be copied as it doesn't exist" % (filename))
                continue
            logging.debug("%s" % (filename))
            if not args.dryrun:
                shutil.copy2(source_file, os.path.join(target_dir, filename))
def copy(self, target, nameIsLeaf=False):
    """
    Copy this path to target; the copying counterpart of rename.

    For a file, target is wrapped in Path and, when nameIsLeaf is true,
    placed next to this file (``self.up() / target``).  The target's parent
    is created when it does not exist, the file is copied with
    shutil.copy2 and the target Path is returned.  Copying a file onto
    itself is a no-op that returns the target.

    For a directory, shutil.copytree is called with target as given and,
    as written, nothing is returned.  A path that is neither a file nor a
    directory is ignored.
    """
    if not self.isfile():
        if self.isdir():
            shutil.copytree(str(self), str(target))
        return None

    destination = Path(target)
    if nameIsLeaf:
        destination = self.up() / destination
    if self == destination:
        return destination

    parent = destination.up()
    if not parent.exists():
        parent.create()
    shutil.copy2(str(self), str(destination))
    return destination
def csvwrite(_imagefile, _feature_data, write_dir):
    """Write per-cluster features to a CSV file and move it to write_dir.

    The CSV is first written next to the image, using the image path with
    its extension dropped and every "IR" replaced by "Features", and is
    then renamed to ``<write_dir>/output.csv``.

    Args:
        _imagefile: Path of the image the features were computed from.
        _feature_data: Iterable of dicts, one per cluster; keys that are
            not known column names are dropped.
        write_dir: Directory that receives ``output.csv``.
    """
    print("Writing FEATURE.CSV file...")
    feature_file = os.path.splitext(_imagefile)[0].replace("IR", "Features")
    name = feature_file + '.csv'

    fieldnames = ['mean_value', 'euler_number', 'major_axis', 'area', 'solidity', 'std', 'eccentricity',
                  'eq_diameter', 'minor_axis']
    fieldnames.extend(getHistFeatureKeys())

    with open(name, 'w') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for cluster in _feature_data:
            row = {key: value for key, value in cluster.items() if key in fieldnames}
            writer.writerow(row)

    # print() with a single argument behaves the same on Python 2 and 3.
    print(write_dir)
    # os.path.join instead of a hard-coded backslash, so the file lands
    # inside write_dir on every platform.
    os.rename(name, os.path.join(write_dir, "output.csv"))
    print("FEATURE.CSV file is Written")
def copytree(src, dst, symlinks=False, ignore=None):
    """
    Copy the contents of directory src into directory dst.

    Unlike shutil.copytree, dst may already exist; it is created when it
    does not, so copying plain files into a fresh destination works.

    :param src: source dir
    :param dst: destination dir (created when missing)
    :param symlinks: forwarded to shutil.copytree for sub-directories
    :param ignore: forwarded to shutil.copytree for sub-directories; it is
        not applied to the entries directly inside src
    """
    # List first so that a missing src fails before dst is created.
    items = os.listdir(src)
    if not os.path.isdir(dst):
        os.makedirs(dst)
    for item in items:
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            shutil.copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
def write(self, dbfile=None):
    """Write the database to dbfile (default: self.dbfile).

    An existing file is first copied to ``<dbfile>_bak``.  The main header
    is written, followed by every entry, and finally the first three bytes
    of the file are overwritten with a zero byte and the entry count as a
    big-endian 16-bit value.
    """
    target = self.dbfile if dbfile is None else dbfile

    # Keep the previous version around as a backup.
    if os.path.exists(target):
        shutil.copy2(target, target + "_bak")

    with open(target, "wb") as db:
        self.header_main.tofile(db)
        for record in self.entries.values():
            db.write(record)

        # Patch the entry count into the start of the header.
        count = len(self.entries)
        high, low = count >> 8, count & 0xFF
        db.seek(0)
        db.write(b"\0%c%c" % (high, low))
def copymessage(self, n, tofolder, ton):
    """Copy message n of this folder onto message number ton of tofolder.

    A destination message that already exists is first renamed to its
    ',<ton>' backup name.  If the copy fails, any partially written
    destination file is removed before the exception propagates.
    """
    source = self.getmessagefilename(n)
    # Opening (and immediately closing) the source makes a missing
    # message fail here, before the destination is touched.
    open(source).close()
    destination = tofolder.getmessagefilename(ton)
    backup = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(destination, backup)
    except os.error:
        # Best effort: a failed backup rename is ignored.
        pass
    copied = False
    try:
        tofolder.setlast(None)
        shutil.copy2(source, destination)
        copied = True
    finally:
        if not copied:
            try:
                os.unlink(destination)
            except os.error:
                pass
def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy directory src into dst; dst may already exist.

    Args:
        src: Directory to copy from.
        dst: Directory to copy into; created (with parents) when missing.
        symlinks: Only passed along to the recursive calls; links are
            always followed (sub-directories are recursed into, files are
            copied with shutil.copy2).
        ignore: Optional callable ``ignore(src, names)`` returning the
            names to skip at each level.
    """
    names = os.listdir(src)
    ignored_names = ignore(src, names) if ignore is not None else set()

    if not os.path.isdir(dst):
        os.makedirs(dst)

    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        if os.path.isdir(srcname):
            copytree(srcname, dstname, symlinks, ignore)
        else:
            shutil.copy2(srcname, dstname)
# (original comment text was lost to a character-encoding error)
def _do_tree(root_src, root_dest, tmpl_dict, tmpl_ext, tag_delim, level=0):
    """Recursively render the tree at root_src into root_dest.

    Entry names are passed through do_text before being used as
    destination names.  Directories are created with _mkdir (copying stats
    from the source directory) and recursed into.  A file whose name ends,
    case-insensitively, with one of tmpl_ext is processed with do_file and
    written without that extension; any other file is copied with
    shutil.copy2 without following symlinks.  The destination root itself
    is only created by the top-level call (level 0).
    """
    if level == 0:
        _mkdir(root_dest)

    for entry in os.scandir(root_src):
        src_path = os.path.join(root_src, entry.name)
        rendered_name = do_text(entry.name, tmpl_dict, tag_delim)
        dest_path = os.path.join(root_dest, rendered_name)

        if entry.is_dir():
            _mkdir(dest_path, copy_stats_from=src_path)
            _do_tree(src_path, dest_path, tmpl_dict, tmpl_ext, tag_delim,
                     level + 1)
            continue
        if not entry.is_file():
            continue

        for ext in tmpl_ext:
            suffix = ext.lower()
            if entry.name.lower().endswith(suffix):
                # Template: strip the template extension from the output name.
                do_file(src_path, dest_path[0:-len(suffix)], tmpl_dict,
                        tag_delim)
                break
        else:
            # No template extension matched: plain copy.
            shutil.copy2(src_path, dest_path, follow_symlinks=False)
def handle(self, *args, **kwargs):
    """Export the reports of all finished sessions to a new directory.

    A directory ``./export-<timestamp>`` is created and the report of
    every CashdeskSession with an end time is copied into it.  Sessions
    whose get_report_path() is empty are counted as failures.  A success
    summary is always written to stdout; a warning listing the failed
    session IDs is written when there are any.
    """
    timestamp = now().strftime('%Y-%m-%d-%H%M%S')
    export_dir = os.path.join('.', 'export-{}'.format(timestamp))
    os.mkdir(export_dir)

    exported = 0
    missing = []
    finished_sessions = CashdeskSession.objects.filter(end__isnull=False)
    for session in finished_sessions:
        report_path = session.get_report_path()
        if not report_path:
            missing.append(session.pk)
            continue
        shutil.copy2(report_path, export_dir)
        exported += 1

    summary = 'Exported {} reports to directory {}.'.format(exported, export_dir)
    self.stdout.write(self.style.SUCCESS(summary))

    if missing:
        warning = 'Could not find reports for {} finished sessions (IDs: {}).'.format(
            len(missing),
            missing,
        )
        self.stdout.write(self.style.WARNING(warning))
def copy_files(source_folder, target_folder):
    """Mirror source_folder into target_folder without overwriting.

    Every file below source_folder is copied to the same relative location
    below target_folder, except:

    * files whose source path contains any (stripped) entry of the
      NO_COPY blacklist, and
    * files that already exist at the destination.

    Destination directories are created as needed.
    """
    for root, _dirs, files in os.walk(source_folder):
        # Swap only the leading source_folder prefix.  An unbounded
        # str.replace would also rewrite later occurrences of the same
        # text inside sub-directory or file names.
        target_root = root.replace(source_folder, target_folder, 1)
        for filename in files:
            # Source file to copy.
            source_path = os.path.join(root, filename)
            # Check the blacklist to decide whether to skip it.
            if any(entry.strip() in source_path for entry in NO_COPY):
                continue
            target_path = os.path.join(target_root, filename)
            # Never overwrite a file that already exists at the destination.
            if os.path.exists(target_path):
                continue
            try:
                os.makedirs(target_root)
            except OSError:
                # Usually "already exists"; any real problem is reported
                # by the copy2 call below.
                pass
            shutil.copy2(source_path, target_path)
def copy_results(datanames, result_dir, output_dir, verbose):
    """Copy the ``*.predict`` result files of every dataset to output_dir.

    For each name in datanames the files matching
    ``<result_dir>/<name>*_test*.predict`` are copied first, then those
    matching ``<result_dir>/<name>*_valid*.predict``.

    Returns:
        1 when every dataset had both kinds of result files and all of
        them were copied; 0 as soon as a kind is missing or a copy fails.
        Files copied before the failure are left in place.
    """
    for basename in datanames:
        try:
            for phase in ("test", "valid"):
                pattern = result_dir + "/" + basename + "*_" + phase + "*.predict"
                result_files = ls(pattern)
                if len(result_files) == 0:
                    vprint(verbose, "[-] Missing '" + phase + "' result files for " + basename)
                    return 0
                for f in result_files:
                    copy2(f, output_dir)
            vprint(verbose, "[+] " + basename.capitalize() + " copied")
        except Exception:
            # Deliberately broad (any listing/copy problem counts as a
            # failure), but no longer traps KeyboardInterrupt/SystemExit
            # the way the previous bare ``except:`` did.
            vprint(verbose, "[-] Missing result files")
            return 0
    return 1
# ================ Display directory structure and code version (for debug purposes) =================
def _configure(self, context):
    """Prepare the build directory with the library and ICU data.

    Steps, in order: copy the source tree to build_path if build_path
    does not exist yet (keeping symlinks); replace
    ``<build_path>/kiwixlib/src/main`` with a copy of
    ``<install_dir>/kiwix-lib``; make sure the ``icu`` assets directory
    exists; copy ``icudt58l.dat`` into it.  The context argument is not
    used here.
    """
    if not os.path.exists(self.build_path):
        shutil.copytree(self.source_path, self.build_path, symlinks=True)

    lib_main = pj(self.build_path, 'kiwixlib', 'src', 'main')
    try:
        shutil.rmtree(lib_main)
    except FileNotFoundError:
        # Nothing to replace on a fresh build directory.
        pass
    shutil.copytree(pj(self.buildEnv.install_dir, 'kiwix-lib'), lib_main)

    icu_assets = pj(self.build_path, 'app', 'src', 'main', 'assets', 'icu')
    os.makedirs(icu_assets, exist_ok=True)

    icu_source = pj(self.buildEnv.install_dir, 'share', 'icu', '58.2',
                    'icudt58l.dat')
    icu_target = pj(self.build_path, 'app', 'src', 'main', 'assets',
                    'icu', 'icudt58l.dat')
    shutil.copy2(icu_source, icu_target)
def ExecRecursiveMirror(self, source, dest):
    """Emulation of rm -rf out && cp -af in out.

    Whatever exists at dest is removed first, making read-only entries
    writable when that is what blocks the removal.  A directory source is
    then copied with shutil.copytree, anything else with shutil.copy2.
    """
    def _retry_writable(fn, path, dummy_excinfo):
        # The removal failed, possibly because the entry is read-only.
        # If so, make it writable; then retry the failed operation.
        if not os.access(path, os.W_OK):
            os.chmod(path, stat.S_IWRITE)
        fn(path)

    if os.path.isdir(dest):
        shutil.rmtree(dest, onerror=_retry_writable)
    elif os.path.exists(dest):
        if not os.access(dest, os.W_OK):
            # Attempt to make the file writable before deleting it.
            os.chmod(dest, stat.S_IWRITE)
        os.unlink(dest)

    if os.path.isdir(source):
        shutil.copytree(source, dest)
        return

    shutil.copy2(source, dest)
    # Try to diagnose crbug.com/741603
    if not os.path.exists(dest):
        raise Exception("Copying of %s to %s failed" % (source, dest))
def _CopyRuntimeImpl(target, source, verbose=True):
"""Copy |source| to |target| if it doesn't already exist or if it needs to be
updated (comparing last modified time as an approximate float match as for
some reason the values tend to differ by ~1e-07 despite being copies of the
same file... https://crbug.com/603603).
"""
if (os.path.isdir(os.path.dirname(target)) and
(not os.path.isfile(target) or
abs(os.stat(target).st_mtime - os.stat(source).st_mtime) >= 0.01)):
if verbose:
print 'Copying %s to %s...' % (source, target)
if os.path.exists(target):
# Make the file writable so that we can delete it now, and keep it
# readable.
os.chmod(target, stat.S_IWRITE | stat.S_IREAD)
os.unlink(target)
shutil.copy2(source, target)
# Make the file writable so that we can overwrite or delete it later,
# keep it readable.
os.chmod(target, stat.S_IWRITE | stat.S_IREAD)
def GetHeadersFromGN(out_dir, q):
    """Return all the header files from GN.

    The result is not returned directly: a tuple ``(headers, error)`` is
    put on the queue q.  headers is whatever ParseGNProjectJSON produced
    (an empty set on failure) and error is None on success or the text of
    the exception that stopped the run.
    """
    tmp = None
    ans, err = set(), None
    try:
        # Argument |dir| is needed to make sure it's on the same drive on Windows.
        # dir='' means dir='.', but doesn't introduce an unneeded prefix.
        tmp = tempfile.mkdtemp(dir='')
        shutil.copy2(os.path.join(out_dir, 'args.gn'),
                     os.path.join(tmp, 'args.gn'))
        # Do "gn gen" in a temp dir to prevent dirtying |out_dir|.
        gn_exe = 'gn.bat' if sys.platform == 'win32' else 'gn'
        subprocess.check_call([
            os.path.join(DEPOT_TOOLS_DIR, gn_exe), 'gen', tmp, '--ide=json', '-q'])
        # Close the file deterministically so the temp dir can always be
        # removed afterwards.
        with open(os.path.join(tmp, 'project.json')) as project_file:
            gn_json = json.load(project_file)
        ans = ParseGNProjectJSON(gn_json, out_dir, tmp)
    except Exception as e:
        err = str(e)
    finally:
        if tmp:
            shutil.rmtree(tmp)
    q.put((ans, err))
def GenerateV14LayoutResource(input_filename, output_v14_filename,
                              output_v17_filename):
    """Convert API 17 layout resource to API 14 compatible layout resource.

    It's mostly a simple replacement, s/Start/Left s/End/Right,
    on the attribute names.

    When the conversion leaves the resource unchanged, nothing is written.
    Otherwise the converted resource goes to output_v14_filename and the
    original is copied to output_v17_filename (whose directory is created
    first).
    """
    dom = ParseAndReportErrors(input_filename)
    if not GenerateV14LayoutResourceDom(dom, input_filename):
        return

    # Write the generated resource.
    WriteDomToFile(dom, output_v14_filename)

    # Copy the original resource.
    v17_dir = os.path.dirname(output_v17_filename)
    build_utils.MakeDirectory(v17_dir)
    shutil.copy2(input_filename, output_v17_filename)
def copy2(src, dst):
    """
    Copy a file together with its metadata, reporting success as a bool.

    Thin wrapper around shutil.copy2 (similar to the Unix command cp -p)
    that logs the outcome and never raises for ordinary errors.

    :Arguments:
        src - file and metadata to be copied
        dst - file/dir on which to be copied
    :Return:
        True/False - based on the success/failure of the operation
    """
    try:
        shutil.copy2(src, dst)
        print_info("src {} copied to dst {} successfully along with metadata".
                   format(src, dst))
    except Exception as err:
        print_error("copying file {} with metadata to file {} raised exception"
                    " {}".format(src, dst, str(err)))
        return False
    return True
def move(src, dst):
    """
    Recursively move a file or directory, reporting success as a bool.

    Thin wrapper around shutil.move that logs the outcome and never raises
    for ordinary errors.  See shutil.move for how an existing destination
    and moves across filesystems are handled.

    :Arguments:
        src - source file to be moved
        dst - target file/directory on which to be moved
    :Return:
        True/False - based on the success/failure of the operation
    """
    try:
        shutil.move(src, dst)
        print_info("move of src {} to dst {} successful".format(src, dst))
    except Exception as err:
        print_error("moving file {} to file {} raised exception {}".
                    format(src, dst, str(err)))
        return False
    return True
def copy_results(datanames, result_dir, output_dir, verbose):
    """Copy the ``*.predict`` result files of every dataset to output_dir.

    For each name in datanames the files matching
    ``<result_dir>/<name>*_test*.predict`` are copied first, then those
    matching ``<result_dir>/<name>*_valid*.predict``.

    Returns:
        1 when every dataset had both kinds of result files and all of
        them were copied; 0 as soon as a kind is missing or a copy fails.
        Files copied before the failure are left in place.
    """
    for basename in datanames:
        try:
            for phase in ("test", "valid"):
                pattern = result_dir + "/" + basename + "*_" + phase + "*.predict"
                result_files = ls(pattern)
                if len(result_files) == 0:
                    vprint(verbose, "[-] Missing '" + phase + "' result files for " + basename)
                    return 0
                for f in result_files:
                    copy2(f, output_dir)
            vprint(verbose, "[+] " + basename.capitalize() + " copied")
        except Exception:
            # Deliberately broad (any listing/copy problem counts as a
            # failure), but no longer traps KeyboardInterrupt/SystemExit
            # the way the previous bare ``except:`` did.
            vprint(verbose, "[-] Missing result files")
            return 0
    return 1
# ================ Display directory structure and code version (for debug purposes) =================
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def uploadfile(srcfile, dstfolder):
    '''
    Copy a file to the upload folder.

    The destination folder is prepared with folderinit first.  A failed
    copy is logged (including the exception text) and otherwise ignored.

    srcfile: Full source filename including path
    dstfolder: Folder that receives the copy, which keeps srcfile's base name
    '''
    destination = os.path.join(dstfolder, os.path.basename(srcfile))
    folderinit(dstfolder, 'Upload file destination folder')
    try:
        shutil.copy2(srcfile, destination)
    except Exception as e:
        # The message has three placeholders; the exception was previously
        # not passed, so str.format itself raised IndexError here.
        logger.error('File "{}" could not be copied to upload folder "{}".\n\tException Message: {}'.format(srcfile, dstfolder, e))
    return
def copy_cwl_files(from_dir=CWL_PATH):
    """Copy cwl files to a directory where the cwl-runner can find them.

    The target is the CWL_DATA_DIR_PREFIX sub-directory of $XDG_DATA_HOME,
    or of DEFAULT_DATA_DIR when that variable is unset or empty.  The
    target directory is created with create_dirs and every ``*.cwl`` file
    directly inside from_dir is copied into it.

    Args:
        from_dir (str): Path to directory where to copy files from (default:
            the cwl directory of nlppln).
    """
    data_home = os.environ.get('XDG_DATA_HOME') or DEFAULT_DATA_DIR
    target_dir = os.path.join(data_home, CWL_DATA_DIR_PREFIX)
    create_dirs(target_dir)

    pattern = '{}{}*.cwl'.format(from_dir, os.sep)
    for source in glob.glob(pattern):
        shutil.copy2(source, os.path.join(target_dir, os.path.basename(source)))
def rename_file(entry, pdf, bib, dry_run):
    """Copy a paper's PDF into the papers folder under a descriptive name.

    The new name is ``<authors> - <year> - <title>.pdf``.  With more than
    three comma-separated author pieces only the first is kept, followed
    by " et al."; otherwise all pieces but the last are joined.
    NOTE(review): dropping the last piece presumably relies on the author
    field ending with a trailing comma -- confirm.

    Args:
        entry: Bibliography entry; must support ``entry[key]``,
            ``key in entry`` and ``set_tag``.
        pdf: Path of the PDF to file away.
        bib: Not used by this function.
        dry_run: When false, the original PDF is moved to the user's
            trash folder after the copy.  The copy itself and the update
            of the entry's 'file' tag happen in either case.

    Returns:
        True when the PDF was copied, False when the entry lacks an
        author, year or title, or the PDF does not exist.
    """
    # Default keeps the check below from hitting an unbound name when the
    # author field is empty.
    author = ''
    if entry['author']:
        authors = entry['author'].split(',')
        if len(authors) <= 3:
            author = ', '.join(authors[:-1])
        else:
            author = authors[0] + ' et al.'

    if not (author and 'year' in entry and 'title' in entry):
        return False

    newname = author + ' - ' + '{}'.format(entry['year']) + ' - ' + algo.tex_to_unicode(algo.title_case(entry['title'])).replace("/", " ") + '.pdf'
    if not os.path.exists(pdf):
        return False

    shutil.copy2(pdf, os.path.expanduser("~") + papers_path + newname)
    entry.set_tag('file', ':' + pdf + ':PDF')
    if not dry_run:
        shutil.move(pdf, os.path.expanduser("~") + '/.local/share/Trash/files/')
    return True
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy_files(src, dst, symlinks=False, ignore=None):
    """Copy every entry of directory src into the existing directory dst.

    Sub-directories are copied with shutil.copytree (which receives
    symlinks and ignore); everything else is copied with shutil.copy2.
    """
    for entry_name in os.listdir(src):
        source_path = os.path.join(src, entry_name)
        target_path = os.path.join(dst, entry_name)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target_path)
            continue
        shutil.copytree(source_path, target_path, symlinks, ignore)
def gen_setup(self, filename, fragment, tmpdir):
    """Generate a setup.py in tmpdir for a single downloaded .py file.

    The project name and version come from the ``#egg=`` fragment, which
    must identify exactly one versioned distribution.  The file is copied
    into tmpdir when it is not already there.

    Returns the path of the module file to build from.  Raises
    DistutilsError when the fragment is missing or ambiguous.
    """
    match = EGG_FRAGMENT.match(fragment)
    if match:
        candidates = interpret_distro_name(filename, match.group(1), None)
        dists = [dist for dist in candidates if dist.version]
    else:
        dists = []

    if len(dists) != 1:
        if match:
            raise DistutilsError(
                "Can't unambiguously interpret project/version identifier %r; "
                "any dashes in the name or version should be escaped using "
                "underscores. %r" % (fragment, dists)
            )
        raise DistutilsError(
            "Can't process plain .py files without an '#egg=name-version'"
            " suffix to enable automatic setup script generation."
        )

    # unambiguous ``#egg`` fragment
    basename = os.path.basename(filename)

    # Make sure the file has been downloaded to the temp dir.
    if os.path.dirname(filename) != tmpdir:
        dst = os.path.join(tmpdir, basename)
        from setuptools.command.easy_install import samefile
        if not samefile(filename, dst):
            shutil.copy2(filename, dst)
            filename = dst

    module_name = os.path.splitext(basename)[0]
    with open(os.path.join(tmpdir, 'setup.py'), 'w') as script:
        script.write(
            "from setuptools import setup\n"
            "setup(name=%r, version=%r, py_modules=[%r])\n"
            % (dists[0].project_name, dists[0].version, module_name)
        )
    return filename
def gen_setup(self, filename, fragment, tmpdir):
    """Generate a setup.py in tmpdir for a single downloaded .py file.

    The project name and version come from the ``#egg=`` fragment, which
    must identify exactly one versioned distribution.  The file is copied
    into tmpdir when it is not already there.

    Returns the path of the module file to build from.  Raises
    DistutilsError when the fragment is missing or ambiguous.
    """
    match = EGG_FRAGMENT.match(fragment)
    if match:
        candidates = interpret_distro_name(filename, match.group(1), None)
        dists = [dist for dist in candidates if dist.version]
    else:
        dists = []

    if len(dists) != 1:
        if match:
            raise DistutilsError(
                "Can't unambiguously interpret project/version identifier %r; "
                "any dashes in the name or version should be escaped using "
                "underscores. %r" % (fragment, dists)
            )
        raise DistutilsError(
            "Can't process plain .py files without an '#egg=name-version'"
            " suffix to enable automatic setup script generation."
        )

    # unambiguous ``#egg`` fragment
    basename = os.path.basename(filename)

    # Make sure the file has been downloaded to the temp dir.
    if os.path.dirname(filename) != tmpdir:
        dst = os.path.join(tmpdir, basename)
        from setuptools.command.easy_install import samefile
        if not samefile(filename, dst):
            shutil.copy2(filename, dst)
            filename = dst

    module_name = os.path.splitext(basename)[0]
    with open(os.path.join(tmpdir, 'setup.py'), 'w') as script:
        script.write(
            "from setuptools import setup\n"
            "setup(name=%r, version=%r, py_modules=[%r])\n"
            % (dists[0].project_name, dists[0].version, module_name)
        )
    return filename
def local_update(repo_path, deb_paths=None):
    """
    Updates a local deb repository by copying debs and running scanpackages.

    The repository directory is created when it does not exist.  After the
    debs are copied, the output of ``dpkg-scanpackages . /dev/null`` (run
    inside the repository) is written gzip-compressed to ``Packages.gz``.

    Args:
        repo_path: the path to the local repository.
        deb_paths: list of problem deb paths to copy (default: none).

    Raises:
        FatalException: repo_path exists but is not a directory.
    """
    # None instead of a shared mutable [] default.
    if deb_paths is None:
        deb_paths = []

    if not exists(repo_path):
        logger.info("Creating repository at '%s'.", repo_path)
        makedirs(repo_path)
    elif not isdir(repo_path):
        logger.error("Repository '%s' is not a directory!", repo_path)
        raise FatalException

    for deb_path in deb_paths:
        copy2(deb_path, repo_path)

    shell = spur.LocalShell()
    result = shell.run(["dpkg-scanpackages", ".", "/dev/null"], cwd=repo_path)

    packages_path = join(repo_path, "Packages.gz")
    with gzip.open(packages_path, "wb") as packages:
        packages.write(result.output)

    logger.info("Repository '%s' updated successfully. Copied %d packages.", repo_path, len(deb_paths))
def deploy_files(staging_directory, instance_directory, file_list, username, problem_class):
    """
    Copies the list of files from the staging directory to the instance directory.
    Will properly set permissions and setgid files based on their type.

    Entries that are Directory instances are not copied, but ownership and
    permissions are still applied to their path.  PreTemplatedFile entries
    are read from the ``__pre_templated`` sub-directory of the staging
    directory.  For Service problems the instance directory itself is
    handed to the default user / problem group with mode 0o750.
    """
    # passwd entries of the problem user and the default user
    problem_user = getpwnam(username)
    default_user = getpwnam(deploy_config.default_user)

    for item in file_list:
        # copy the file over, making the directories as needed
        output_path = join(instance_directory, item.path)
        parent_dir = os.path.dirname(output_path)
        if not os.path.isdir(parent_dir):
            os.makedirs(parent_dir)

        if not isinstance(item, Directory):
            if isinstance(item, PreTemplatedFile):
                file_source = join(staging_directory, "__pre_templated", item.path)
            else:
                file_source = join(staging_directory, item.path)
            shutil.copy2(file_source, output_path)

        # set the ownership based on the type of file
        if isinstance(item, (ProtectedFile, ExecutableFile)):
            os.chown(output_path, default_user.pw_uid, problem_user.pw_gid)
        else:
            uid = default_user.pw_uid if item.user is None else getpwnam(item.user).pw_uid
            gid = default_user.pw_gid if item.group is None else getgrnam(item.group).gr_gid
            os.chown(output_path, uid, gid)

        # set the permissions appropriately
        os.chmod(output_path, item.permissions)

    if issubclass(problem_class, Service):
        os.chown(instance_directory, default_user.pw_uid, problem_user.pw_gid)
        os.chmod(instance_directory, 0o750)
def copy_files(src, dst, symlinks=False, ignore=None):
    """Copy every entry of directory src into the existing directory dst.

    Sub-directories are copied with shutil.copytree (which receives
    symlinks and ignore); everything else is copied with shutil.copy2.
    """
    for entry_name in os.listdir(src):
        source_path = os.path.join(src, entry_name)
        target_path = os.path.join(dst, entry_name)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target_path)
            continue
        shutil.copytree(source_path, target_path, symlinks, ignore)
def copy_nrpe_checks():
    """
    Install the bundled NRPE check scripts into the Nagios plugin directory.

    Every regular file matching ``check_*`` under
    ``$CHARM_DIR/hooks/charmhelpers/contrib/openstack/files`` is copied to
    ``/usr/local/lib/nagios/plugins``, which is created when missing.
    """
    plugins_dir = '/usr/local/lib/nagios/plugins'
    checks_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                              'charmhelpers', 'contrib', 'openstack',
                              'files')

    if not os.path.exists(plugins_dir):
        os.makedirs(plugins_dir)

    pattern = os.path.join(checks_dir, "check_*")
    for check in glob.glob(pattern):
        if not os.path.isfile(check):
            continue
        target = os.path.join(plugins_dir, os.path.basename(check))
        shutil.copy2(check, target)
def copy_files(src, dst, symlinks=False, ignore=None):
    """Copy every entry of directory src into the existing directory dst.

    Sub-directories are copied with shutil.copytree (which receives
    symlinks and ignore); everything else is copied with shutil.copy2.
    """
    for entry_name in os.listdir(src):
        source_path = os.path.join(src, entry_name)
        target_path = os.path.join(dst, entry_name)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target_path)
            continue
        shutil.copytree(source_path, target_path, symlinks, ignore)
def main():
    """Render one HTML beam-search tree per entry of the beam data file.

    Reads the data file named by ARGS.data, optionally a vocabulary file
    (one token per line, to which UNK / SEQUENCE_START / SEQUENCE_END are
    appended), copies the viewer's CSS and JS into ARGS.output_dir and
    writes ``<index, zero-padded to 6 digits>.html`` for every entry,
    printing each output path.
    """
    beam_data = np.load(ARGS.data)

    # Optionally load vocabulary data
    vocab = None
    if ARGS.vocab:
        with open(ARGS.vocab) as vocab_file:
            vocab = [line.strip() for line in vocab_file]
        vocab += ["UNK", "SEQUENCE_START", "SEQUENCE_END"]

    if not os.path.exists(ARGS.output_dir):
        os.makedirs(ARGS.output_dir)

    # Copy required files
    required_assets = (
        "./bin/tools/beam_search_viz/tree.css",
        "./bin/tools/beam_search_viz/tree.js",
    )
    for asset in required_assets:
        shutil.copy2(asset, ARGS.output_dir)

    for idx in range(len(beam_data["predicted_ids"])):
        graph = create_graph(
            predicted_ids=beam_data["predicted_ids"][idx],
            parent_ids=beam_data["beam_parent_ids"][idx],
            scores=beam_data["scores"][idx],
            vocab=vocab)

        tree = json_graph.tree_data(graph, (0, 0))
        json_str = json.dumps(tree, ensure_ascii=False)
        html_str = HTML_TEMPLATE.substitute(DATA=json_str)

        output_path = os.path.join(ARGS.output_dir, "{:06d}.html".format(idx))
        with open(output_path, "w") as html_file:
            html_file.write(html_str)
        print(output_path)
def refilemessages(self, list, tofolder, keepsequences=0):
    """Refile one or more messages -- may raise os.error.
    'tofolder' is an open folder object.

    Each message is moved to the next free number of tofolder, by rename
    when possible and by copy-then-delete otherwise.  Messages that cannot
    be moved are skipped and their errors collected; the remaining ones
    are still processed.  Successfully refiled messages are removed from
    all sequences of this folder (and, with keepsequences, their sequence
    membership is copied to tofolder first).  Collected errors are raised
    at the very end as os.error.
    """
    errors = []
    refiled = {}
    for n in list:
        ton = tofolder.getlast() + 1
        path = self.getmessagefilename(n)
        topath = tofolder.getmessagefilename(ton)
        try:
            os.rename(path, topath)
        except os.error:
            # Try copying
            try:
                shutil.copy2(path, topath)
                os.unlink(path)
            except (IOError, os.error) as msg:
                # "as" form: valid on Python 2.6+ and Python 3.
                errors.append(msg)
                try:
                    os.unlink(topath)
                except os.error:
                    pass
                continue
        tofolder.setlast(ton)
        refiled[n] = ton
    if refiled:
        if keepsequences:
            tofolder._copysequences(self, refiled.items())
        self.removefromallsequences(refiled.keys())
    if errors:
        if len(errors) == 1:
            first = errors[0]
            # Same outcome as the old ``raise os.error, value`` statement:
            # an os.error instance is raised as is, anything else is
            # wrapped in one.
            if isinstance(first, os.error):
                raise first
            raise os.error(first)
        else:
            raise os.error('multiple errors:', errors)
def movemessage(self, n, tofolder, ton):
    """Move message n of this folder onto message number ton of tofolder.

    A destination message that already exists is first renamed to its
    ',<ton>' backup name.  The move is a rename when possible; otherwise
    the message is copied and the original deleted, with a partially
    written destination removed if the copy fails.  Finally the message is
    removed from all sequences of this folder.
    """
    source = self.getmessagefilename(n)
    # Opening (and immediately closing) the source makes a missing
    # message fail here, before the destination is touched.
    open(source).close()
    destination = tofolder.getmessagefilename(ton)
    backup = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(destination, backup)
    except os.error:
        # Best effort: a failed backup rename is ignored.
        pass
    try:
        os.rename(source, destination)
    except os.error:
        # Rename failed: fall back to copy + delete.
        copied = False
        try:
            tofolder.setlast(None)
            shutil.copy2(source, destination)
            copied = True
        finally:
            if not copied:
                try:
                    os.unlink(destination)
                except os.error:
                    pass
        os.unlink(source)
    self.removefromallsequences([n])
def extract_targz(tarname, filename=None, target_dir='.'):
    """Extract a gzip-compressed tar archive, or one file out of it.

    Args:
        tarname: Path of the ``.tar.gz`` archive.
        filename: None to extract the whole archive into target_dir, or
            the path of one member (must be a valid path in the tar);
            that member is then placed directly in target_dir under its
            base name.
        target_dir: Destination directory (default: current directory).
            It must already exist when a single file is requested.

    NOTE: extractall() trusts the member paths stored in the archive; do
    not use this on archives from untrusted sources.
    """
    import tarfile
    if filename is None:
        with tarfile.open(tarname, 'r:gz') as archive:
            archive.extractall(target_dir)
        return

    import shutil
    import tempfile
    # A unique scratch directory instead of a fixed './._tmp_': no clash
    # with concurrent calls, and it is removed even when a step fails.
    tmp_dir = tempfile.mkdtemp()
    try:
        with tarfile.open(tarname, 'r:gz') as archive:
            archive.extractall(tmp_dir)
        shutil.copy2(os.path.join(tmp_dir, filename),
                     os.path.join(target_dir, filename.split(os.path.sep)[-1]))
    finally:
        shutil.rmtree(tmp_dir)
def prepare_model_dir(self):
    """Set up the model directory, its checkpoint folder and params.json.

    model_dir is config.load_path when given, otherwise a new
    ``<MMDD_HHMMSS>_<data_type>`` directory below config.model_dir.  The
    configuration is dumped to ``params.json`` inside it.  When training a
    fresh model (is_train and no load_path) the ``.py`` files that sit
    next to the running script are copied to ``<model_dir>/code`` for
    future reference.

    Sets self.model_dir, self.save_model_dir, self.save_model_name and,
    for fresh training runs, config.log_code_dir.
    """
    if self.config.load_path:
        self.model_dir=self.config.load_path
    else:
        pth=datetime.now().strftime("%m%d_%H%M%S")+'_'+self.data_type
        self.model_dir=os.path.join(self.config.model_dir,pth)
    if not os.path.exists(self.model_dir):
        os.mkdir(self.model_dir)
    print('Model directory is ',self.model_dir)

    self.save_model_dir=os.path.join(self.model_dir,'checkpoints')
    if not os.path.exists(self.save_model_dir):
        os.mkdir(self.save_model_dir)
    self.save_model_name=os.path.join(self.save_model_dir,'Model')

    param_path = os.path.join(self.model_dir, "params.json")
    print("[*] MODEL dir: %s" % self.model_dir)
    print("[*] PARAM path: %s" % param_path)
    with open(param_path, 'w') as fp:
        json.dump(self.config.__dict__, fp, indent=4, sort_keys=True)

    config=self.config
    if config.is_train and not config.load_path:
        config.log_code_dir=os.path.join(self.model_dir,'code')
        for path in [self.model_dir, config.log_code_dir]:
            if not os.path.exists(path):
                os.makedirs(path)

        #Copy python code in directory into model_dir/code for future reference:
        code_dir=os.path.dirname(os.path.realpath(sys.argv[0]))
        for name in os.listdir(code_dir):
            source = os.path.join(code_dir, name)
            # Copy from code_dir, not from the current working directory:
            # the bare file name only worked when the two were the same.
            if name.endswith('.py') and os.path.isfile(source):
                shutil.copy2(source, config.log_code_dir)
def prepare_dirs_and_logger(config):
    """Reset root logging and derive/create the run's directories.

    The root logger ends up with exactly one StreamHandler using the
    ``time:level::message`` format.  config.model_name, config.model_dir
    and config.data_path are derived from load_path / dataset / log_dir /
    data_dir.  When config.is_train is set, the log, data, model and code
    directories are created and the ``.py`` files that sit next to the
    running script are copied to ``<model_dir>/code``.
    """
    formatter = logging.Formatter("%(asctime)s:%(levelname)s::%(message)s")
    logger = logging.getLogger()

    # Iterate over a copy: removing from the live list while looping over
    # it skips every other handler.
    for hdlr in logger.handlers[:]:
        logger.removeHandler(hdlr)

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if config.load_path:
        if config.load_path.startswith(config.log_dir):
            config.model_dir = config.load_path
        else:
            if config.load_path.startswith(config.dataset):
                config.model_name = config.load_path
            else:
                config.model_name = "{}_{}".format(config.dataset, config.load_path)
    else:
        config.model_name = "{}_{}".format(config.dataset, get_time())

    if not hasattr(config, 'model_dir'):
        config.model_dir = os.path.join(config.log_dir, config.model_name)
    config.data_path = os.path.join(config.data_dir, config.dataset)

    if config.is_train:
        config.log_code_dir=os.path.join(config.model_dir,'code')
        for path in [config.log_dir, config.data_dir,
                     config.model_dir, config.log_code_dir]:
            if not os.path.exists(path):
                os.makedirs(path)

        #Copy python code in directory into model_dir/code for future reference:
        code_dir=os.path.dirname(os.path.realpath(sys.argv[0]))
        for name in os.listdir(code_dir):
            source = os.path.join(code_dir, name)
            # Copy from code_dir, not from the current working directory:
            # the bare file name only worked when the two were the same.
            if name.endswith('.py') and os.path.isfile(source):
                shutil.copy2(source, config.log_code_dir)
def prepare_dirs_and_logger(config):
    """Reset root logging and derive/create the run's directories.

    The root logger ends up with exactly one StreamHandler using the
    ``time:level::message`` format.  config.model_name, config.model_dir
    and config.data_path are derived from load_path / dataset / log_dir /
    data_dir.  For a fresh run (no config.load_path) the log, data, model
    and code directories are created and the ``.py`` files that sit next
    to the running script are copied to ``<model_dir>/code``.
    """
    formatter = logging.Formatter("%(asctime)s:%(levelname)s::%(message)s")
    logger = logging.getLogger()

    # Iterate over a copy: removing from the live list while looping over
    # it skips every other handler.
    for hdlr in logger.handlers[:]:
        logger.removeHandler(hdlr)

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if config.load_path:
        if config.load_path.startswith(config.log_dir):
            config.model_dir = config.load_path
        else:
            if config.load_path.startswith(config.dataset):
                config.model_name = config.load_path
            else:
                config.model_name = "{}_{}".format(config.dataset, config.load_path)
    else:
        config.model_name = "{}_{}".format(config.dataset, get_time())

    if not hasattr(config, 'model_dir'):
        config.model_dir = os.path.join(config.log_dir, config.model_name)
    config.data_path = os.path.join(config.data_dir, config.dataset)

    if not config.load_path:
        config.log_code_dir=os.path.join(config.model_dir,'code')
        for path in [config.log_dir, config.data_dir,
                     config.model_dir, config.log_code_dir]:
            if not os.path.exists(path):
                os.makedirs(path)

        #Copy python code in directory into model_dir/code for future reference:
        code_dir=os.path.dirname(os.path.realpath(sys.argv[0]))
        for name in os.listdir(code_dir):
            source = os.path.join(code_dir, name)
            # Copy from code_dir, not from the current working directory:
            # the bare file name only worked when the two were the same.
            if name.endswith('.py') and os.path.isfile(source):
                shutil.copy2(source, config.log_code_dir)