Python os 模块,join() 实例源码
我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用os.join()。
def update(user, password, lang=None):
langs = getlangs(lang)
puts(u"Updating %s" % ', '.join(langs))
for loc in langs:
with indent(2):
puts(u"Downloading PO for %s" % loc)
url = (u'https://www.transifex.com/projects/p/formhub/'
u'resource/django/l/%(lang)s/download/for_use/' % {'lang': loc})
try:
tmp_po_file = download_with_login(url, TX_LOGIN_URL,
login=user, password=password,
ext='po',
username_field='identification',
password_field='password',
form_id=1)
po_file = os.path.join(REPO_ROOT, 'locale', loc,
'LC_MESSAGES', 'django.po')
with indent(2):
puts(u"Copying downloaded file to %s" % po_file)
shutil.move(tmp_po_file, po_file)
except Exception as e:
puts(colored.red(u"Unable to update %s "
u"from Transifex: %r" % (loc, e)))
puts(colored.green("sucesssfuly retrieved %s" % loc))
compile_mo(langs)
def performDirAndFileRename(renameDict, fCreateDirs=False):
filenameAfterRename = renameDict['filename']
dirAfterRename = g.args['outputdir'] # note this may be an empty string when 'dirnamespec' was specified by user
# note that syntax for both filenamespec and dirnamespec were verified during cmd-line arg parsing
if g.args['dirnamespec']:
dirAfterRename = os.path.join(dirAfterRename, rename.performRename(g.args['dirnamespec'], renameDict))
if fCreateDirs and not os.path.exists(dirAfterRename):
applog_v("Creating directory tree \"{:s}\"".format(dirAfterRename))
os.makedirs(dirAfterRename)
renameDict['path'] = dirAfterRename # update dict with possible generated directory from above
if g.args['filenamespec']:
filenameAfterRename = rename.performRename(g.args['filenamespec'], renameDict)
if not filenameAfterRename:
applog_e("--filenamespec resulted in an empty filename. Please review your specification string")
sys.exit(ERRNO_FILENAMESPEC_RESULT_EMPTY_STR)
if '/' in filenameAfterRename or '\\' in filenameAfterRename:
applog_e("--filenamespec can not have a path or path characters in it ('/' or '\\')")
sys.exit(ERRNO_FILENAMESPEC_HAS_PATH_CHARACTERS)
dirAfterRenameAbsolute = os.path.abspath(dirAfterRename)
return (dirAfterRenameAbsolute, filenameAfterRename)
#
# performs launch of application and arguments specified in 'downloadexec' command-line option
#
def update(user, password, lang=None):
langs = getlangs(lang)
puts(u"Updating %s" % ', '.join(langs))
for loc in langs:
with indent(2):
puts(u"Downloading PO for %s" % loc)
url = (u'https://www.transifex.com/projects/p/formhub/'
u'resource/django/l/%(lang)s/download/for_use/' % {'lang': loc})
try:
tmp_po_file = download_with_login(url, TX_LOGIN_URL,
login=user, password=password,
ext='po',
username_field='identification',
password_field='password',
form_id=1)
po_file = os.path.join(REPO_ROOT, 'locale', loc,
'LC_MESSAGES', 'django.po')
with indent(2):
puts(u"Copying downloaded file to %s" % po_file)
shutil.move(tmp_po_file, po_file)
except Exception as e:
puts(colored.red(u"Unable to update %s "
u"from Transifex: %r" % (loc, e)))
puts(colored.green("sucesssfuly retrieved %s" % loc))
compile_mo(langs)
def add(lang):
langs = getlangs(lang)
puts(u"Adding %s" % ', '.join(langs))
for loc in langs:
with indent(2):
puts(u"Generating PO for %s" % loc)
shell_call(u"django-admin.py makemessages -l %(lang)s "
u"-e py,html,email,txt" % {'lang': loc})
for app in I18N_APPS:
with indent(4):
puts(u"Generating PO for app %s" % app)
with chdir(os.path.join(REPO_ROOT, app)):
shell_call(u"django-admin.py makemessages "
u"-d djangojs -l %(lang)s" % {'lang': loc})
puts(colored.green("sucesssfuly generated %s" % loc))
def compile_mo(lang=None):
langs = getlangs(lang)
puts(u"Compiling %s" % ', '.join(langs))
for loc in langs:
with indent(2):
puts(u"Compiling %s" % loc)
shell_call(u"django-admin.py compilemessages -l %(lang)s "
% {'lang': loc})
for app in I18N_APPS:
with indent(4):
puts(u"Compiling app %s" % app)
with chdir(os.path.join(REPO_ROOT, app)):
shell_call(u"django-admin.py compilemessages -l %(lang)s"
% {'lang': loc})
puts(colored.green("sucesssfuly compiled %s" % loc))
def _load_model(self):
"""The method to download the model from S3 and load to the memory.
Args:
saved (bool): wether to save the model files or just load it to the memory.
Returns:
gensim.models.doc2vec.Doc2Vec: The word-embedding model object.
"""
try:
model = Doc2Vec.load(os.path.join(LOCAL_CACHE_DIRECTORY, self.model_name))
return model
except:
files = list_files(self.s3_conn, self.s3_path)
if not self.saved:
with tempfile.TemporaryDirectory() as td:
for f in files:
filepath = os.path.join(td, f)
if not os.path.exists(filepath):
logging.warning('calling download from %s to %s', self.s3_path + f, filepath)
download(self.s3_conn, filepath, os.path.join(self.s3_path, f))
model = Doc2Vec.load(os.path.join(td, self.model_name))
else:
if not os.path.isdir(LOCAL_CACHE_DIRECTORY):
os.mkdir(LOCAL_CACHE_DIRECTORY)
for f in files:
filepath = os.path.join(LOCAL_CACHE_DIRECTORY, f)
if not os.path.exists(filepath) and self.saved:
logging.warning('calling download from %s to %s', self.s3_path + f, filepath)
download(self.s3_conn, filepath, os.path.join(self.s3_path, f))
model = Doc2Vec.load(os.path.join(LOCAL_CACHE_DIRECTORY, self.model_name))
return model
def _load_lookup(self):
"""The method to download the lookup dictionary from S3 and load to the memory.
Returns:
dict: a lookup table for mapping gensim index to soc code.
"""
try:
filepath = os.path.join(LOCAL_CACHE_DIRECTORY, self.lookup_name)
with open(filepath, 'r') as handle:
lookup = json.load(handle)
return lookup
except:
if not self.saved:
with tempfile.TemporaryDirectory() as td:
filepath = os.path.join(td, self.lookup_name)
print(filepath)
logging.warning('calling download from %s to %s', self.s3_path + self.lookup_name, filepath)
download(self.s3_conn, filepath, os.path.join(self.s3_path, self.lookup_name))
with open(filepath, 'r') as handle:
lookup = json.load(handle)
else:
filepath = os.path.join(LOCAL_CACHE_DIRECTORY, self.lookup_name)
if not os.path.exists(filepath):
logging.warning('calling download from %s to %s', self.s3_path + self.lookup_name, filepath)
download(self.s3_conn, filepath , os.join(self.s3_path, self.lookup_name))
with open(filepath, 'r') as handle:
lookup = json.load(handle)
return lookup
def establishAppEnvironment():
g.isWin32 = (platform.system() == 'Windows')
g.isOSX = (platform.system() == 'Darwin')
g.isFrozen = (getattr(sys, 'frozen', False)) # note for OSX isFrozen is always false because py2app only marks airnef.pyw as frozen when we're a py2app
#
# determine the directory our script resides in, in case the
# user is executing from a different working directory.
#
g.appDir = os.path.dirname(os.path.realpath(sys.argv[0]))
#
# determine directory for our APPDATA, which contains log
# and configuration files. For Win32 if we're frozen this
# goes in the dedicated OS area for application data files
#
g.appDataDir = None
if g.isFrozen and g.isWin32:
if os.getenv('LOCALAPPDATA'):
g.appDataDir = os.path.join(os.getenv('LOCALAPPDATA'), "airmtp\\appdata") # typically C:\Users\<username>\AppData\Local\airnef\appdata
elif g.isOSX: # for OSX we always try to store our app data under Application Support
userHomeDir = os.getenv('HOME')
if userHomeDir:
applicationSupportDir = os.path.join(userHomeDir, 'Library/Application Support')
if os.path.exists(applicationSupportDir): # probably not necessary to check existence since every system should have this directory
g.appDataDir = os.path.join(applicationSupportDir, 'airmtp/appdata')
if not g.appDataDir:
# none of runtime-specific cases above selected an app data directory - use directory based off our app directory
g.appDataDir = os.path.join(g.appDir, "appdata")
# create our app-specific subdirectories if necessary
if not os.path.exists(g.appDataDir):
os.makedirs(g.appDataDir)
#
# transltes a date or date+time string from the user into an
# epoch time (ie, time in seconds).
#
def add(lang):
langs = getlangs(lang)
puts(u"Adding %s" % ', '.join(langs))
for loc in langs:
with indent(2):
puts(u"Generating PO for %s" % loc)
shell_call(u"django-admin.py makemessages -l %(lang)s "
u"-e py,html,email,txt" % {'lang': loc})
for app in I18N_APPS:
with indent(4):
puts(u"Generating PO for app %s" % app)
with chdir(os.path.join(REPO_ROOT, app)):
shell_call(u"django-admin.py makemessages "
u"-d djangojs -l %(lang)s" % {'lang': loc})
puts(colored.green("sucesssfuly generated %s" % loc))
def compile_mo(lang=None):
langs = getlangs(lang)
puts(u"Compiling %s" % ', '.join(langs))
for loc in langs:
with indent(2):
puts(u"Compiling %s" % loc)
shell_call(u"django-admin.py compilemessages -l %(lang)s "
% {'lang': loc})
for app in I18N_APPS:
with indent(4):
puts(u"Compiling app %s" % app)
with chdir(os.path.join(REPO_ROOT, app)):
shell_call(u"django-admin.py compilemessages -l %(lang)s"
% {'lang': loc})
puts(colored.green("sucesssfuly compiled %s" % loc))
def main():
import os
import sys
from securityonion_airgap_download import compare_md5s
import subprocess
args = parse_arguments()
if not os.path.exists(args.input_file):
print 'ERROR: ' + args.input_file + ' doesn\'t exist. Exitting.'
sys.exit(1)
elif os.path.isdir(args.input_file):
#for f in os.listdir(args.input_file):
# file = os.join(args.input_file, f)
print 'ERROR: Script currently doesn\'t support crawling a directory. Exitting.'
#Maybe list dir, select newest tarball, and overwrite value of args.input_file. Then change next elseif to just if.
sys.exit(1)
elif not os.path.isdir(args.input_file):
print '\n[MAIN: Setup]'
if os.path.exists('.'.join([args.input_file, 'md5'])):
compare_md5s(os.path.dirname(os.path.abspath(args.input_file)))
decompress_tarfile(args.input_file)
base_dir = args.input_file[:-7]
print 'Base Dir: ' + base_dir
script_dir = os.path.dirname(os.path.realpath(__file__))
ip2c_script = script_dir + '/squert_ip2c_update.py'
#ip2c_cmd = script_dir + '/squert_ip2c_update.py -d ' + os.path.join(base_dir, 'RIR')
ids_script = script_dir + '/ids_offline_update.py'
#print os.path.abspath(base_dir)
if args.geoip:
print '\n[MAIN -> IDS: GeoIP]'
subprocess.call(['python', ids_script, '--geoip', '-G' + os.path.join(os.path.abspath(base_dir), 'GeoIP')])
elif args.rules:
print '\n[MAIN -> IDS: Snort Rules]'
# what about Doing blacklist?
subprocess.call(['python', ids_script, '--rules', '-R' + os.path.join(os.path.abspath(base_dir), 'Snort')])
elif args.ip2c:
print '\n[MAIN -> IP2C]'
##subprocess.call(['python', ip2c_script, '-h'])
##subprocess.call(['python', ip2c_cmd])
subprocess.call(['python', ip2c_script, '-d' + os.path.join(os.path.abspath(base_dir), 'RIR')])
##subprocess.call(['sudo', 'python', ip2c_script, '-d' + os.path.join(os.path.abspath(base_dir), 'RIR')])
else:
print '\n[MAIN -> IDS: Blacklists, GeoIP, Rules]'
subprocess.call(['python', ids_script, '-B' + os.path.join(os.path.abspath(base_dir), 'Snort', 'Blacklist'), '-G' + os.path.join(os.path.abspath(base_dir), 'GeoIP'), '-R' + os.path.join(os.path.abspath(base_dir), 'Snort')])
print '\n[MAIN -> IP2C]'
subprocess.call(['python', ip2c_script, '-d' + os.path.join(os.path.abspath(base_dir), 'RIR')])
print '\nFinished!'
def getMtpDeviceInfo():
mtpTcpCmdResult = mtpwifi.execMtpOp(g.socketPrimary, MTP_OP_GetDeviceInfo)
mtpDeviceInfo = parseMtpDeviceInfo(mtpTcpCmdResult.dataReceived)
if g.mtpDeviceInfo:
# this is a retry invocation. make sure we're talking with the same camera as before
if mtpDeviceInfo != g.mtpDeviceInfo:
applog_e("Discovered different camera during retry invocation. Orig camera was Model \"{:s}\", S/N \":{:s}\", New camera is \"{:s}\", S/N \":{:s}\"".format(\
g.mtpDeviceInfo.modelStr, g.mtpDeviceInfo.serialNumberStr, mtpDeviceInfo.modelStr, mtpDeviceInfo.serialNumberStr))
sys.exit(ERRNO_DIFFERENT_CAMREA_DURING_RETRY)
g.mtpDeviceInfo = mtpDeviceInfo
applog_d(g.mtpDeviceInfo)
#
# determine make of camera for use in any make-specific logic in our app
#
makeStrUpper = g.mtpDeviceInfo.manufacturerStr.upper()
if makeStrUpper.find("NIKON") != -1:
g.cameraMake = CAMERA_MAKE_NIKON
elif makeStrUpper.find("CANON") != -1:
g.cameraMake = CAMERA_MAKE_CANON
elif makeStrUpper.find("SONY") != -1:
g.cameraMake = CAMERA_MAKE_SONY
else:
g.cameraMake = CAMERA_MAKE_UNDETERMINED
#
# set any program options/behavior that is specific to the make of the camera
#
processCameraMakeDetermination()
#
# build path (dir + rootfilename) that will serve as the template for all metadata
# files we create and store locally. this name needs to be unique to the camera
# attached and locatable on future invocations, so we use a combination of the camera
# model and serial number
#
g.cameraLocalMetadataPathAndRootName = os.path.join(g.appDataDir, "{:s}-SN{:s}".format(g.mtpDeviceInfo.modelStr, g.mtpDeviceInfo.serialNumberStr))
applog_i("Camera Model \"{:s}\", S/N \"{:s}\"".format(g.mtpDeviceInfo.modelStr, g.mtpDeviceInfo.serialNumberStr))
#
# gets the slot index (1 or 2) from an MTP storage ID
#
def printMtpObjectDirectoryListing():
applog_i("") # newline separator for logging
fUsingRenameEngine = g.args['filenamespec'] != None or g.args['dirnamespec'] != None
if fUsingRenameEngine:
renameDict = genRenameDictKeysCommonToAllMtpObjects()
#
# scan all objects and generate listing for each file that passes the user-configured filters
#
totalBytesOfImagesInObjectsListed = 0
countFilesListed = 0
mtpObject = getNextUserFilteredMtpFileObject(-1)
while mtpObject:
if fUsingRenameEngine:
# update dict with fields that change for each file. note we're using session download count as lifetime for the preview of the renaming
updateRenameDictKeysSpecificToMtpObject(renameDict, mtpObject, countFilesListed, countFilesListed)
(dirAfterRename, filenameAfterRename) = performDirAndFileRename(renameDict, False)
dirAndFilenameAfterRename = os.path.join(dirAfterRename, filenameAfterRename)
#
# print information about this file
#
timeStr = strutil.getDateTimeStr(mtpObject.captureDateEpoch, fMilitaryTime=False)
sizeStr = "{:13,}".format(mtpObject.mtpObjectInfo.objectCompressedSize)
fullPathStr = mtpObject.genFullPathStr()
if g.countCardsUsed > 1:
# prepend slot number of file to path string
fullPathStr = "CARD{:d}\\".format(getSlotIndexFromStorageId(mtpObject.mtpObjectInfo.storageId)) + fullPathStr
if not fUsingRenameEngine:
applog_i("{:s} {:s} {:s}".format(timeStr, sizeStr, fullPathStr))
else:
applog_i("{:s} {:s} {:s} -> {:s}".format(timeStr, sizeStr, fullPathStr, dirAndFilenameAfterRename))
totalBytesOfImagesInObjectsListed += mtpObject.mtpObjectInfo.objectCompressedSize
countFilesListed += 1
# get next file that passes user-configured filters
mtpObject = getNextUserFilteredMtpFileObject(mtpObject)
#
# print listing summary
#
applog_i(" {:4d} File(s) {:13,} bytes".format(countFilesListed, totalBytesOfImagesInObjectsListed))
applog_i(" {:4d} Dir(s) {:13,} bytes free {:s}".format(MtpObject._CountMtpObjectDirectories, g.mtpStorageInfoList[0].freeSpaceBytes,\
"[CARD 1]" if g.countCardsUsed > 1 else ""))
for cardIndex in xrange(1, g.countCardsUsed):
applog_i(" {:13,} bytes free [CARD {:d}]".format(g.mtpStorageInfoList[cardIndex].freeSpaceBytes, cardIndex+1))
#
# retrieves an MTP device property from the camera. mtpPropteryCode is a
# MTP_DeviceProp_* value. If fIgnoreIfNotSupported is TRUE then
# empty data is returned if the camera reports that the property is
# not supported.
#