Python xml.etree.ElementTree 模块,Element() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用xml.etree.ElementTree.Element()。
def formatAlertsCount(numberofalerts, outformat):
""" Create XML / Json Structure with number of Alerts in requested timespan """
if outformat == "xml":
ewssimpleinfo = ET.Element('EWSSimpleIPInfo')
alertCount = ET.SubElement(ewssimpleinfo, 'AlertCount')
if numberofalerts:
alertCount.text = str(numberofalerts)
else:
alertCount.text = str(0)
prettify(ewssimpleinfo)
alertcountxml = '<?xml version="1.0" encoding="UTF-8"?>'
alertcountxml += (ET.tostring(ewssimpleinfo, encoding="utf-8", method="xml")).decode('utf-8')
return alertcountxml
else:
return ({'AlertCount': numberofalerts})
def exportXML(self):
elements = list()
for node in anytree.PreOrderIter(self.tree):
nodeName = parameters.encodeXMLName(node.name)
if not elements:
top = xmlElement(self.tree.name)
top.attrib['name'] = self.tree.name
top.attrib['is_checked'] = 'False'
top.attrib['is_default'] = 'False'
elements.append(top)
continue
for e in elements:
if e.attrib['name'] == node.parent.name:
se = xmlSubElement(e, nodeName)
se.attrib['name'] = node.name
se.attrib['is_checked'] = 'True' if node.is_checked else 'False'
se.attrib['is_default'] = 'True' if node.is_default else 'False'
se.attrib['parent'] = e.attrib['name']
elements.append(se)
break
# else:
# print('could not find parent for {}'.format(node.name))
string = xmlElementTree.tostring(top, encoding='unicode', method='xml')
return string
def to_xml(self):
"""Produces an XML out of the point data. Disregards the "action" field."""
el = etree.Element(self.osm_type, id=str(self.osm_id), version=str(self.version))
for tag, value in self.tags.items():
etree.SubElement(el, 'tag', k=tag, v=value)
if self.osm_type == 'node':
el.set('lat', str(self.lat))
el.set('lon', str(self.lon))
elif self.osm_type == 'way':
for node_id in self.members:
etree.SubElement(el, 'nd', ref=str(node_id))
elif self.osm_type == 'relation':
for member in self.members:
m = etree.SubElement(el, 'member')
for i, n in enumerate(('type', 'ref', 'role')):
m.set(n, str(member[i]))
return el
def get_xml(self, encoding='utf-8'):
"""
Returns the XML-code of this schedule.
The returned byte string contains the XML in the requested encoding. You save the byte sting to file in binary
mode (the file will encoded the requested encoding). Or you can convert the byte string to a string with
.decode(encoding).
:param str encoding: The encoding of the XML.
:rtype: bytes
"""
tree = Element(None)
self.generate_xml(tree)
xml_string = ElementTree.tostring(tree)
document = minidom.parseString(xml_string)
return document.toprettyxml(indent=' ', encoding=encoding)
# ------------------------------------------------------------------------------------------------------------------
def to_xml(self):
read = Element('read')
if self.echo is not None:
read.set('echo', self.echo)
if self.length is not None:
length = Element('length')
length.text = str(self.length)
read.append(length)
for thing in ('delim', 'match', 'assign'):
if getattr(self, thing) is not None:
read.append(getattr(self, thing).to_xml())
if self.timeout is not None:
timeout = Element('timeout')
timeout.text = str(self.timeout)
read.append(timeout)
return read
def to_xml(self):
root = Element('pov')
cbid = Element('cbid')
cbid.text = self.target
root.append(cbid)
seed_node = Element('seed')
seed_node.text = self.seed
root.append(seed_node)
replay = Element('replay')
root.append(replay)
for action in self.actions:
replay.append(action.to_xml())
# hack to make sure all crashes happen regardless of sockets closing
# replay.append(Delay(500).to_xml())
return root
def to_xml(self):
root = Element('pov')
cbid = Element('cbid')
cbid.text = self.target
root.append(cbid)
replay = Element('replay')
root.append(replay)
for action in self.actions:
replay.append(action.to_xml())
# hack to make sure all crashes happen regardless of sockets closing
# replay.append(Delay(500).to_xml())
return root
def test_to_xml(self):
random_cbid = random_string(random.randint(8, 24))
random_actions = [Decl('a', Value([Data("abcdef")])),
Delay(random.randint(0, 10000))]
# create XML representation by hand
element = Element('pov')
cbid = Element('cbid')
cbid.text = random_cbid
element.append(cbid)
replay = Element('replay')
for action in random_actions:
replay.append(action.to_xml())
element.append(replay)
# create POV and XML representation automatically
pov = CQE_POV(random_cbid, random_actions)
self.assertTrue(repr(pov) == ElementTree.tostring(element))
def to_xml(self):
if self.format is not None:
tag = Element('data', attrib={'format': self.format})
else:
tag = Element('data')
if self.format == 'hex':
tag.text = self.data.encode('hex')
else: # asciic case
encoded = ''
for c in self.data:
if c in _PRINTABLE:
encoded += c
elif c in ('\n', '\r', '\t'):
encoded += {
'\n': '\\n',
'\r': '\\r',
'\t': '\\t',
}[c]
else:
encoded += '\\x{:02x}'.format(ord(c))
tag.text = encoded
return tag
def as_xml(self):
xml_output = Element(self.plugin_command, title=self.COMMAND_TITLE)
xml_output.append(Element('vulnerable', isVulnerable=str(self.is_vulnerable)))
if len(self.support_protocols) > 0:
protocol_xml = Element('supportProtocols')
for p in self.support_protocols:
protocol_xml.append(Element('protocol',name=p))
xml_output.append(protocol_xml)
if len(self.support_vulnerable_ciphers) > 0:
cipher_xml = Element('vulnerableCipherSuites')
for c in self.support_vulnerable_ciphers:
cipher_xml.append(Element('cipherSuite',name=c))
xml_output.append(cipher_xml)
return xml_output
def as_xml(self):
xml_result = Element(self.plugin_command, title=self.COMMAND_TITLE)
resumption_rate_xml = Element(
'sessionResumptionWithSessionIDs',
attrib={'totalAttempts': str(self.attempted_resumptions_nb),
'errors': str(len(self.errored_resumptions_list)),
'isSupported': str(self.attempted_resumptions_nb == self.successful_resumptions_nb),
'successfulAttempts': str(self.successful_resumptions_nb),
'failedAttempts': str(self.failed_resumptions_nb)}
)
# Add error messages if there was any
for error_msg in self.errored_resumptions_list:
resumption_error_xml = Element('error')
resumption_error_xml.text = error_msg
resumption_rate_xml.append(resumption_error_xml)
xml_result.append(resumption_rate_xml)
return xml_result
def as_xml(self):
xml_result = Element(self.plugin_command, title=self.COMMAND_TITLE)
# We keep the session resumption XML node
resum_rate_xml = super(ResumptionResult, self).as_xml()
session_resum_xml = resum_rate_xml[0]
xml_result.append(session_resum_xml)
# Add the ticket resumption node
xml_resum_ticket_attr = {}
if self.ticket_resumption_error:
xml_resum_ticket_attr['error'] = self.ticket_resumption_error
else:
xml_resum_ticket_attr['isSupported'] = str(self.is_ticket_resumption_supported)
if not self.is_ticket_resumption_supported:
xml_resum_ticket_attr['reason'] = self.ticket_resumption_failed_reason
xml_resum_ticket = Element('sessionResumptionWithTLSTickets', attrib=xml_resum_ticket_attr)
xml_result.append(xml_resum_ticket)
return xml_result
def as_xml(self):
xml_result = Element(self.plugin_command, title=self.COMMAND_TITLE)
is_hsts_supported = True if self.hsts_header else False
xml_hsts_attr = {'isSupported': str(is_hsts_supported)}
if is_hsts_supported:
# Do some light parsing of the HSTS header
hsts_header_split = self.hsts_header.split('max-age=')[1].split(';')
hsts_max_age = hsts_header_split[0].strip()
hsts_subdomains = False
if len(hsts_header_split) > 1 and 'includeSubdomains' in hsts_header_split[1]:
hsts_subdomains = True
xml_hsts_attr['maxAge'] = hsts_max_age
xml_hsts_attr['includeSubdomains'] = str(hsts_subdomains)
xml_hsts = Element('httpStrictTransportSecurity', attrib=xml_hsts_attr)
xml_result.append(xml_hsts)
return xml_result
def UrhoWriteTriggers(triggersList, filename, fOptions):
triggersElem = ET.Element('animation')
for trigger in triggersList:
triggerElem = ET.SubElement(triggersElem, "trigger")
if trigger.time is not None:
triggerElem.set("time", FloatToString(trigger.time))
if trigger.ratio is not None:
triggerElem.set("normalizedtime", FloatToString(trigger.ratio))
# We use a string variant, for other types See typeNames[] in Variant.cpp
# and XMLElement::GetVariant()
triggerElem.set("type", "String")
triggerElem.set("value", str(trigger.data))
WriteXmlFile(triggersElem, filename, fOptions)
#--------------------
# Utils
#--------------------
# Search for the most complete element mask
def save(self):
root = ET.Element('TS')
if self.version:
root.attrib['version'] = self.version
if self.language:
root.attrib['language'] = self.language
for ctx in sorted(self.__contexts.itervalues()):
ctx.save(root)
rough_string = ET.tostring(root, 'utf-8')
reparsed = minidom.parseString(rough_string)
text = reparsed.toprettyxml(indent=" ")
text = text.encode('utf-8')
with open(self.__file, 'wb') as f:
f.write(text)
def xml(self):
el = ET.Element(self.name)
keys = self.attrs.keys()
keys = sorted(keys)
for a in keys:
value = self.attrs[a]
if isinstance(value, bool):
el.set(a, str(value).lower())
else:
el.set(a, str(value))
if self.body:
el.text = self.body
for verb in self.verbs:
el.append(verb.xml())
return el
def export(self, filename):
self.nodes, self.renderer = self.compute()
initWidth, initHeight = (self.options['initialWidth'],
self.options['initialHeight'])
doc = ElementTree.Element('svg', width=str(initWidth),
height=str(initHeight))
transform = self.getTranslation()
trans = ElementTree.SubElement(doc, 'g', transform=transform)
ElementTree.SubElement(trans, 'g', attrib={'class': 'dummy-layer'})
mainLayer = self.add_main(trans)
self.add_timeline(mainLayer)
if self.options['showTicks']:
self.add_axis(mainLayer)
self.add_links(mainLayer)
self.add_labels(mainLayer)
self.add_dots(mainLayer)
svglines = ElementTree.tostring(doc)
with open(filename, 'wb') as fid:
fid.write(svglines)
return svglines
def writeXML(dataObj, fname, dtdPath):
if dataObj.predSev == 0:
sev = "ABSENT"
elif dataObj.predSev == 1:
sev = "MILD"
elif dataObj.predSev == 2:
sev = "MODERATE"
elif dataObj.predSev == 3:
sev = "SEVERE"
root = ET.Element("RDoC")
ET.SubElement(root, "TEXT").text = dataObj.text.content
tags = ET.SubElement(root, "TAGS")
pos_val = ET.SubElement(tags, "POSITIVE_VALENCE")
pos_val.set('score', sev)
pos_val.set('annotated_by', dataObj.Nannotators)
tree = ET.ElementTree(root)
tree.write(fname)
def get_tdblock_as_xmlobj(self, db_details):
""" To get the testdata blocks from the database as xml object """
rootobj = False
td_collection = db_details.get('td_collection')
td_document = db_details.get('td_document')
if td_collection is not None and td_document is not None:
tddoc = self.get_doc_from_db(td_collection, td_document)
else:
tddoc = False
if tddoc is not False:
root = ET.Element('data')
self.convert_tddict_to_xmlobj(root, tddoc['data'])
rootobj = root
return rootobj
def get_globalblock_as_xmlobj(self, db_details):
""" To get the global block from the database as xml object """
globalobj = False
global_collection = db_details.get('global_collection')
global_document = db_details.get('global_document')
if global_collection is not None and global_document is not None:
globaldoc = self.get_doc_from_db(global_collection,
global_document)
else:
globaldoc = False
if globaldoc is not False:
global_root = ET.Element('global')
self.convert_tddict_to_xmlobj(global_root, globaldoc['global'])
globalobj = global_root
return globalobj
def sub_from_varconfigfile(string, varconfigfile, start_pat="${", end_pat="}"):
""" """
try:
# when varconfigfile is an XMl object(root element) - this happens
# only when varconfigfile is taken from database server
if isinstance(varconfigfile, ElementTree.Element) is True:
cfg_elem_obj = ConfigurationElement("Varconfig_from_database",
start_pat, end_pat)
cfg_elem_obj.parse_data(varconfigfile, elem_type="xml_object")
else:
cfg_elem_obj = ConfigurationElement(varconfigfile, start_pat, end_pat)
cfg_elem_obj.parse_data(varconfigfile)
newstring = cfg_elem_obj.expand_variables(string)
except TypeError as exception:
print_info("At least one of the variables in command string is not found in " + varconfigfile)
#print_exception(exception)
return False
return newstring
def get_list_from_varconfigfile(string, varconfigfile, start_pat="${", end_pat="}"):
""" """
try:
# when varconfigfile is an XMl object(root element) - this happens
# only when varconfigfile is taken from database server
if isinstance(varconfigfile, ElementTree.Element) is True:
cfg_elem_obj = ConfigurationElement("Varconfig_from_database",
start_pat, end_pat)
cfg_elem_obj.parse_data(varconfigfile, elem_type="xml_object")
else:
cfg_elem_obj = ConfigurationElement(varconfigfile, start_pat, end_pat)
cfg_elem_obj.parse_data(varconfigfile)
newstring = cfg_elem_obj.get_list(string)
except TypeError as exception:
print_info("At least one of the variables in command string is not found in " + varconfigfile)
#print_exception(exception)
return False
return newstring
def getElementWithTagAttribValueMatch(start, tag, attrib, value):
"""
When start is an xml datafile, it finds the root and first element with:
tag, attrib, value.
Or when it's an xml element, it finds the first child element with:
tag, attrib, value.
If there is not a match, it returns False.
"""
node = False
if isinstance(start, (file, str)):
# check if file exist here
if file_Utils.fileExists(start):
node = ElementTree.parse(start).getroot()
else:
print_warning('The file={0} is not found.'.format(start))
elif isinstance(start, ElementTree.Element):
node = start
if node is not False and node is not None:
elementName = ".//%s[@%s='%s']" % (tag, attrib, value)
element = node.find(elementName)
else:
element = node
return element
def getChildElementWithSpecificXpath(start, xpath):
"""
This method takes a xml file or parent element as input and finds the first child
containing specified xpath
Returns the child element.
Arguments:
start = xml file or parent element
xpath = a valid xml path value as supported by python, refer https://docs.python.org/2/library/xml.etree.elementtree.html
"""
node = False
if isinstance(start, (file, str)):
# check if file exist here
if file_Utils.fileExists(start):
node = ElementTree.parse(start).getroot()
else:
print_warning('The file={0} is not found.'.format(start))
elif isinstance(start, ElementTree.Element):
node = start
if node is not False or node is not None:
element = node.find(xpath)
else:
element = False
return element
def confirm_url(question, attrib_value):
""" This function recursively checks whether a given url is a valid
repository or not. If it isn't, it promps the user to enter a new url and
checks that.
:Arguments:
1. question (xml.etree.ElementTree.Element) = The question tag from data.xml
2. attrib_value (str) = the url to be checked
:Returns:
1. attrib_value (str) = valid url
"""
if not check_url_is_a_valid_repo(attrib_value):
attrib_value = raw_input("Please enter a valid URL: ")
attrib_value = confirm_url(question, attrib_value)
return attrib_value
def add_drivers_to_tags(tag, drivers, driver_numbers):
""" This function appends the driver tags sets the attributes and
attribute names to the corresponding driver tag
:Arguments:
1. tag (xml.etree.ElementTree.Element) = Current tag to which the newly
formed driver tags may be appended
2. drivers (list[str]) = list of driver names available to the user
3. driver_numbers (list[int]) = list of the numbers which correspond to
the driver names that the user wants.
"""
print "Selected drivers:"
for driver_number in driver_numbers:
driver_number = driver_number * 2 - 1
if driver_number > len(drivers):
print "Corresponding driver for " + str((driver_number+1)/2) +\
" not found."
continue
print str((driver_number+1)/2) + ". " + drivers[driver_number]
driver_tag = Element("driver")
driver_tag.set("name", drivers[driver_number])
tag.append(driver_tag)
def diff_attributes_values(root, node, tag, attribute, values):
""" This function creates tags in the new xml file which contain
information about the value tags from data.xml
:Arguments:
1. root (xml.etree.ElementTree.Element) = parent of the current node from
data.xml
2. node (xml.etree.ElementTree.Element) = current node from data.xml
3. tag (xml.etree.ElementTree.Element) = current node that would be added
to the new xml file
4. attribute (xml.etree.ElementTree.Element) = The current attribure tag
5. values (list[xml.etree.ElementTree.Element]) = complete list of
value tags in that particular nesting from data.xml
"""
for i in range(0, len(values)):
info = values[i].find("info")
if info is not None:
print info.text
print "Warrior recommends that all these dependencies be installed on" \
" your machine."
get_answer_for_depen(root, node, tag, attribute, values)
def main():
""" This function basically creates the xml file by calling various other
functions, runs the file and then saves it.
"""
root = Element('data')
dir_path = os.path.dirname(os.path.realpath(sys.argv[0]))
rel_path = get_relative_path(dir_path, "data.xml")
tree = xml.etree.ElementTree.parse(rel_path)
input_root = tree.getroot()
nodes = get_firstlevel_children(input_root, "tag")
populate_xml(root, nodes)
temp_xml = 'temp.xml'
pretty_xml = minidom.parseString(xml.etree.ElementTree.tostring(root))\
.toprettyxml(indent=" ")
with open(temp_xml, "w") as config_file:
config_file.write(pretty_xml)
config_file.flush()
config_file.close()
save_file(temp_xml)
def _getqrtag(self, text, rect):
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L)
qr.add_data(text)
qr.make(fit=True)
img = qr.make_image()
bio = io.BytesIO()
img.save(bio)
pngqr = bio.getvalue()
base64qr = base64.b64encode(pngqr)
#<image x="110" y="20" width="280px" height="160px" xlink:href="data:image/png;base64,……"/>
imagetag = ElementTree.Element("image")
imagetag.set("xlink:href", "data:image/png;base64,"+base64qr.decode("ascii"))
imagetag.set("x", str(rect.x))
imagetag.set("y", str(rect.y))
imagetag.set("width", str(rect.w))
imagetag.set("height", str(rect.h))
return imagetag
def _prepare_document(self):
"""
Build the main document node and set xml namespaces.
"""
self._xml = ET.Element("Document")
self._xml.set("xmlns",
"urn:iso:std:iso:20022:tech:xsd:" + self.schema)
self._xml.set("xmlns:xsi",
"http://www.w3.org/2001/XMLSchema-instance")
ET.register_namespace("",
"urn:iso:std:iso:20022:tech:xsd:" + self.schema)
ET.register_namespace("xsi",
"http://www.w3.org/2001/XMLSchema-instance")
CstmrDrctDbtInitn_node = ET.Element("CstmrDrctDbtInitn")
self._xml.append(CstmrDrctDbtInitn_node)
def mountIso(self,instance,dev, image):
tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0))
for disk in tree.findall('devices/disk'):
if disk.get('device') == 'cdrom':
for elm in disk:
if elm.tag == 'target':
if elm.get('dev') == dev:
src_media = ElementTree.Element('source')
src_media.set('file', image)
disk.append(src_media)
if instance.state()[0] == 1:
xml_disk = ElementTree.tostring(disk)
try:
instance.attachDevice(xml_disk)
except libvirt.libvirtError,e:
return '??????????{result}'.format(result=e.get_error_message())
xmldom = self.getInsXMLDesc(instance,1)
if instance.state()[0] == 5:
xmldom = ElementTree.tostring(tree)
try:
return self.defineXML(xmldom)
except libvirt.libvirtError,e:
return '??????????{result}'.format(result=e.get_error_message())
def setInterfaceBandwidth(self,instance,port,bandwidth):
'''????'''
domXml = instance.XMLDesc(0)
root = ElementTree.fromstring(domXml)
try:
for dev in root.findall('.//devices/'):
if dev.tag == 'interface':
for iter in dev:
if iter.tag == 'target' and iter.get('dev') == port:
bwXml = ElementTree.SubElement(dev,'bandwidth')
inbdXml = ElementTree.Element('inbound')
inbdXml.set('average',str(int(bandwidth)*1024))
inbdXml.set('peak',str(int(bandwidth)*1024))
inbdXml.set('burst','1024')
outbdXml = ElementTree.Element('outbound')
outbdXml.set('average',str(int(bandwidth)*1024))
outbdXml.set('peak',str(int(bandwidth)*1024))
outbdXml.set('burst','1024')
bwXml.append(inbdXml)
bwXml.append(outbdXml)
domXml = ElementTree.tostring(root)
except Exception,e:
return {"status":"faild",'data':e}
if self.defineXML(domXml):return {"status":"success",'data':None}
def create_xml_page(dictionary: dict, creator_name='DocSeg') -> 'Page':
page = Page.from_dict(dictionary)
# Create xml
root = ET.Element('PcGts')
root.set('xmlns', _ns['p'])
# Metadata
generated_on = str(datetime.datetime.now())
metadata = ET.SubElement(root, 'Metadata')
creator = ET.SubElement(metadata, 'Creator')
creator.text = creator_name
created = ET.SubElement(metadata, 'Created')
created.text = generated_on
last_change = ET.SubElement(metadata, 'LastChange')
last_change.text = generated_on
root.append(page.to_xml())
for k, v in _attribs.items():
root.attrib[k] = v
return root
def _build_xml(self, tag, val=None, children=Void()):
"""Construct an xml element with a given tag, value, and children."""
attribs = {'val': val} if val is not None else {}
# If children is a list then convert each element separately, if it is
# a tuple, treat it as a single element
if isinstance(children, self.Void):
children = ()
elif isinstance(children, list):
children = [self._from_value(child) for child in children]
else:
children = (self._from_value(children),)
xml = ET.Element(tag, attribs)
xml.extend(children)
return xml
def query(self, cmd, state, *args, **kwargs):
"""Create an XML string for the 'interp' command."""
# Attrs:
# raw - ?
# bool - Verbose output
# int - The current state id
# Args:
# string - The query to evaluate
elt = ET.Element('call', {'val': 'interp',
'raw': 'true',
'verbose': 'true',
'id': str(state)})
elt.text = cmd
return ('Query',
ET.tostring(elt,
kwargs.get('encoding', 'utf-8')))
def send_feedback(self):
"""Print stored items to console/Alfred as XML."""
root = ET.Element('items')
for item in self._items:
root.append(item.elem)
sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n')
sys.stdout.write(ET.tostring(root).encode('utf-8'))
sys.stdout.flush()
####################################################################
# Updating methods
####################################################################
def build_soap_xml(items, namespace=None):
"""Build a SOAP XML.
:param items: a list of dictionaries where key is the element name
and the value is the element text.
:param namespace: the namespace for the elements, None for no
namespace. Defaults to None
:returns: a XML string.
"""
def _create_element(name, value=None):
xml_string = name
if namespace:
xml_string = "{%(namespace)s}%(item)s" % {'namespace': namespace,
'item': xml_string}
element = ElementTree.Element(xml_string)
element.text = value
return element
soap_namespace = "http://www.w3.org/2003/05/soap-envelope"
envelope_element = ElementTree.Element("{%s}Envelope" % soap_namespace)
body_element = ElementTree.Element("{%s}Body" % soap_namespace)
for item in items:
for i in item:
insertion_point = _create_element(i)
if isinstance(item[i], dict):
for j, value in item[i].items():
insertion_point.append(_create_element(j, value))
else:
insertion_point.text = item[i]
body_element.append(insertion_point)
envelope_element.append(body_element)
return ElementTree.tostring(envelope_element)
def onFinishBuilding(app, exception):
currentVersion = app.env.config["version"]
if "latest_docs_version" in app.env.config["html_context"].keys():
latestVersion = app.env.config["html_context"]["latest_docs_version"]
else:
latestVersion = "dev"
base_domain = app.env.config["html_context"]["SITEMAP_DOMAIN"]
file_path = "./_build/algolia_index/index.json"
sitemap_path = "./_build/sitemap/sitemap_" + currentVersion + ".xml"
checkDirectory(file_path)
checkDirectory(sitemap_path)
f = open(file_path, 'w+')
root = ET.Element("urlset")
root.set("xmlns", "http://www.sitemaps.org/schemas/sitemap/0.9")
for link in indexObjs:
url = ET.SubElement(root, "url")
ET.SubElement(url, "loc").text = base_domain + str(currentVersion) + "/" + link["url"]
ET.SubElement(url, "changefreq").text = "daily"
ET.SubElement(url, "priority").text = "1" if ( currentVersion == latestVersion ) else "0.5"
ET.ElementTree(root).write(sitemap_path)
f.write(json.dumps(indexObjs))
def eccu_doc_for_purge_dir(path_to_purge):
root = ET.Element('eccu')
parent = root
names = (name for name in path_to_purge.split('/') if name)
for name in names:
parent = ET.SubElement(parent, 'match:recursive-dirs', {'value': name})
revalidate = ET.SubElement(parent, 'revalidate')
revalidate.text = 'now'
return ET.tostring(root, encoding='ascii') #'<?xml version="1.0"?>\n' + ET.tostring(root)
#xml = StringIO()
#xml.write('<?xml version="1.0"?>\n')
#xml.write(ET.tostring(root))
#return xml
def add_to_document(self, parent):
"""Adds an ``Argument`` object to this ElementTree document.
Adds an <arg> subelement to the parent element, typically <args>
and sets up its subelements with their respective text.
:param parent: An ``ET.Element`` to be the parent of a new <arg> subelement
:returns: An ``ET.Element`` object representing this argument.
"""
arg = ET.SubElement(parent, "arg")
arg.set("name", self.name)
if self.title is not None:
ET.SubElement(arg, "title").text = self.title
if self.description is not None:
ET.SubElement(arg, "description").text = self.description
if self.validation is not None:
ET.SubElement(arg, "validation").text = self.validation
# add all other subelements to this Argument, represented by (tag, text)
subelements = [
("data_type", self.data_type),
("required_on_edit", self.required_on_edit),
("required_on_create", self.required_on_create)
]
for name, value in subelements:
ET.SubElement(arg, name).text = str(value).lower()
return arg
def to_xml(self):
"""Creates an ``ET.Element`` representing self, then returns it.
:returns root, an ``ET.Element`` representing this scheme.
"""
root = ET.Element("scheme")
ET.SubElement(root, "title").text = self.title
# add a description subelement if it's defined
if self.description is not None:
ET.SubElement(root, "description").text = self.description
# add all other subelements to this Scheme, represented by (tag, text)
subelements = [
("use_external_validation", self.use_external_validation),
("use_single_instance", self.use_single_instance),
("streaming_mode", self.streaming_mode)
]
for name, value in subelements:
ET.SubElement(root, name).text = str(value).lower()
endpoint = ET.SubElement(root, "endpoint")
args = ET.SubElement(endpoint, "args")
# add arguments as subelements to the <args> element
for arg in self.arguments:
arg.add_to_document(args)
return root
def write_to(self, stream):
"""Write an XML representation of self, an ``Event`` object, to the given stream.
The ``Event`` object will only be written if its data field is defined,
otherwise a ``ValueError`` is raised.
:param stream: stream to write XML to.
"""
if self.data is None:
raise ValueError("Events must have at least the data field set to be written to XML.")
event = ET.Element("event")
if self.stanza is not None:
event.set("stanza", self.stanza)
event.set("unbroken", str(int(self.unbroken)))
# if a time isn't set, let Splunk guess by not creating a <time> element
if self.time is not None:
ET.SubElement(event, "time").text = str(self.time)
# add all other subelements to this Event, represented by (tag, text)
subelements = [
("source", self.source),
("sourcetype", self.sourceType),
("index", self.index),
("host", self.host),
("data", self.data)
]
for node, value in subelements:
if value is not None:
ET.SubElement(event, node).text = value
if self.done:
ET.SubElement(event, "done")
stream.write(ET.tostring(event))
stream.flush()
def send_feedback(self):
"""Print stored items to console/Alfred as XML."""
root = ET.Element('items')
for item in self._items:
root.append(item.elem)
sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n')
sys.stdout.write(ET.tostring(root).encode('utf-8'))
sys.stdout.flush()
####################################################################
# Updating methods
####################################################################
def send_feedback(self):
"""Print stored items to console/Alfred as XML."""
root = ET.Element('items')
for item in self._items:
root.append(item.elem)
sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n')
sys.stdout.write(ET.tostring(root).encode('utf-8'))
sys.stdout.flush()
####################################################################
# Updating methods
####################################################################
def CDATA(text=None):
"""
A CDATA element factory function that uses the function itself as the tag
(based on the Comment factory function in the ElementTree implementation).
"""
element = etree.Element(CDATA)
element.text = text
return element
# We're replacing the _write method of the ElementTree class so that it would
# recognize and correctly print out CDATA sections.
def __walk(self, node, parent):
name = self.__genName(node.tag)
tag = self.__getNamespace(node.tag)
if parent is None:
self.root = name
self.lines.append("%s = ET.Element(%s)" % (name, tag))
else:
self.lines.append("%s = ET.SubElement(%s, %s)" % (name, parent, tag))
# handles text
try:
t = node.text.strip()
if t == '': t = None
except:
t = None
if t is not None:
self.lines.append("%s.text = kwargs.get('', '%s') # PARAMETERIZE" % (name, t))
# handles attributes
for key,val in node.items():
key = self.__getNamespace(key)
self.lines.append("%s.set(%s, kwargs.get('', '%s')) # PARAMETERIZE" % (name, key, val))
for i in node.getchildren():
self.__walk(i, name)
def add_to_document(self, parent):
"""Adds an ``Argument`` object to this ElementTree document.
Adds an <arg> subelement to the parent element, typically <args>
and sets up its subelements with their respective text.
:param parent: An ``ET.Element`` to be the parent of a new <arg> subelement
:returns: An ``ET.Element`` object representing this argument.
"""
arg = ET.SubElement(parent, "arg")
arg.set("name", self.name)
if self.title is not None:
ET.SubElement(arg, "title").text = self.title
if self.description is not None:
ET.SubElement(arg, "description").text = self.description
if self.validation is not None:
ET.SubElement(arg, "validation").text = self.validation
# add all other subelements to this Argument, represented by (tag, text)
subelements = [
("data_type", self.data_type),
("required_on_edit", self.required_on_edit),
("required_on_create", self.required_on_create)
]
for name, value in subelements:
ET.SubElement(arg, name).text = str(value).lower()
return arg
def to_xml(self):
"""Creates an ``ET.Element`` representing self, then returns it.
:returns root, an ``ET.Element`` representing this scheme.
"""
root = ET.Element("scheme")
ET.SubElement(root, "title").text = self.title
# add a description subelement if it's defined
if self.description is not None:
ET.SubElement(root, "description").text = self.description
# add all other subelements to this Scheme, represented by (tag, text)
subelements = [
("use_external_validation", self.use_external_validation),
("use_single_instance", self.use_single_instance),
("streaming_mode", self.streaming_mode)
]
for name, value in subelements:
ET.SubElement(root, name).text = str(value).lower()
endpoint = ET.SubElement(root, "endpoint")
args = ET.SubElement(endpoint, "args")
# add arguments as subelements to the <args> element
for arg in self.arguments:
arg.add_to_document(args)
return root
def write_to(self, stream):
"""Write an XML representation of self, an ``Event`` object, to the given stream.
The ``Event`` object will only be written if its data field is defined,
otherwise a ``ValueError`` is raised.
:param stream: stream to write XML to.
"""
if self.data is None:
raise ValueError("Events must have at least the data field set to be written to XML.")
event = ET.Element("event")
if self.stanza is not None:
event.set("stanza", self.stanza)
event.set("unbroken", str(int(self.unbroken)))
# if a time isn't set, let Splunk guess by not creating a <time> element
if self.time is not None:
ET.SubElement(event, "time").text = str(self.time)
# add all other subelements to this Event, represented by (tag, text)
subelements = [
("source", self.source),
("sourcetype", self.sourceType),
("index", self.index),
("host", self.host),
("data", self.data)
]
for node, value in subelements:
if value is not None:
ET.SubElement(event, node).text = value
if self.done:
ET.SubElement(event, "done")
stream.write(ET.tostring(event))
stream.flush()