Python string 模块,Template() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用string.Template()。
def run_batch_query(self, query: str,
                    labels: List[str] = None,
                    params: List[dict] = None,
                    chunk_size: int = 1000):
    """Run ``query`` over ``params`` in chunks inside one transaction.

    The query is prefixed with ``UNWIND {params} AS params`` and any
    ``$labels`` placeholder in it is replaced by the ``:A:B`` style label
    string built from ``labels`` (empty when no labels are given).

    :param query: query text appended after the UNWIND prefix.
    :param labels: optional node labels substituted for ``$labels``.
    :param params: list of parameter dicts; ``None`` is treated as empty.
    :param chunk_size: number of parameter dicts sent per chunk.
    :return: whatever ``self.run_in_tx`` returns.
    """
    node_labels = ':{0}'.format(':'.join(labels)) if labels else ''
    query_template = Template("UNWIND {params} AS params " + query)
    labeled_query = query_template.safe_substitute(labels=node_labels)
    # ``params`` defaults to None; without this guard len(None) raised a
    # TypeError as soon as the generator was started.
    rows = params if params is not None else []
    chunk_count = 1

    def batch():
        for i in range(0, len(rows), chunk_size):
            logger.debug('starting chunk %s', i)
            # The transaction runner sends the chunk's result back in.
            result = (yield labeled_query,
                      dict(params=rows[i:i + chunk_size]))
            logger.debug(result)

    result = self.run_in_tx(batch(), chunk_count=chunk_count)
    return result
def generate_menu(name, menu_text):
    """Render ``<name>.sublime-menu`` from its template file and return the text."""
    from . import persist

    base_dir = os.path.join(sublime.packages_path(), persist.PLUGIN_DIRECTORY)
    template_path = os.path.join(base_dir, '{}.sublime-menu.template'.format(name))
    with open(template_path, encoding='utf8') as source:
        raw_template = source.read()

    # Reuse the indent found in the template for the chooser menus
    # (first line excepted, per the original note).
    menu_indent = MENU_INDENT_RE.search(raw_template).group(1)
    indented_menus = indent_lines(menu_text, menu_indent)
    rendered = Template(raw_template).safe_substitute({'menus': indented_menus})

    output_path = os.path.join(base_dir, '{}.sublime-menu'.format(name))
    with open(output_path, mode='w', encoding='utf8') as target:
        target.write(rendered)
    return rendered
def get_template(self, template_name):
    """Return a Template built from the first candidate file that can be read.

    Candidates that do not exist are recorded; any other IOError propagates.
    Raises TemplateDoesNotExist (with the recorded attempts) when every
    candidate is missing.
    """
    missing = []
    for candidate in self.iter_template_filenames(template_name):
        try:
            with io.open(candidate, encoding=settings.FILE_CHARSET) as handle:
                source_text = handle.read()
        except IOError as exc:
            if exc.errno != errno.ENOENT:
                raise
            missing.append((
                Origin(candidate, template_name, self),
                'Source does not exist',
            ))
        else:
            return Template(source_text)
    raise TemplateDoesNotExist(template_name, tried=missing, backend=self)
def get_wrapper_template(self, declaration):
    """Return WRAPPER_TEMPLATE pre-filled with the accepted argument lists.

    Each option contributes one quoted signature such as ``"(long x)"``,
    or ``"no arguments"`` when all of its arguments are skipped; signatures
    are ordered by length.
    """
    def describe(option):
        checked = []
        for arg in option['arguments']:
            if arg.get('ignore_check', False):
                continue
            type_name = self.TYPE_NAMES.get(arg['type'], arg['type'])
            checked.append(type_name + ' ' + arg['name'])
        # TODO: this should probably go to THPLongArgsPlugin
        if checked:
            return '({})'.format(', '.join(checked))
        return 'no arguments'

    signatures = sorted((describe(opt) for opt in declaration['options']), key=len)
    quoted = ['"' + sig + '"' for sig in signatures]
    return Template(self.WRAPPER_TEMPLATE.safe_substitute(
        readable_name=declaration['python_name'],
        num_options=len(quoted),
        expected_args=', '.join(quoted)))
def declare_methods(self, stateless):
    """Render the method-table declaration for the (stateless) declarations.

    Builds one table entry per declaration, wraps it in a preprocessor guard
    when the declaration has ``defined_if``, and feeds the concatenation to
    TENSOR_METHODS_DECLARATION.
    """
    if stateless:
        declarations = self.stateless_declarations
    else:
        declarations = self.declarations

    entry_template = Template(' {"$python_name", (PyCFunction)$name, $flags, NULL},\n')
    entries = []
    for declaration in declarations:
        flag_parts = ['METH_VARARGS']
        if 'method_flags' in declaration:
            flag_parts.append(declaration.get('method_flags'))
        if not declaration.get('only_register'):
            flag_parts.append('METH_KEYWORDS')
        flags = ' | '.join(flag_parts)
        # An explicit override replaces everything computed above.
        if declaration.get('override_method_flags'):
            flags = declaration['override_method_flags']
        entry = entry_template.substitute(
            python_name=declaration['python_name'],
            name=declaration['name'],
            flags=flags,
        )
        if 'defined_if' in declaration:
            entry = self.preprocessor_guard(entry, declaration['defined_if'])
        entries.append(entry)

    prefix = 'stateless_' if stateless else ''
    return self.TENSOR_METHODS_DECLARATION.substitute(methods=''.join(entries), stateless=prefix)
def is_available(room_email,start_time,end_time):
    """Return True when the room's reported availability status is "Free".

    Fills ``getavailibility_template.xml`` with the room e-mail and time
    window, POSTs it to ``url`` with NTLM auth, and takes the text of the
    last BusyType element as the status (default "Free"). Any ``faultcode``
    element in the response forces the status to "N/A".
    """
    # Context manager so the template handle is closed right after reading.
    with open("getavailibility_template.xml", "r") as template_file:
        xml = Template(template_file.read())
    headers = {"Content-type": "text/xml; charset=utf-8"}
    data = unicode(xml.substitute(email=room_email, starttime=start_time, endtime=end_time)).strip()
    response = requests.post(url, headers=headers, data=data, auth=HttpNtlmAuth(user, password))
    tree = ET.fromstring(response.text.encode('utf-8'))
    status = "Free"
    # arrgh, namespaces!!
    for elem in tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}BusyType"):
        status = elem.text
    if tree.findall(".//faultcode"):
        sys.stderr.write("Error occured\n")
        sys.stderr.write("tree: "+str(tree)+"\n")
        sys.stderr.write("response: "+response.text.encode('utf-8')+"\n")
        status = "N/A"
    sys.stderr.write("Room status: "+str(status)+"\n")
    return (status == "Free")
def execute(self, command, params):
    """
    Send a command to the remote server.

    Any path substitutions required for the URL mapped to the command
    should be included in the command parameters.

    :Args:
     - command - A string specifying the command to execute.
     - params - A dictionary of named parameters to send with the command
       as its JSON payload.
    """
    info = self._commands[command]
    assert info is not None, 'Unrecognised command %s' % command
    payload = utils.dump_json(params)
    # info[1] is the URL path template, info[0] is passed to _request first.
    resolved_path = string.Template(info[1]).substitute(params)
    return self._request(info[0], '%s%s' % (self._url, resolved_path), body=payload)
def __init__(self, fmt=None, datefmt="%Y-%m-%d %H:%M:%S", exceptionfmt="Traceback (most recent call last):\n%(traceback)s\n%(classname)s: %(message)s"):
    """Set up the format strings, expanding ``$color`` placeholders.

    ``fmt`` and ``exceptionfmt`` are run through ``string.Template`` with
    SHELL_COLORS; when stdout is not a terminal the resulting ANSI color
    sequences are stripped again.

    NOTE(review): ``fmt`` defaults to None, but Template substitution needs
    a string, so callers appear to be expected to always pass one.
    """
    # set standard Formatter values
    self.datefmt = datefmt
    # set colors
    self._fmt = Template(fmt).safe_substitute(SHELL_COLORS)
    self._exceptionfmt = Template(exceptionfmt).safe_substitute(SHELL_COLORS)
    # do we have a tty?
    self.useColors = bool(sys.stdout.isatty() or os.isatty(sys.stdout.fileno()))
    # remove all colors
    if not self.useColors:
        # ESC literal plus a raw-string remainder: the same pattern as
        # before, without invalid escape sequences like "\[" and "\d".
        ansi_color = "\033" + r"\[(\d+|;)+m"
        self._fmt = re.sub(ansi_color, "", self._fmt)
        self._exceptionfmt = re.sub(ansi_color, "", self._exceptionfmt)
def __init__(self, fmt=None, datefmt="%Y-%m-%d %H:%M:%S", exceptionfmt="Traceback (most recent call last):\n%(traceback)s\n%(classname)s: %(message)s"):
    """Store the date format and the color-expanded format strings.

    ``$name`` placeholders in ``fmt`` and ``exceptionfmt`` are replaced from
    SHELL_COLORS. If stdout is not attached to a terminal, ANSI color
    sequences are removed from both strings.

    NOTE(review): ``fmt=None`` is the declared default, yet Template
    substitution requires a string; presumably callers always supply one.
    """
    # set standard Formatter values
    self.datefmt = datefmt
    # set colors
    self._fmt = Template(fmt).safe_substitute(SHELL_COLORS)
    self._exceptionfmt = Template(exceptionfmt).safe_substitute(SHELL_COLORS)
    # do we have a tty?
    self.useColors = bool(sys.stdout.isatty() or os.isatty(sys.stdout.fileno()))
    # remove all colors
    if not self.useColors:
        # Same regex as before; the raw-string part avoids the invalid
        # "\[" and "\d" escapes of a plain string literal.
        ansi_color = "\033" + r"\[(\d+|;)+m"
        self._fmt = re.sub(ansi_color, "", self._fmt)
        self._exceptionfmt = re.sub(ansi_color, "", self._exceptionfmt)
def _get_submodule_code(op):
    """Render the ``__device__`` function source for ``op``.

    Parameters are passed by reference; ``inN_type``/``outN_type`` typedefs
    are emitted for the first ``op.nin`` types and the remaining types.
    """
    param_decls = ['%s &%s' % (_dtype_to_ctype[t], name)
                   for name, t in zip(op.param_names, op.types)]
    typedefs = ['typedef %s in%d_type;\n' % (_dtype_to_ctype[t], idx)
                for idx, t in enumerate(op.types[:op.nin])]
    typedefs += ['typedef %s out%d_type;\n' % (_dtype_to_ctype[t], idx)
                 for idx, t in enumerate(op.types[op.nin:])]
    source = string.Template('''
__device__ void ${name}(${parameters}) {
${typedecl}
${operation};
}
''').substitute(
        name=op.name,
        parameters=', '.join(param_decls),
        operation=op.operation,
        typedecl=''.join(typedefs))
    return source + '\n'
def _get_pre_code(in_vars, out_vars, operation):
    """Render the ``_pre_map`` ``__device__`` function source.

    Input variables become parameters, output variables become local
    declarations, and the first output variable is the return value.
    """
    arg_list = ', '.join(
        '%s v%s' % (_dtype_to_ctype[var.ty], var.num) for var in in_vars)
    local_decls = ''.join(
        '%s v%s;\n' % (_dtype_to_ctype[var.ty], var.num) for var in out_vars)
    first_out = out_vars[0]
    return string.Template('''
__device__ ${return_type} _pre_map(${in_params}) {
${out_params}
${operation};
return ${return_var};
}
''').substitute(
        return_type=_dtype_to_ctype[first_out.ty],
        in_params=arg_list,
        out_params=local_decls,
        operation=operation,
        return_var='v%d' % first_out.num)
def process_expansions(dct):
    """Expand ``$name`` references in the values of ``dct``, in place.

    Strings are substituted against a copy of the enclosing dict overlaid
    with the process environment (environment wins). Lists are expanded
    element-wise; a nested dict's values are expanded against that nested
    dict. Other values are left untouched. Unknown names are kept as-is.
    """
    def _render(value, scope):
        if isinstance(value, six.string_types):
            lookup = copy.deepcopy(scope)
            for env_name, env_value in six.iteritems(os.environ):
                lookup[env_name] = env_value
            return string.Template(value).safe_substitute(lookup)
        if isinstance(value, list):
            return [_render(item, scope) for item in value]
        if isinstance(value, dict):
            return {name: _render(item, value) for name, item in six.iteritems(value)}
        # Integers, booleans and anything else pass through unchanged.
        return value

    for key, val in six.iteritems(dct):
        dct[key] = _render(val, dct)
def translate(self):
    """Walk the document, then fill the output template with the parts.

    The template path from the document settings is tried as given first
    and, on IOError, relative to ``self.default_template_path``. The result
    of the substitution is stored in ``self.output``.
    """
    visitor = self.translator_class(self.document)
    self.document.walkabout(visitor)
    # copy parts
    for part in self.visitor_attributes:
        setattr(self, part, getattr(visitor, part))
    # get template string from file
    template_name = self.document.settings.template
    try:
        template_file = open(template_name, 'rb')
    except IOError:
        template_file = open(os.path.join(self.default_template_path,
                                          template_name), 'rb')
    # ``with`` closes the handle even if reading or decoding fails.
    with template_file:
        template = string.Template(unicode(template_file.read(), 'utf-8'))
    # fill template
    self.assemble_parts()  # create dictionary of parts
    self.output = template.substitute(self.parts)
def substitute(template, context):
    """
    Fill ``$name`` placeholders in a string, leaving unknown ones intact.

    Args:
        template (str): The string to perform the substitution on.
        context (dict): A mapping of keys to values for substitution.

    Returns:
        str: The substituted string.
    """
    from string import Template as _Template
    return _Template(template).safe_substitute(context)
def translate(self):
    """Run the translator over the document and render the output template.

    Visitor attributes listed in ``self.visitor_attributes`` are copied onto
    this object. The template file named in the document settings is opened
    directly, or under ``self.default_template_path`` if that raises
    IOError; the filled template is stored in ``self.output``.
    """
    visitor = self.translator_class(self.document)
    self.document.walkabout(visitor)
    # copy parts
    for part in self.visitor_attributes:
        setattr(self, part, getattr(visitor, part))
    # get template string from file
    template_name = self.document.settings.template
    try:
        template_file = open(template_name, 'rb')
    except IOError:
        template_file = open(os.path.join(self.default_template_path,
                                          template_name), 'rb')
    # Close the handle deterministically, including on read/decode errors.
    with template_file:
        template = string.Template(unicode(template_file.read(), 'utf-8'))
    # fill template
    self.assemble_parts()  # create dictionary of parts
    self.output = template.substitute(self.parts)
def init_with_genesis(smoketest_genesis):
    """Write the genesis JSON to GENESIS_PATH and run the ``init`` command on it.

    The binary and data directory come from the ``RST_GETH_BINARY`` and
    ``RST_DATADIR`` environment variables. Returns ``(stdout, stderr)`` of
    the process; asserts that it exited with status 0.
    """
    with open(GENESIS_PATH, 'wb') as genesis_file:
        json.dump(smoketest_genesis, genesis_file)

    command_line = Template(
        '$RST_GETH_BINARY --datadir $RST_DATADIR init {}'.format(GENESIS_PATH)
    ).substitute(os.environ)
    process = subprocess.Popen(
        shlex.split(command_line),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout_data, stderr_data = process.communicate()
    assert process.returncode == 0
    return (stdout_data, stderr_data)
def format_index_html(self, dirname):
    """Return an HTML directory listing page for ``dirname``.

    Byte-string directory names are decoded with the filesystem encoding;
    sub-directories are listed with a trailing slash.
    """
    page_template = u'''
<html>
<title>Directory listing for $dirname</title>
<body>
<h2>Directory listing for $dirname</h2>
<hr>
<ul>
$html
</ul>
<hr>
</body></html>
'''
    if not isinstance(dirname, unicode):
        dirname = dirname.decode(sys.getfilesystemencoding())
    items = []
    for entry in os.listdir(dirname):
        if os.path.isdir(os.path.join(dirname, entry)):
            slash = u'/'
        else:
            slash = u''
        items.append(u'<li><a href="%s%s">%s%s</a>\r\n' % (entry, slash, entry, slash))
    return string.Template(page_template).substitute(dirname=dirname, html=''.join(items))
def execute(self, command, params):
    """
    Send a command to the remote server.

    The URL path mapped to the command may contain ``$name`` placeholders;
    their values must be present in ``params``.

    :Args:
     - command - A string specifying the command to execute.
     - params - A dictionary of named parameters to send with the command
       as its JSON payload.
    """
    entry = self._commands[command]
    assert entry is not None, 'Unrecognised command %s' % command
    body = utils.dump_json(params)
    endpoint = string.Template(entry[1]).substitute(params)
    full_url = '%s%s' % (self._url, endpoint)
    return self._request(entry[0], full_url, body=body)
def __init__(self, detail=None, headers=None, comment=None,
             body_template=None, **kw):
    """Initialise both the Response and the Exception side of the object.

    The status line is built from ``self.code`` and ``self.title``. Extra
    headers are appended, a custom body template replaces the default one,
    and content type/length are dropped when ``self.empty_body`` is set.
    """
    status_line = '%s %s' % (self.code, self.title)
    Response.__init__(self, status=status_line, **kw)
    Exception.__init__(self, detail)
    if headers:
        self.headers.extend(headers)
    self.detail = detail
    self.comment = comment
    if body_template is not None:
        self.body_template = body_template
        self.body_template_obj = Template(body_template)
    if self.empty_body:
        del self.content_type
        del self.content_length
def handle(self, *options):
    """Create a new project directory from the bundled project template.

    Copies the template's ``src`` tree, asks for the AWS account id and role
    ARN, and renders every top-level ``*.py`` template file into the new
    project directory.
    """
    if not options:
        self.show_usage('You must provide a project name')
    project_name = options[0]
    template_dir = os.path.join(noopy.__path__[0], 'project_template')
    os.mkdir(project_name)
    shutil.copytree(os.path.join(template_dir, 'src'), '{}/src'.format(project_name))

    context = {
        'project_name': project_name,
        'lambda_prefix': to_pascal_case(project_name),
    }
    context['account_id'] = raw_input('Input aws account id > ').strip()
    context['role_arn'] = raw_input('Input role arn to be granted to lambda function > ').strip()

    for template_path in glob.glob('{}/*.py'.format(template_dir)):
        with open(template_path, 'r') as source:
            rendered = Template(source.read()).substitute(**context)
        target_path = os.path.join(project_name, os.path.split(template_path)[1])
        with open(target_path, 'w') as target:
            target.write(rendered)
    print('Project "{}" created'.format(project_name))
def start_scrapy_project(project_name):
    """Bootstrap a portia project with default scrapy files.

    Returns a dict mapping output path to rendered contents; ``.tmpl``
    suffixes are dropped and any path ending in ``scrapy.cfg`` is placed at
    the top level.
    """
    if PY2:
        project_name = encode(project_name)
    suffix = '.tmpl'
    generated = {}
    for path, contents in find_files(project_name).items():
        rendered = string.Template(contents).substitute(
            project_name=project_name,
            ProjectName=string_camelcase(project_name)
        )
        target = path[:-len(suffix)] if path.endswith(suffix) else path
        if target.endswith('scrapy.cfg'):
            target = 'scrapy.cfg'
        generated[target] = rendered
    generated['setup.py'] = SETUP(project_name)
    return generated
def _bbox_2_wkt(bbox, srid):
    '''
    Build a polygon geometry element from a bbox dictionary.

    The bbox keys minx/miny/maxx/maxy become a closed WKT POLYGON ring.
    When ``srid`` is given and differs from the configured database SRID
    (``ckan.spatial.srid``, default 4326) the element is wrapped in
    ST_Transform to the database SRID; otherwise it is created directly
    with the database SRID.
    '''
    db_srid = int(config.get('ckan.spatial.srid', '4326'))
    ring = Template('POLYGON (($minx $miny, $minx $maxy, $maxx $maxy, $maxx $miny, $minx $miny))')
    wkt = ring.substitute(minx=bbox['minx'],
                          miny=bbox['miny'],
                          maxx=bbox['maxx'],
                          maxy=bbox['maxy'])
    if not srid or srid == db_srid:
        return WKTElement(wkt, db_srid)
    # Input geometry needs to be transformed to the one used on the database
    return ST_Transform(WKTElement(wkt, srid), db_srid)
def show_kernel_error(self, error):
    """Show kernel initialization errors."""
    # Turn the text's line endings into HTML line breaks
    line_ending = sourcecode.get_eol_chars(error)
    html_error = error.replace(line_ending, '<br>') if line_ending else error
    # Don't break lines in hyphens
    # From http://stackoverflow.com/q/7691569/438386
    html_error = html_error.replace('-', '‑')
    message = _("An error occurred while starting the kernel")
    page = Template(KERNEL_ERROR).substitute(css_path=CSS_PATH,
                                             message=message,
                                             error=html_error)
    self.setHtml(page)
def authenticate(self, username, password):
    """Look the user up via the bind account, then verify the password.

    Returns a User (with the entry's ``cn`` and the CN part of each
    ``memberof`` value) on success, or None when the user is not found or
    the password check fails.
    """
    server = settings.get('authentication.config.server')
    port = settings.get('authentication.config.port')
    bind_user = settings.get('authentication.config.bind_user')
    bind_password = settings.get('authentication.config.bind_password')
    user_query = Template(settings.get('authentication.config.user_query'))

    # NOTE(review): username is substituted into the query unescaped;
    # confirm callers sanitise it before it reaches this point.
    with simpleldap.Connection(server, port, bind_user, bind_password) as lookup_conn:
        try:
            entry = lookup_conn.get(user_query.substitute(username=username))
        except simpleldap.ObjectNotFound:
            return None

    with simpleldap.Connection(server, port) as login_conn:
        if not login_conn.authenticate(entry.dn, password):
            return None
        display_name = entry.first('cn')
        groups = [self._split_ldap_spec(spec)['CN'] for spec in entry.get('memberof', [])]
        return User(username=username, name=display_name, groups=groups)
def execute_for_backends(self, cmd, pxname, svname, wait_for_status = None):
    """
    Run ``cmd`` for ``svname`` on the requested backend, or on every
    discovered backend when ``pxname`` is None.

    ``cmd`` is a template with ``$pxname`` and ``$svname`` placeholders.
    """
    backends = self.discover_all_backends() if pxname is None else [pxname]
    command = Template(cmd)

    for backend in backends:
        # Fail when backends were not found
        state = self.get_state_for(backend, svname)
        must_exist = self.fail_on_not_found or self.wait
        if must_exist and state is None:
            self.module.fail_json(msg="The specified backend '%s/%s' was not found!" % (backend, svname))

        self.execute(command.substitute(pxname = backend, svname = svname))
        if self.wait:
            self.wait_until_status(backend, svname, wait_for_status)
def create_capability_files(template_path, themes_path, map_path, fonts_path, use_debug, shapepath):
    """Write one ``SeaChart_<theme>.map`` file per entry in ``themes_path``.

    Each file is the ``SeaChart_THEME.map`` template filled with the
    dictionary from ``get_dictionary``; ``SHAPEPATH`` is overridden when
    ``shapepath`` is truthy.
    """
    # Context managers close the template and every output file; the
    # previous code never closed any of them.
    with open(os.path.join(template_path, "SeaChart_THEME.map"), 'r') as template_file:
        template = Template(template_file.read())
    for theme in os.listdir(themes_path):
        # Remove file suffix
        theme = os.path.splitext(theme)[0]
        debug_string = ""
        if use_debug:
            debug_string = str.format(debug_template, theme)
        d = get_dictionary(theme, map_path, fonts_path, debug_string)
        if shapepath:
            d['SHAPEPATH'] = shapepath
        with open(os.path.join(map_path, "SeaChart_" + theme + ".map"), 'w') as fileout:
            fileout.write(template.substitute(d))
def create_legend_files(template_path, themes_path, map_path, fonts_path, use_debug):
    """Write one ``SeaChart_Legend_<theme>.map`` file per entry in ``themes_path``.

    Files are rendered from the ``SeaChart_Legend_THEME.map`` template and
    written to the ``legends`` sub-directory obtained from
    ``dirutils.force_sub_dir``.
    """
    # Context managers ensure the template and every output file are closed.
    with open(os.path.join(template_path, "SeaChart_Legend_THEME.map"), 'r') as template_file:
        template = Template(template_file.read())
    for theme in os.listdir(themes_path):
        # Remove file suffix
        theme = os.path.splitext(theme)[0]
        debug_string = ""
        if use_debug:
            debug_string = str.format(debug_template, theme)
        d = get_dictionary(theme, map_path, fonts_path, debug_string)
        legend_path = dirutils.force_sub_dir(map_path, "legends")
        with open(os.path.join(legend_path, "SeaChart_Legend_" + theme + ".map"), 'w') as fileout:
            fileout.write(template.substitute(d))
def write_html_link_index(out_dir, link):
    """Render the per-link ``index.html`` into ``out_dir``.

    The template context is the link dict overlaid with its ``latest``
    dict, plus defaulted type/tags, formatted timestamps and fallback
    archive.org / wget values.
    """
    with open(LINK_INDEX_TEMPLATE, 'r', encoding='utf-8') as template_file:
        template_text = template_file.read()

    index_path = os.path.join(out_dir, 'index.html')
    print(' ? Updating: index.html')

    def _stamp(raw):
        return datetime.fromtimestamp(float(raw)).strftime('%Y-%m-%d %H:%M')

    latest = link['latest']
    context = {}
    context.update(link)
    context.update(latest)
    # Later entries override same-named keys coming from link / latest.
    context.update({
        'type': link['type'] or 'website',
        'tags': link['tags'] or 'untagged',
        'bookmarked': _stamp(link['timestamp']),
        'updated': _stamp(link['updated']),
        'archive_org': latest.get('archive_org') or 'https://web.archive.org/save/{}'.format(link['url']),
        'wget': latest.get('wget') or link['domain'],
    })

    with open(index_path, 'w', encoding='utf-8') as index_file:
        index_file.write(Template(template_text).substitute(context))
    chmod_file(index_path)
def execute(self, command, params):
    """
    Send a command to the remote server.

    Values for any placeholders in the command's URL path are taken from
    ``params``, which is also sent as the JSON payload.

    :Args:
     - command - A string specifying the command to execute.
     - params - A dictionary of named parameters to send with the command
       as its JSON payload.
    """
    spec = self._commands[command]
    assert spec is not None, 'Unrecognised command %s' % command
    json_body = utils.dump_json(params)
    path_template = string.Template(spec[1])
    target = '%s%s' % (self._url, path_template.substitute(params))
    return self._request(spec[0], target, body=json_body)
def test_keyword_arguments_safe(self):
    """safe_substitute() takes keyword arguments, which win over the mapping."""
    likes = Template('$who likes $what')
    self.assertEqual(likes.safe_substitute(who='tim', what='ham'), 'tim likes ham')
    self.assertEqual(likes.safe_substitute(dict(who='tim'), what='ham'), 'tim likes ham')
    self.assertEqual(
        likes.safe_substitute(dict(who='fred', what='kung pao'), who='tim', what='ham'),
        'tim likes ham')

    # A placeholder literally named "mapping" must still work as a keyword.
    named_mapping = Template('the mapping is $mapping')
    self.assertEqual(
        named_mapping.safe_substitute(dict(foo='none'), mapping='bozo'),
        'the mapping is bozo')
    self.assertEqual(
        named_mapping.safe_substitute(dict(mapping='one'), mapping='two'),
        'the mapping is two')

    # More than one positional mapping is rejected.
    first = dict(mapping='one')
    self.assertRaises(TypeError, named_mapping.substitute, first, {})
    self.assertRaises(TypeError, named_mapping.safe_substitute, first, {})
def test_delimiter_override(self):
    """Subclasses can replace the ``$`` delimiter with another character."""
    class AmpersandTemplate(Template):
        delimiter = '&'

    class PieDelims(Template):
        delimiter = '@'

    doubled = AmpersandTemplate('this &gift is for &{who} &&')
    self.assertEqual(doubled.substitute(gift='bud', who='you'), 'this bud is for you &')
    self.assertRaises(KeyError, doubled.substitute)
    self.assertEqual(doubled.safe_substitute(gift='bud', who='you'), 'this bud is for you &')
    self.assertEqual(doubled.safe_substitute(), 'this &gift is for &{who} &')

    # A lone trailing delimiter is invalid for substitute() only.
    dangling = AmpersandTemplate('this &gift is for &{who} &')
    self.assertRaises(ValueError, dangling.substitute, dict(gift='bud', who='you'))
    self.assertEqual(dangling.safe_substitute(), 'this &gift is for &{who} &')

    # With '@' as delimiter a literal '$' needs no escaping.
    pie = PieDelims('@who likes to eat a bag of @{what} worth $100')
    self.assertEqual(pie.substitute(dict(who='tim', what='ham')),
                     'tim likes to eat a bag of ham worth $100')
def __init__(self, fmt=None, datefmt=None, style='%'):
    """
    Initialize the formatter with specified format strings.

    ``fmt`` is handed to the style object selected by ``style``; the
    resulting format string is kept in ``_fmt``. ``datefmt`` is stored
    unchanged for date formatting.

    Use a style parameter of '%', '{' or '$' to specify that you want to
    use one of %-formatting, :meth:`str.format` (``{}``) formatting or
    :class:`string.Template` formatting in your format string.

    .. versionchanged: 3.2
       Added the ``style`` parameter.
    """
    if style not in _STYLES:
        allowed = ','.join(_STYLES.keys())
        raise ValueError('Style must be one of: %s' % allowed)
    style_factory = _STYLES[style]
    self._style = style_factory(fmt)
    self._fmt = self._style._fmt
    self.datefmt = datefmt
def bootstrap(self, **vars):
    """Create the templated path and apply mode and ownership.

    ``self.path`` is expanded with the keyword arguments, created via
    ``self._create``, chmod-ed when ``self.mode`` is set, and chown-ed when
    running as uid 0 and a username and/or group is configured.
    """
    target = Template(self.path).substitute(**vars)
    self._create(target)
    if self.mode is not None:
        os.chmod(target, self.mode)
    if os.getuid() != 0:
        return
    # -1 tells chown to leave that id unchanged.
    uid = -1 if self.username is None else pwd.getpwnam(self.username).pw_uid
    gid = -1 if self.group is None else grp.getgrnam(self.group).gr_gid
    if (uid, gid) != (-1, -1):
        os.chown(target, uid, gid)
def chord():
    """Parse the requested chord transcript, play it and return it as JSON."""
    chords = request.args.get('transcript')
    confidence = request.args.get('confidence')
    # do midi stuff with chord request
    summary = Template("request='${chords}' (${confidence})").substitute(
        {"chords": chords, "confidence": confidence})
    banner = "========================================"
    print(banner)
    print(summary)
    print(banner)
    resp = parser(chords)
    print(json.dumps(resp, indent=4, sort_keys=True))
    player = Chord()
    player.getchord()
    for note in resp['notes']:
        player.getchord(note['note'], note['params'][0], resp['octave'])
    player.play()
    return json.dumps(resp)
def copy_template(template_path: Path, path: Path, variables: dict):
    """Recursively render the files under ``template_path`` into ``path``.

    Directories are descended into; each file is substituted with
    ``variables`` (unknown placeholders are kept). Raises click.UsageError
    as soon as a target file already exists.
    """
    for entry in template_path.iterdir():
        destination = path / entry.relative_to(template_path)
        if entry.is_dir():
            copy_template(entry, destination, variables)
            continue
        if destination.exists():
            # better not overwrite any existing files!
            raise click.UsageError('Target file "{}" already exists. Aborting!'.format(destination))
        with Action('Writing {}..'.format(destination)):
            destination.parent.mkdir(parents=True, exist_ok=True)
            with entry.open() as source:
                rendered = string.Template(source.read()).safe_substitute(variables)
            with destination.open('w') as sink:
                sink.write(rendered)
def substitute_file(from_file, to_file, substitutions):
    """ Render from_file with string.Template and write the result to to_file.

    Placeholders without a matching substitution are left unchanged.

    Raises: IOError if the template file is not found

    Arguments:
    ----------
    from_file -- template file to load
    to_file -- substituted file
    substitutions -- dictionary of substitutions.
    """
    with open(from_file, "r") as template_handle:
        template = string.Template(template_handle.read())
    rendered = template.safe_substitute(substitutions)
    with open(to_file, "w") as output_handle:
        output_handle.write(rendered)
def playlistinfo(self, sortkey='pos'):
    """Shows playlist information sorted by key

    Entries are ordered numerically by ``sortkey``; if any value is not an
    integer they are ordered case-insensitively as text instead. Each entry
    is printed to the "mpc: playlist" buffer using the "playinfo" plugin
    setting as template. Returns the unsorted playlist.
    """
    new_buf = wc.buffer_search("", "mpc: playlist")
    if len(new_buf) == 0:
        new_buf = wc.buffer_new('mpc: playlist', "", "", "", "")
    pl = self._mpdc.playlistinfo()
    field = itemgetter(sortkey)
    # Key functions give the same ordering as the former cmp= callbacks
    # and also work on Python 3, where cmp= and cmp() no longer exist.
    try:
        # Numerical sort
        spl = sorted(pl, key=lambda song: int(field(song)))
    except ValueError:
        # Alpha sort
        spl = sorted(pl, key=lambda song: field(song).lower())
    t = Template(wc.config_get_plugin("playinfo"))
    for line in spl:
        wc.prnt(new_buf, t.safe_substitute(line))
    return pl
def playlistinfo(self, sortkey='pos'):
    """Shows playlist information sorted by key

    The playlist is sorted by the integer value of ``sortkey`` when every
    value converts, otherwise by its lower-cased text. Every entry is
    rendered with the "playinfo" plugin template into the "mpc: playlist"
    buffer. The playlist is returned in its original order.
    """
    new_buf = wc.buffer_search("", "mpc: playlist")
    if len(new_buf) == 0:
        new_buf = wc.buffer_new('mpc: playlist', "", "", "", "")
    pl = self._mpdc.playlistinfo()
    field = itemgetter(sortkey)
    # sorted(cmp=...) and cmp() are Python-2-only; these key functions
    # produce the identical ordering on both major versions.
    try:
        # Numerical sort
        spl = sorted(pl, key=lambda song: int(field(song)))
    except ValueError:
        # Alpha sort
        spl = sorted(pl, key=lambda song: field(song).lower())
    t = Template(wc.config_get_plugin("playinfo"))
    for line in spl:
        wc.prnt(new_buf, t.safe_substitute(line))
    return pl
def z_make_templates(source, target, replacements):
    """Recursively render the template tree ``source`` into ``target``.

    Sub-directories are created and descended into; every file is copied
    line by line with ``replacements`` substituted into each line
    (a missing key raises KeyError).
    """
    for f in os.listdir(source):
        newtarget = os.path.join(target, f)
        newsource = os.path.join(source, f)
        if os.path.isdir(newsource):
            os.makedirs(newtarget)
            print(z_info('Created folder ' + f))
            z_make_templates(newsource, newtarget, replacements)
        else:
            # Context managers close both files even when a substitution
            # fails; the source handle used to be leaked entirely.
            with io.open(newsource, 'r') as sourcefile, \
                    io.open(newtarget, 'w') as targetfile:
                for line in sourcefile:
                    targetfile.write(Template(line).substitute(replacements))
            print(z_info('Created file ' + f))
# Merge two dicts recursively.
# Values from y will override the ones from x
# Returns x or y if the other is empty.
def _add_file_to_c2_repo(config, template_file_path, params, dest_path_in_c2_repo):
    """Render a template into the cloned repo and commit/push it if changed.

    :param config: mapping with ``benign_repo_path`` (local clone) and
        ``primary_clone_url`` (push target).
    :param template_file_path: template file to render.
    :param params: substitutions; unknown placeholders are left intact.
    :param dest_path_in_c2_repo: destination path relative to the clone.
    :raises subprocess.CalledProcessError: if a git command fails.
    """
    with open(template_file_path, 'r') as f:
        templatized_file = string.Template(f.read())
    repo_path = config["benign_repo_path"]
    dest_file = os.path.join(repo_path, dest_path_in_c2_repo)
    with open(dest_file, "w") as f:
        f.write(templatized_file.safe_substitute(params))

    # Run git inside the clone via ``cwd`` instead of os.chdir, so the
    # process working directory is never left changed after a failure.
    # Argument lists (no shell) keep paths and URLs from being split or
    # interpreted by a shell.
    status = subprocess.check_output(["git", "status"], cwd=repo_path)
    if "nothing to commit" not in str(status):
        # Add the rendered file and push
        subprocess.check_output(["git", "add", dest_path_in_c2_repo], cwd=repo_path)
        subprocess.check_output(
            ["git", "commit", "-m", "Add %s" % dest_path_in_c2_repo], cwd=repo_path)
        subprocess.check_output(
            ["git", "push", "--repo", config["primary_clone_url"]], cwd=repo_path)
def _parse_yaml_file(fname):
    # type: (str) -> Dict[str, Any]
    """Load a YAML file after expanding ``$VAR`` environment references.

    Parameters
    ----------
    fname : str
        yaml file name.

    Returns
    -------
    table : Dict[str, Any]
        the yaml file as a dictionary.
    """
    raw_text = read_file(fname)
    # substitute environment variables; an undefined variable raises KeyError
    expanded = string.Template(raw_text).substitute(os.environ)
    # NOTE(review): yaml.load without an explicit Loader can build arbitrary
    # objects from untrusted input; confirm the files are trusted or switch
    # to a safe loader.
    return yaml.load(expanded)
def get_wrapper_template(self, declaration):
    """Return WRAPPER_TEMPLATE with ``expected_args`` filled in.

    Every option yields a quoted signature of its checked arguments
    (nullable ones shown as ``[<type> <name> or None]``), or
    ``"no arguments"``; signatures are ordered by length.
    """
    def describe_arg(arg):
        text = self.TYPE_NAMES[arg['type']] + ' ' + arg['name']
        return '[{} or None]'.format(text) if arg.get('nullable') else text

    def describe_option(option):
        checked = [describe_arg(arg)
                   for arg in option['arguments']
                   if not arg.get('ignore_check', False)]
        if not checked:
            return 'no arguments'
        return '({})'.format(', '.join(checked))

    signatures = sorted(map(describe_option, declaration['options']), key=len)
    expected = ', '.join('"' + sig + '"' for sig in signatures)
    return Template(self.WRAPPER_TEMPLATE.safe_substitute(expected_args=expected))
def get_html():
    """Return the page template: ``dev.html`` in DEV mode, else the embedded INDEX.

    If ``dev.html`` cannot be loaded the error is printed and the embedded
    INDEX is returned instead.
    """
    if not DEV:
        return INDEX
    try:
        with open("dev.html") as dev_file:
            return Template(dev_file.read())
    except Exception as err:
        print(err)
        print("Delivering embedded HTML")
    return INDEX
def html(self):
    """Render the complete ``<form>`` element as an HTML string.

    With a form definition, the definition renders the fields and buttons
    and the extra widgets are appended. Without one, hidden inputs are
    emitted directly and all other widgets are placed into the layout
    template by name. A 'submit' trigger supplies the form action and
    enables the loading text.
    """
    form_html = ''
    if self.form_def is None:
        visible = {}
        for widget in self._widgets:
            if isinstance(widget, hidden_input):
                form_html += widget.html()
            else:
                visible[widget.name] = widget.html()
        if not self._template:
            self.layout()
        form_html += Template(self._template).safe_substitute(visible)
        buttons = self._gen_buttons_html(self._buttons)
    else:
        form_html = self.form_def.render(self.data, self._template, self.edit_fields,
                                         self.omit_fields, self.errors, **self.options)
        for widget in self._widgets:
            form_html += widget.html()
        buttons = self.form_def.buttons(self._buttons)

    kss_url = self._triggers['submit'][0][0] if 'submit' in self._triggers else ''

    klass = ' '.join(self.klass)
    loading_data = ''
    if kss_url:
        klass += ' KSSLoad'
        loading_data = 'data-loading="%s"' % self.loading_text

    h3 = '<h3>%s</h3>' % self.title if self.title else ''
    desc = '<div class="discreet m_b_3">%s</div>' % self.description if self.description else ''

    if self._layout == 'inline':
        markup = '''<form action="%s" %s class="%s" method="post">%s%s<table style="width: 100%%"><tr><td>%s</td><td>%s</td></tr></table>%s</form>'''
    else:
        markup = '''<form action="%s" %s class="%s" method="post">%s%s%s%s%s</form>'''
    return markup % \
        (kss_url or self.action, loading_data, klass, h3, desc, form_html, buttons, self.hidden_input)
def _interpolate_env_vars(key):
    """Expand ``$VAR`` references from the environment; unset ones become ''."""
    env_with_blank_default = defaultdict(str, os.environ)
    return Template(key).substitute(env_with_blank_default)
def append_to_note(guid, new_content, main_new_content, add_if_already_exists):
    """Append ``new_content`` to the body of the note ``guid`` and save it.

    The note's XML declaration, DOCTYPE and en-note open/close tags are
    extracted, the body is extended, and the note is rebuilt around it.
    Nothing is appended when ``main_new_content`` is already in the body
    and ``add_if_already_exists`` is false. Save errors are printed, not
    raised. Returns the (locally) updated note object.
    """
    # Get the note to be updated using the note's guid
    # http://dev.evernote.com/documentation/reference/NoteStore.html#Fn_NoteStore_getNote
    note_store = _get_note_store()
    log_progress('load the \'2Archive\' note')
    note = note_store.getNote(guid, True, True, False, False)
    # Regular expressions used to replicate ENML tags. These same tags will
    # be used to "rebuild" the note with the existing note metadata.
    # Raw strings keep the patterns free of invalid escape sequences.
    log_progress('do the regEx stuff')
    xmlTag = re.search(r'<\?xml.*?>', note.content).group()
    docTag = re.search(r'<\!DOCTYPE.*?>', note.content).group()
    noteOpenTag = re.search(r'<\s*en-note.*?>', note.content).group()
    noteCloseTag = re.search(r'<\s*/en-note.*?>', note.content).group()
    breakTag = '<br />'
    # Rebuild the note using the new content
    log_progress('Rebuild the note using the new content')
    content = note.content.replace(xmlTag, "").replace(noteOpenTag, "").replace(noteCloseTag, "").replace(docTag, "").strip()
    if main_new_content not in content or add_if_already_exists:
        content += breakTag + ''.join(new_content)
    else:
        log_progress('url already in note')
    template = Template('$xml $doc $openTag $body $closeTag')
    note.content = template.substitute(xml=xmlTag, doc=docTag, openTag=noteOpenTag, body=content, closeTag=noteCloseTag)
    # Update the note
    log_progress('save the updated note to evernote')
    try:
        _get_note_store().updateNote(note)
    except Exception:
        # Best effort: report the failure but still return the note.
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit through.
        print(sys.exc_info())
    # Return updated note (object) to the function
    return note
def _getStates(self):
    """Return the ``states`` attribute of the core package."""
    core_package = self.__corePackage
    return core_package.states
# FIXME: implement this on our own without the Template class. How to do proper
# escaping?
def resolve(self, text):
    """Expand the ``$<`` include references in ``text``.

    Returns a ``(script, digests)`` pair: the resolver's prolog, a
    ``_BOB_SOURCES`` anchor line and the substituted text joined by
    newlines, plus the newline-joined include digests. Non-string input
    yields ``(None, None)``.
    """
    if not isinstance(text, str):
        return (None, None)
    resolver = IncludeHelper.Resolver(self.__fileLoader, self.__baseDir, self.__varBase, text)
    template = Template(text)
    # Per-instance overrides of the delimiter and pattern.
    template.delimiter = '$<'
    template.pattern = self.__pattern
    body = template.substitute(resolver)
    source_anchor = "_BOB_SOURCES[$LINENO]=" + quote(self.__sourceName)
    script = "\n".join(resolver.prolog + [source_anchor, body])
    digests = "\n".join(resolver.incDigests)
    return (script, digests)
def cli(project, base_image, base_deps, add_deps, requirements):
    """Render a Dockerfile for ``project`` and optionally save it.

    The rendered text is shown first; the user is then asked repeatedly
    until a yes/no answer is given, and the file is written to
    ``<project_dir>/Dockerfile`` on yes. Raises BadConfigException when the
    project has no entry in the ``settings`` section.
    """
    project_dir = utils.get_project_dir()
    scrapy_config = shub_utils.get_config()
    if not scrapy_config.has_option('settings', project):
        raise shub_exceptions.BadConfigException(
            'Settings for the project is not found')
    settings_module = scrapy_config.get('settings', project)

    raw_values = {
        'base_image': base_image,
        'system_deps': _format_system_deps(base_deps, add_deps),
        'system_env': _format_system_env(settings_module),
        'requirements': _format_requirements(project_dir, requirements),
    }
    # Falsy values are rendered as empty strings.
    values = {key: (value or '') for key, value in raw_values.items()}

    dockerfile_text = Template(DOCKERFILE_TEMPLATE.strip()).substitute(values)
    dockerfile_text = dockerfile_text.replace('\n\n', '\n')
    click.echo("The following Dockerfile will be created:\n{}".format(dockerfile_text))

    answers = {"yes": True, "y": True, "ye": True,
               "no": False, "n": False}
    target_path = os.path.join(project_dir, 'Dockerfile')
    prompt = "Save to {}: (y/n)".format(target_path)
    while True:
        answer = input(prompt).lower()
        if answer not in answers:
            click.echo("Please respond with 'yes'('y') or 'no'(n)")
            continue
        if answers[answer]:
            with open(target_path, 'w') as dockerfile:
                dockerfile.write(dockerfile_text)
            click.echo('Saved.')
        break