Python string 模块,replace() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用string.replace()。
def eval_python(self, result):
code = result.group('code')
code = string.replace(code, '\t', ' ')
try:
if self.local_dict:
result = eval(code, self.global_dict, self.local_dict)
else:
result = eval(code, self.global_dict)
return str(result)
except:
self.errorLogger('\n---- Error parsing: ----\n')
self.errorLogger(code)
self.errorLogger('\n------------------------\n')
raise
#---------------------------------------------------------------------------
# This routine is only called when OUTPUT() is included in executed Python
# code from the templates. It evaluates its parameter as if it was a
# template and appends the result to the OUTPUT_TEXT variable in the global
# dictionary.
def _extract_file_info(self, _file):
"""Extract file information."""
_file = json.loads(_file)
info = {'filename': _file['name'],
'link_raw': string.replace(_file['link'], 'dl=0', 'raw=1'),
'link': _file['link']}
if self._is_image_file(_file['name']):
extra_fields = {'url_m': info['link_raw'],
'url_b': info['link_raw'],
'title': info['filename']}
info.update(extra_fields)
if self._is_video_file(_file['name']):
url = self._create_raw_cors_link(_file['link'])
extra_fields = {'video_url': url}
info.update(extra_fields)
if self._is_audio_file(_file['name']):
url = self._create_raw_cors_link(_file['link'])
extra_fields = {'audio_url': url}
info.update(extra_fields)
if self._is_pdf_file(_file['name']):
url = self._create_raw_cors_link(_file['link'])
extra_fields = {'pdf_url': url}
info.update(extra_fields)
return {'info': info}
def test_tasks_attributes_for_image_files(self):
#For image file extensions: link, filename, url, url_m, url_b, title
image_ext = ['png', 'jpg', 'jpeg', 'gif']
file_data = 'myfile.extension'
for ext in image_ext:
data = string.replace(file_data,'extension', ext)
form_data = {
'files': [data],
'bucket': 'mybucket'
}
tasks = BulkTaskS3Import(**form_data).tasks()
assert tasks[0]['info']['filename'] == "myfile.%s" % ext
assert tasks[0]['info']['link'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['url_m'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['url_b'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['title'] == "myfile.%s" % ext
def test_tasks_attributes_for_video_files(self):
#For video file extension: link, filename, url, video_url
video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
file_data = 'myfile.extension'
for ext in video_ext:
data = string.replace(file_data,'extension', ext)
form_data = {
'files': [data],
'bucket': 'mybucket'
}
tasks = BulkTaskS3Import(**form_data).tasks()
assert tasks[0]['info']['filename'] == "myfile.%s" % ext
assert tasks[0]['info']['link'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['video_url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
def test_tasks_attributes_for_audio_files(self):
#For audio file extension: link, filename, url, audio_url
audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
file_data = 'myfile.extension'
for ext in audio_ext:
data = string.replace(file_data,'extension', ext)
form_data = {
'files': [data],
'bucket': 'mybucket'
}
tasks = BulkTaskS3Import(**form_data).tasks()
assert tasks[0]['info']['filename'] == "myfile.%s" % ext
assert tasks[0]['info']['link'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
assert tasks[0]['info']['audio_url'] == "https://mybucket.s3.amazonaws.com/myfile.%s" % ext
def test_tasks_attributes_for_video_files(self):
#For video file extension: link, filename, link_raw, video_url
video_ext = ['mp4', 'm4v', 'ogg', 'ogv', 'webm', 'avi']
file_data = (u'{"bytes":286,'
u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
u'"name":"test.extension",'
u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')
for ext in video_ext:
data = string.replace(file_data,'extension', ext)
form_data = {'files': [data]}
tasks = BulkTaskDropboxImport(**form_data).tasks()
assert tasks[0]['info']['filename'] == "test.%s" % ext
assert tasks[0]['info']['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
assert tasks[0]['info']['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
assert tasks[0]['info']['video_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext
def test_tasks_attributes_for_audio_files(self):
#For audio file extension: link, filename, link_raw, audio_url
audio_ext = ['mp4', 'm4a', 'mp3', 'ogg', 'oga', 'webm', 'wav']
file_data = (u'{"bytes":286,'
u'"link":"https://www.dropbox.com/s/l2b77qvlrequ6gl/test.extension?dl=0",'
u'"name":"test.extension",'
u'"icon":"https://www.dropbox.com/static/images/icons64/page_white_text.png"}')
for ext in audio_ext:
data = string.replace(file_data,'extension', ext)
form_data = {'files': [data]}
tasks = BulkTaskDropboxImport(**form_data).tasks()
assert tasks[0]['info']['filename'] == "test.%s" % ext
assert tasks[0]['info']['link'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?dl=0" % ext
assert tasks[0]['info']['link_raw'] == "https://www.dropbox.com/s/l2b77qvlrequ6gl/test.%s?raw=1" % ext
assert tasks[0]['info']['audio_url'] == "https://dl.dropboxusercontent.com/s/l2b77qvlrequ6gl/test.%s" % ext
def ParseWhois_INT(self):
int_contacts = (
{"page_field": "Registrant", "rec_field": "registrant"},
{"page_field": "Administrative Contact", "rec_field": "administrative"},
{"page_field": "Technical Contact", "rec_field": "technical"})
page = string.replace(self.page, "\r\n", "\n")
for contact in int_contacts:
page_field = contact['page_field']
s = "%s:(.*)\n\W" % page_field
m = re.search(s, page, re.DOTALL)
#if m: print m.group(1)
print "-------------------"
##
## ----------------------------------------------------------------------
##
##
## ----------------------------------------------------------------------
##
def strToHex(string):
"""
@param string: string to be converted into its hexadecimal value.
@type string: C{str}
@return: the hexadecimal converted string.
@rtype: C{str}
"""
hexStr = ""
for character in string:
if character == "\n":
character = " "
hexChar = "%2x" % ord(character)
hexChar = hexChar.replace(" ", "0")
hexChar = hexChar.upper()
hexStr += hexChar
return hexStr
def fileToStr(fileName):
"""
@param fileName: file path to read the content and return as a no
NEWLINE string.
@type fileName: C{file.open}
@return: the file content as a string without TAB and NEWLINE.
@rtype: C{str}
"""
filePointer = open(fileName, "r")
fileText = filePointer.read()
fileText = fileText.replace(" ", "")
fileText = fileText.replace("\t", "")
fileText = fileText.replace("\r", "")
fileText = fileText.replace("\n", " ")
return fileText
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def more (self):
esc_from = self.esc_from
esc_to = self.esc_to
buffer = self.buffer + self.producer.more()
if buffer:
buffer = string.replace (buffer, esc_from, esc_to)
i = self.find_prefix_at_end (buffer, esc_from)
if i:
# we found a prefix
self.buffer = buffer[-i:]
return buffer[:-i]
else:
# no prefix, return it all
self.buffer = b''
return buffer
else:
return buffer
def do_put(self, s):
try:
params = s.split(' ')
if len(params) > 1:
src_path = params[0]
dst_path = params[1]
elif len(params) == 1:
src_path = params[0]
dst_path = ''
src_file = os.path.basename(src_path)
fh = open(src_path, 'rb')
dst_path = string.replace(dst_path, '/','\\')
import ntpath
pathname = ntpath.join(ntpath.join(self.__pwd,dst_path), src_file)
drive, tail = ntpath.splitdrive(pathname)
logging.info("Uploading %s to %s" % (src_file, pathname))
self.__transferClient.putFile(drive[:-1]+'$', tail, fh.read)
fh.close()
except Exception, e:
logging.critical(str(e))
pass
def do_cat(self, line, command = sys.stdout.write):
pathName = string.replace(line,'/','\\')
pathName = ntpath.normpath(ntpath.join(self.pwd,pathName))
res = self.findPathName(pathName)
if res is None:
logging.error("Not found!")
return
if res.isDirectory() > 0:
logging.error("It's a directory!")
return
if res.isCompressed() or res.isEncrypted() or res.isSparse():
logging.error('Cannot handle compressed/encrypted/sparse files! :(')
return
stream = res.getStream(None)
chunks = 4096*10
written = 0
for i in range(stream.getDataSize()/chunks):
buf = stream.read(i*chunks, chunks)
written += len(buf)
command(buf)
if stream.getDataSize() % chunks:
buf = stream.read(written, stream.getDataSize() % chunks)
command(buf)
logging.info("%d bytes read" % stream.getDataSize())
def do_put(self, s):
try:
params = s.split(' ')
if len(params) > 1:
src_path = params[0]
dst_path = params[1]
elif len(params) == 1:
src_path = params[0]
dst_path = ''
src_file = os.path.basename(src_path)
fh = open(src_path, 'rb')
dst_path = string.replace(dst_path, '/','\\')
import ntpath
pathname = ntpath.join(ntpath.join(self.__pwd,dst_path), src_file)
drive, tail = ntpath.splitdrive(pathname)
logging.info("Uploading %s to %s" % (src_file, pathname))
self.__transferClient.putFile(drive[:-1]+'$', tail, fh.read)
fh.close()
except Exception, e:
logging.critical(str(e))
pass
def do_put(self, s):
try:
if self.transferClient is None:
self.connect_transferClient()
params = s.split(' ')
if len(params) > 1:
src_path = params[0]
dst_path = params[1]
elif len(params) == 1:
src_path = params[0]
dst_path = '/'
src_file = os.path.basename(src_path)
fh = open(src_path, 'rb')
f = dst_path + '/' + src_file
pathname = string.replace(f,'/','\\')
logging.info("Uploading %s to %s\%s" % (src_file, self.share, dst_path))
self.transferClient.putFile(self.share, pathname, fh.read)
fh.close()
except Exception, e:
logging.error(str(e))
pass
self.send_data('\r\n')
def do_put(self, s):
try:
if self.transferClient is None:
self.connect_transferClient()
params = s.split(' ')
if len(params) > 1:
src_path = params[0]
dst_path = params[1]
elif len(params) == 1:
src_path = params[0]
dst_path = '/'
src_file = os.path.basename(src_path)
fh = open(src_path, 'rb')
f = dst_path + '/' + src_file
pathname = string.replace(f,'/','\\')
logging.info("Uploading %s to %s\%s" % (src_file, self.share, dst_path))
self.transferClient.putFile(self.share, pathname.decode(sys.stdin.encoding), fh.read)
fh.close()
except Exception, e:
logging.error(str(e))
pass
self.send_data('\r\n')
def copy_file(self, src, tree, dst):
LOG.info("Uploading file %s" % dst)
if isinstance(src, str):
# We have a filename
fh = open(src, 'rb')
else:
# We have a class instance, it must have a read method
fh = src
f = dst
pathname = string.replace(f,'/','\\')
try:
self.connection.putFile(tree, pathname, fh.read)
except:
LOG.critical("Error uploading file %s, aborting....." % dst)
raise
fh.close()
def retr_file(self, service, filename, callback, mode = FILE_OPEN, offset = 0, password = None, shareAccessMode = SMB_ACCESS_READ):
filename = string.replace(filename, '/', '\\')
fid = -1
tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
try:
fid = self.nt_create_andx(tid, filename, shareAccessMode = shareAccessMode, accessMask = 0x20089)
res = self.query_file_info(tid, fid)
datasize = SMBQueryFileStandardInfo(res)['EndOfFile']
self.__nonraw_retr_file(tid, fid, offset, datasize, callback)
finally:
if fid >= 0:
self.close(tid, fid)
self.disconnect_tree(tid)
def check_dir(self, service, path, password = None):
path = string.replace(path,'/', '\\')
tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
try:
smb = NewSMBPacket()
smb['Tid'] = tid
smb['Mid'] = 0
cmd = SMBCommand(SMB.SMB_COM_CHECK_DIRECTORY)
cmd['Parameters'] = ''
cmd['Data'] = SMBCheckDirectory_Data(flags = self.__flags2)
cmd['Data']['DirectoryName'] = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
smb.addCommand(cmd)
self.sendSMB(smb)
while 1:
s = self.recvSMB()
if s.isValidAnswer(SMB.SMB_COM_CHECK_DIRECTORY):
return
finally:
self.disconnect_tree(tid)
def rmdir(self, service, path, password = None):
path = string.replace(path,'/', '\\')
# Check that the directory exists
self.check_dir(service, path, password)
tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
try:
path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
smb = NewSMBPacket()
smb['Tid'] = tid
createDir = SMBCommand(SMB.SMB_COM_DELETE_DIRECTORY)
createDir['Data'] = SMBDeleteDirectory_Data(flags=self.__flags2)
createDir['Data']['DirectoryName'] = path
smb.addCommand(createDir)
self.sendSMB(smb)
while 1:
s = self.recvSMB()
if s.isValidAnswer(SMB.SMB_COM_DELETE_DIRECTORY):
return
finally:
self.disconnect_tree(tid)
def mkdir(self, service, path, password = None):
path = string.replace(path,'/', '\\')
tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
try:
path = path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else path
smb = NewSMBPacket()
smb['Tid'] = tid
smb['Mid'] = 0
createDir = SMBCommand(SMB.SMB_COM_CREATE_DIRECTORY)
createDir['Data'] = SMBCreateDirectory_Data(flags=self.__flags2)
createDir['Data']['DirectoryName'] = path
smb.addCommand(createDir)
self.sendSMB(smb)
smb = self.recvSMB()
if smb.isValidAnswer(SMB.SMB_COM_CREATE_DIRECTORY):
return 1
return 0
finally:
self.disconnect_tree(tid)
def rename(self, service, old_path, new_path, password = None):
old_path = string.replace(old_path,'/', '\\')
new_path = string.replace(new_path,'/', '\\')
tid = self.tree_connect_andx('\\\\' + self.__remote_name + '\\' + service, password)
try:
smb = NewSMBPacket()
smb['Tid'] = tid
smb['Mid'] = 0
renameCmd = SMBCommand(SMB.SMB_COM_RENAME)
renameCmd['Parameters'] = SMBRename_Parameters()
renameCmd['Parameters']['SearchAttributes'] = ATTR_SYSTEM | ATTR_HIDDEN | ATTR_DIRECTORY
renameCmd['Data'] = SMBRename_Data(flags = self.__flags2)
renameCmd['Data']['OldFileName'] = old_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else old_path
renameCmd['Data']['NewFileName'] = new_path.encode('utf-16le') if self.__flags2 & SMB.FLAGS2_UNICODE else new_path
smb.addCommand(renameCmd)
self.sendSMB(smb)
smb = self.recvSMB()
if smb.isValidAnswer(SMB.SMB_COM_RENAME):
return 1
return 0
finally:
self.disconnect_tree(tid)
def rmdir(self, shareName, pathName, password = None):
# ToDo: Handle situations where share is password protected
pathName = string.replace(pathName,'/', '\\')
pathName = ntpath.normpath(pathName)
if len(pathName) > 0 and pathName[0] == '\\':
pathName = pathName[1:]
treeId = self.connectTree(shareName)
fileId = None
try:
fileId = self.create(treeId, pathName, desiredAccess=DELETE | FILE_READ_ATTRIBUTES | SYNCHRONIZE,
shareMode=FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
creationOptions=FILE_DIRECTORY_FILE | FILE_OPEN_REPARSE_POINT,
creationDisposition=FILE_OPEN, fileAttributes=0)
from impacket import smb
delete_req = smb.SMBSetFileDispositionInfo()
delete_req['DeletePending'] = True
self.setInfo(treeId, fileId, inputBlob=delete_req, fileInfoClass=SMB2_FILE_DISPOSITION_INFO)
finally:
if fileId is not None:
self.close(treeId, fileId)
self.disconnectTree(treeId)
return True
def remove(self, shareName, pathName, password = None):
# ToDo: Handle situations where share is password protected
pathName = string.replace(pathName,'/', '\\')
pathName = ntpath.normpath(pathName)
if len(pathName) > 0 and pathName[0] == '\\':
pathName = pathName[1:]
treeId = self.connectTree(shareName)
fileId = None
try:
fileId = self.create(treeId, pathName,DELETE | FILE_READ_ATTRIBUTES, FILE_SHARE_DELETE, FILE_NON_DIRECTORY_FILE | FILE_DELETE_ON_CLOSE, FILE_OPEN, 0)
finally:
if fileId is not None:
self.close(treeId, fileId)
self.disconnectTree(treeId)
return True
def storeFile(self, shareName, path, callback, mode = FILE_OVERWRITE_IF, offset = 0, password = None, shareAccessMode = FILE_SHARE_WRITE):
# ToDo: Handle situations where share is password protected
path = string.replace(path,'/', '\\')
path = ntpath.normpath(path)
if len(path) > 0 and path[0] == '\\':
path = path[1:]
treeId = self.connectTree(shareName)
fileId = None
try:
fileId = self.create(treeId, path, FILE_WRITE_DATA, shareAccessMode, FILE_NON_DIRECTORY_FILE, mode, 0)
finished = False
writeOffset = offset
while not finished:
data = callback(self._Connection['MaxWriteSize'])
if len(data) == 0:
break
written = self.write(treeId, fileId, data, writeOffset, len(data))
writeOffset += written
finally:
if fileId is not None:
self.close(treeId, fileId)
self.disconnectTree(treeId)
def save_plot(self,plot_name,**kwargs):
logger = logging.getLogger("plotting")
if plot_name not in self.plots:
logger.warn('Tried to generate a plot called %s that does not exist' % plot_name)
# raise an exception here?
else:
# # the filename to save to is known by the handler, which needs to be assigned to this logger
# # look at the handlers attached to this logger instance
# ph=None
# for h in self.handlers:
# # we want an instance of a PlotHandler - we'll take the first one we find
# # (behaviour will be unpredictable if there is more than one handler of this type)
# if isinstance(h,PlotHandler):
# ph=h
# break
# if ph:
# TO DO - need to be sure of safe file names
if not os.path.isdir(self.plot_path):
os.makedirs(self.plot_path)
filename = self.plot_path + "/" + string.replace(plot_name, " ", "_") + ".pdf"
logger.info('Generating a plot in file %s' % filename)
self.plots[plot_name].generate_plot(filename,**kwargs)
# else:
# logger.warn('No handler of type PlotHandler is attached to this logger - cannot save plots')
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def parseResp(self, resp):
def fan(v):
def var_func_00000026(x):
return [x, (x.upper)(), (x.lower)()]
return distinct((lambda var_0000002B:selectMany(var_func_00000026, var_0000002B))([v, ((v.replace)("\\", "\\\\").replace)("\"", "\\\"")]))
def split(v, t):
for tag in fan(t):
if (len(v) != 2):
v = (v[0].split)(tag)
else:
break
return v
e = ((self.injector).emitter)
if ((e.prefix) == None):
return resp
p = split([resp], (e.prefix))
if (len(p) < 2):
return resp
return split([p[1]], (e.suffix))[0]
def retr_file(self, service, filename, callback, mode = SMB_O_OPEN, offset = 0, password = None, timeout = None):
filename = string.replace(filename, '/', '\\')
fid = -1
tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout)
try:
fid, attrib, lastwritetime, datasize, grantedaccess, filetype, devicestate, action, serverfid = self.__open_file(tid, filename, mode, SMB_ACCESS_READ | SMB_SHARE_DENY_WRITE)
#if not datasize:
datasize = self.__query_file_info(tid, fid)
if self.__can_read_raw:
self.__raw_retr_file(tid, fid, offset, datasize, callback)
else:
self.__nonraw_retr_file(tid, fid, offset, datasize, callback, timeout)
finally:
if fid >= 0:
self.__close_file(tid, fid)
self.__disconnect_tree(tid)
def stor_file(self, service, filename, callback, mode = SMB_O_CREAT | SMB_O_TRUNC, offset = 0, password = None, timeout = None):
filename = string.replace(filename, '/', '\\')
fid = -1
tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout)
try:
fid, attrib, lastwritetime, datasize, grantedaccess, filetype, devicestate, action, serverfid = self.__open_file(tid, filename, mode, SMB_ACCESS_WRITE | SMB_SHARE_DENY_WRITE)
# If the max_transmit buffer size is more than 16KB, upload process using non-raw mode is actually
# faster than using raw-mode.
if self.__max_transmit_size < 16384 and self.__can_write_raw:
# Once the __raw_stor_file returns, fid is already closed
self.__raw_stor_file(tid, fid, offset, datasize, callback, timeout)
fid = -1
else:
self.__nonraw_stor_file(tid, fid, offset, datasize, callback, timeout)
finally:
if fid >= 0:
self.__close_file(tid, fid)
self.__disconnect_tree(tid)
def googlesearch(query, ext):
print query
google="https://www.google.co.in/search?filter=0&q=site:"
getrequrl="https://www.google.co.in/search?filter=0&num=100&q=%s&start=" % (query)
hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
'Accept-Encoding': 'none',
'Accept-Language': 'en-US,en;q=0.8',
'Connection': 'keep-alive'}
req=urllib2.Request(getrequrl, headers=hdr)
response=urllib2.urlopen(req)
data = response.read()
data=re.sub('<b>','',data)
for e in ('>','=','<','\\','(',')','"','http',':','//'):
data = string.replace(data,e,' ')
r1 = re.compile('[-_.a-zA-Z0-9.-_]*'+'\.'+ ext)
res = r1.findall(data)
if res==[]:
print "No results were found"
else:
return res
def test_inplace_rewrites(self):
# Check that strings don't copy and modify cached single-character strings
self.checkequal('a', 'A', 'lower')
self.checkequal(True, 'A', 'isupper')
self.checkequal('A', 'a', 'upper')
self.checkequal(True, 'a', 'islower')
self.checkequal('a', 'A', 'replace', 'A', 'a')
self.checkequal(True, 'A', 'isupper')
self.checkequal('A', 'a', 'capitalize')
self.checkequal(True, 'a', 'islower')
self.checkequal('A', 'a', 'swapcase')
self.checkequal(True, 'a', 'islower')
self.checkequal('A', 'a', 'title')
self.checkequal(True, 'a', 'islower')
def _ExpandVariables(self, data, substitutions):
"""Expands variables "$(variable)" in data.
Args:
data: object, can be either string, list or dictionary
substitutions: dictionary, variable substitutions to perform
Returns:
Copy of data where each references to "$(variable)" has been replaced
by the corresponding value found in substitutions, or left intact if
the key was not found.
"""
if isinstance(data, str):
for key, value in substitutions.iteritems():
data = data.replace('$(%s)' % key, value)
return data
if isinstance(data, list):
return [self._ExpandVariables(v, substitutions) for v in data]
if isinstance(data, dict):
return {k: self._ExpandVariables(data[k], substitutions) for k in data}
return data
def _do_attr(self, n, value):
''''_do_attr(self, node) -> None
Process an attribute.'''
W = self.write
W(' ')
W(n)
W('="')
s = string.replace(value, "&", "&")
s = string.replace(s, "<", "<")
s = string.replace(s, '"', '"')
s = string.replace(s, '\011', '	')
s = string.replace(s, '\012', '
')
s = string.replace(s, '\015', '
')
W(s)
W('"')
def test_inplace_rewrites(self):
# Check that strings don't copy and modify cached single-character strings
self.checkequal('a', 'A', 'lower')
self.checkequal(True, 'A', 'isupper')
self.checkequal('A', 'a', 'upper')
self.checkequal(True, 'a', 'islower')
self.checkequal('a', 'A', 'replace', 'A', 'a')
self.checkequal(True, 'A', 'isupper')
self.checkequal('A', 'a', 'capitalize')
self.checkequal(True, 'a', 'islower')
self.checkequal('A', 'a', 'swapcase')
self.checkequal(True, 'a', 'islower')
self.checkequal('A', 'a', 'title')
self.checkequal(True, 'a', 'islower')
def test_inplace_rewrites(self):
# Check that strings don't copy and modify cached single-character strings
self.checkequal('a', 'A', 'lower')
self.checkequal(True, 'A', 'isupper')
self.checkequal('A', 'a', 'upper')
self.checkequal(True, 'a', 'islower')
self.checkequal('a', 'A', 'replace', 'A', 'a')
self.checkequal(True, 'A', 'isupper')
self.checkequal('A', 'a', 'capitalize')
self.checkequal(True, 'a', 'islower')
self.checkequal('A', 'a', 'swapcase')
self.checkequal(True, 'a', 'islower')
self.checkequal('A', 'a', 'title')
self.checkequal(True, 'a', 'islower')
def format_invoke_command(self, string):
"""
Formats correctly the string ouput from the invoke() method,
replacing line breaks and tabs when necessary.
"""
string = string.replace('\\n', '\n')
formated_response = ''
for line in string.splitlines():
if line.startswith('REPORT'):
line = line.replace('\t', '\n')
if line.startswith('[DEBUG]'):
line = line.replace('\t', ' ')
formated_response += line + '\n'
formated_response = formated_response.replace('\n\n', '\n')
return formated_response
def is_http_log_entry(self, string):
"""
Determines if a log entry is an HTTP-formatted log string or not.
"""
# Debug event filter
if 'Zappa Event' in string:
return False
# IP address filter
for token in string.replace('\t', ' ').split(' '):
try:
if (token.count('.') is 3 and token.replace('.', '').isnumeric()):
return True
except Exception: # pragma: no cover
pass
return False
def readItems(data_dir):
fr = open(data_dir,'r')
alllines = fr.readlines()
num = len(alllines)
cmv = np.zeros((num,3))
imgID = []
for i in range(num):
line = alllines[i]
temp = string.replace(line,'\r','');temp = string.replace(temp,'\n',''); temp = temp.split(' ')
imgID.append(temp[2])
cmv[i,:] = [temp[0],temp[1],temp[3]]
return cmv, imgID
def sub(self, s):
for k, v in self.macros.items():
s = string.replace(s, k, v)
return s
def escape(s, replace=string.replace):
s = replace(s, "&", "&")
s = replace(s, "<", "<")
return replace(s, ">", ">",)
def _platform(*args):
""" Helper to format the platform string in a filename
compatible format e.g. "system-version-machine".
"""
# Format the platform string
platform = string.join(
map(string.strip,
filter(len, args)),
'-')
# Cleanup some possible filename obstacles...
replace = string.replace
platform = replace(platform,' ','_')
platform = replace(platform,'/','-')
platform = replace(platform,'\\','-')
platform = replace(platform,':','-')
platform = replace(platform,';','-')
platform = replace(platform,'"','-')
platform = replace(platform,'(','-')
platform = replace(platform,')','-')
# No need to report 'unknown' information...
platform = replace(platform,'unknown','')
# Fold '--'s and remove trailing '-'
while 1:
cleaned = replace(platform,'--','-')
if cleaned == platform:
break
platform = cleaned
while platform[-1] == '-':
platform = platform[:-1]
return platform
def _syscmd_file(target,default=''):
""" Interface to the system's file command.
The function uses the -b option of the file command to have it
ommit the filename in its output and if possible the -L option
to have the command follow symlinks. It returns default in
case the command should fail.
"""
if sys.platform in ('dos','win32','win16','os2'):
# XXX Others too ?
return default
target = _follow_symlinks(target).replace('"', '\\"')
try:
f = os.popen('file "%s" 2> %s' % (target, DEV_NULL))
except (AttributeError,os.error):
return default
output = string.strip(f.read())
rc = f.close()
if not output or rc:
return default
else:
return output
### Information about the used architecture
# Default values for architecture; non-empty strings override the
# defaults given as parameters
def handle_data( self, data ) :
data = string.replace( data, '\r', '' )
...
def safe(self, input):
input = string.replace(input, "'", "\\'")
input = string.replace(input, '"', '\\"')
input = string.replace(input, ";", "\\;")
input = string.replace(input, "%", "\\%")
input = string.replace(input, "_", "\\_")
return input
def exec_python(self, result):
# Condition the code. Replace all tabs with four spaces. Then make
# sure that we unindent every line by the indentation level of the
# first line.
code = result.group('code')
code = string.replace(code, '\t', ' ')
result2 = re.search(r'(?P<prefix>\n[ ]*)[#a-zA-Z0-9''"]', code)
if not result2:
raise ParsingError,'Invalid template code expression: ' + code
code = string.replace(code, result2.group('prefix'), '\n')
code = code + '\n'
try:
self.global_dict['OUTPUT_TEXT'] = ''
if self.local_dict:
exec code in self.global_dict, self.local_dict
else:
exec code in self.global_dict
return self.global_dict['OUTPUT_TEXT']
except:
self.errorLogger('\n---- Error parsing: ----\n')
self.errorLogger(code)
self.errorLogger('\n------------------------\n')
raise
# Subroutine called from re module for every block of code to be
# evaluated. Returned the result of the evaluation (should be a string).