Python xml.etree.ElementTree 模块,dump() 实例源码
我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用xml.etree.ElementTree.dump()。
def get_media_servers(self):
    """Fetch the ``getMediaServers`` document and print its top-level entries.

    The URL is built from ``self.server_ip`` on port 47365. The response
    body is parsed as XML and, for every direct child of the root, the tag
    plus its ``name`` and ``udn`` attributes are printed.

    Returns:
        The object returned by ``requests.get`` on success, or ``None`` when
        any exception occurred (the exception is printed, not re-raised).
    """
    try:
        requests.packages.urllib3.disable_warnings()
        endpoint = "http://" + self.server_ip + ":47365/getMediaServers"
        print(endpoint)
        response = requests.get(endpoint)
        for server in ET.fromstring(response.content):
            print("+", server.tag, server.get('name'), server.get('udn'))
    except Exception as err:
        print("Exception get_media_servers: {0}".format(err))
        return None
    else:
        return response
def get_zones(self):
    """Fetch the ``getZones`` document and print its nested structure.

    The URL is built from ``self.server_ip`` on port 47365. For every
    ``zones`` element directly under the root, each child is printed with
    its ``udn``; each grandchild and great-grandchild is printed with its
    ``name`` and ``udn``.

    Returns:
        The object returned by ``requests.get`` on success, or ``None`` when
        any exception occurred (the exception is printed, not re-raised).
    """
    try:
        requests.packages.urllib3.disable_warnings()
        endpoint = "http://" + self.server_ip + ":47365/getZones"
        print(endpoint)
        response = requests.get(endpoint)
        document = ET.fromstring(response.content)
        for zones_node in document.findall('zones'):
            for zone in zones_node:
                print(zone.tag, zone.get('udn'))
                for room in zone:
                    print("+", room.tag, room.get('name'), room.get('udn'))
                    for renderer in room:
                        print(" +", renderer.tag, renderer.get('name'), renderer.get('udn'))
    except Exception as err:
        print("Exception get_zones: {0}".format(err))
        return None
    else:
        return response
def _dl_first_xml(self, first_url):
    """Download the document at *first_url*, parse it as XML and return the root.

    The raw text is stored in ``var._['_raw_first_xml']`` before parsing, and
    the ``vip`` attribute of the ``channel`` element is stored (as a string)
    in ``var._['_vid_info']['vip']``.

    :param first_url: URL handed to ``b.dl_html``.
    :returns: the parsed root element.
    :raises err.ParseXMLError: the downloaded text is not parseable; the raw
        text is attached as ``.text`` and the original exception is chained.
    :raises err.MethodError: the document contains an ``error`` element; the
        serialized element is passed as the second argument.
    """
    log.o(log_text.method_got_first_url(first_url))
    # NOTE download and parse as xml
    first_xml = b.dl_html(first_url)
    var._['_raw_first_xml'] = first_xml  # save raw xml text
    try:
        first = ET.fromstring(first_xml)
    except Exception as e:
        er = err.ParseXMLError('parse first xml text failed, first_url ', first_url)
        # `first` is never bound when parsing fails, so attach the raw text
        # instead; referencing `first` here would raise a NameError that
        # hides the real parse error.
        er.text = first_xml
        raise er from e
    # check first Error
    error_node = first.find('error')
    if error_node is not None:
        # ET.dump() prints to stdout and returns None, so it cannot supply the
        # exception argument; serialize the element to text instead.
        raise err.MethodError('first xml info Error',
                              ET.tostring(error_node, encoding='unicode'))
    var._['_vid_info']['vip'] = str(first.find('channel').get('vip'))
    return first
def show_xml(document):
    """Print *document* to stdout as a ``<results>`` XML tree.

    ``document['legs']`` becomes numbered ``<leg n="...">`` elements under
    ``<legs>``. Every entry of ``document['teams']`` becomes a ``<team>``
    with a ``<name>`` and a ``<position>`` holding one numbered ``<leg>``
    per entry of the team's ``position`` sequence. An ``<?xml ...?>``
    processing instruction is dumped first, then the tree.
    """
    from xml.etree import ElementTree as XML

    results = XML.Element("results")

    legs_node = XML.SubElement(results, 'legs')
    for index, leg_name in enumerate(document['legs'], start=1):
        XML.SubElement(legs_node, 'leg', n=str(index)).text = leg_name

    teams_node = XML.SubElement(results, 'teams')
    for team in document['teams']:
        team_node = XML.SubElement(teams_node, "team")
        XML.SubElement(team_node, "name").text = team['name']
        positions_node = XML.SubElement(team_node, "position")
        for index, place in enumerate(team['position'], start=1):
            XML.SubElement(positions_node, "leg", n=str(index)).text = str(place)

    XML.dump(XML.ProcessingInstruction("xml", 'version="1.0"'))
    XML.dump(results)
def _analyze_entity_node(self, node, entity):
interface = node.get("interface")
if interface is not None:
mac_address = self.mac_addr_helper.get_interface_mac_addr(interface)
entity.set_interface(interface, mac_address)
for child in node.getchildren():
if child.tag == "packet":
# pre processing "include" node
# TODO: Possible to be deleted
#self.__add_include_to_packet_node(child)
# analyze packet node!
self._set_packet_node(child, entity)
if self.print_tc_preprocessing is True:
self.__indent(node)
ET.dump(node)
def writeToExternalFile(self, filename, metadata):
    """Convert *metadata* to an XML tree and write it to *filename* as UTF-8.

    The tree is produced by ``self.convertMetadataToXML``, which is called
    with ``self`` passed explicitly as its first argument, followed by
    *metadata*.
    """
    self.convertMetadataToXML(self, metadata).write(filename, encoding='utf-8')
def writeToExternalFile(self, filename, metadata):
    """Convert *metadata* to an XML tree and write it to *filename* as UTF-8.

    The tree is produced by ``self.convertMetadataToXML``, which is called
    with ``self`` passed explicitly as its first argument, followed by
    *metadata*.
    """
    self.convertMetadataToXML(self, metadata).write(filename, encoding='utf-8')
def bug_xmltoolkitX1():
    # Doctest-only function: it has no executable body, the docstring below
    # is the test. It dumps a small tree and then prints "tail", expecting
    # the dumped XML to appear on the line before "tail".
    """
    dump() doesn't flush the output buffer
    >>> tree = ET.XML("<doc><table><tbody/></table></doc>")
    >>> ET.dump(tree); print("tail")
    <doc><table><tbody /></table></doc>
    tail
    """
def __callback_emm_state(self, msg):
    """
    Refresh the stored EMM status from a state message and publish it.

    The state, substate, PLMN (split on '-' into MCC and MNC) and GUTI
    fields are copied from ``msg.data``; the status is logged, an
    'EMM_STATE' record is broadcast, and the call-flow state machine is
    given the message.

    :param msg: the NAS signaling message that carries EMM state
    """
    payload = msg.data
    status = self.__emm_status
    guti = status.guti

    status.state = payload["EMM State"]
    status.substate = payload["EMM Substate"]

    plmn_parts = payload["PLMN"].split('-')
    guti.mcc = plmn_parts[0]
    guti.mnc = plmn_parts[1]
    guti.mme_group_id = payload["GUTI MME Group ID"]
    guti.mme_code = payload["GUTI MME Code"]
    guti.m_tmsi = payload["GUTI M-TMSI"]

    self.log_info(status.dump())

    # broadcast
    self.broadcast_info('EMM_STATE', {
        'conn state': status.state,
        'conn substate': status.substate,
    })

    if self.callflow_state_machine.update_state(msg):
        current_state = self.callflow_state_machine.get_current_state()
        self.log_info("Call flow status: " + str(current_state))
def __callback_emm(self, msg):
    """
    Pull the CSFB capability flag and the GUTI out of an EMM NAS message.

    Every ``field`` element of ``msg.data`` is inspected by its ``show``
    attribute: a "UE network capability" field yields the CSFB flag, which
    is logged and broadcast; an "EPS mobile identity - GUTI" field yields
    the GUTI components, which are stored on the EMM status.

    :param msg: the EMM NAS message
    """
    guti_field_names = (
        'e212.mcc',
        'e212.mnc',
        'nas_eps.emm.mme_grp_id',
        'nas_eps.emm.mme_code',
        'nas_eps.emm.m_tmsi',
    )
    for field in msg.data.iter('field'):
        if field.get('show') == "UE network capability":
            for val in field.iter('field'):
                if val.get('name') != 'nas_eps.emm.acc_csfb_cap':
                    continue
                csfb_cap = val.get('show') == '1'
                self.log_info("CSFB Capbility: " + str(csfb_cap))
                self.broadcast_info('CSFB_CAP', str(csfb_cap))

        if field.get('show') == "EPS mobile identity - GUTI":
            # Start with every expected key set to None, then overlay
            # whatever nested fields are actually present.
            field_val = dict.fromkeys(guti_field_names)
            field_val.update(
                (val.get('name'), val.get('show')) for val in field.iter('field')
            )
            guti = self.__emm_status.guti
            guti.mcc = field_val['e212.mcc']
            guti.mnc = field_val['e212.mnc']
            guti.mme_group_id = field_val['nas_eps.emm.mme_grp_id']
            guti.mme_code = field_val['nas_eps.emm.mme_code']
            guti.m_tmsi = field_val['nas_eps.emm.m_tmsi']
def dump(self):
    """
    Render the EMM status as a single line of text.

    :returns: the class name followed by ``label=value`` pairs for the
        state, substate and GUTI components, each value passed through
        ``xstr``
    """
    guti = self.guti
    labelled_values = (
        (' EMM.state=', self.state),
        (' EMM.substate=', self.substate),
        (' MCC=', guti.mcc),
        (' MNC=', guti.mnc),
        (' MMEGI=', guti.mme_group_id),
        (' MMEC=', guti.mme_code),
        (' TMSI=', guti.m_tmsi),
    )
    text = self.__class__.__name__
    for label, value in labelled_values:
        text = text + label + xstr(value)
    return text
def dump(self):
    """Render the bearer as text: id and type, then the QoS rate and delivery lines."""
    # TODO: deal with potential KeyError here (bearer_type lookup)
    header = ' EPS_ID=' + xstr(self.eps_id) + ' type=' + xstr(bearer_type[self.type])
    rate_text = self.qos.dump_rate()
    delivery_text = self.qos.dump_delivery()
    return header + ":\n\t" + rate_text + '\n\t' + delivery_text
def bug_xmltoolkitX1():
    # Doctest-only function: it has no executable body, the docstring below
    # is the test. It dumps a small tree and then writes "tail" to stdout,
    # expecting the dumped XML to appear on the line before "tail".
    # NOTE(review): under Python 3, sys.stdout.write() returns the number of
    # characters written and a doctest would echo that value; the expected
    # output only matches where write() returns None - confirm the target
    # interpreter.
    """
    dump() doesn't flush the output buffer
    >>> tree = ET.XML("<doc><table><tbody/></table></doc>")
    >>> ET.dump(tree); sys.stdout.write("tail")
    <doc><table><tbody /></table></doc>
    tail
    """
def bug_xmltoolkitX1():
    # Doctest-only function: it has no executable body, the docstring below
    # is the test. It dumps a small tree and then writes "tail" to stdout,
    # expecting the dumped XML to appear on the line before "tail".
    # NOTE(review): under Python 3, sys.stdout.write() returns the number of
    # characters written and a doctest would echo that value; the expected
    # output only matches where write() returns None - confirm the target
    # interpreter.
    """
    dump() doesn't flush the output buffer
    >>> tree = ET.XML("<doc><table><tbody/></table></doc>")
    >>> ET.dump(tree); sys.stdout.write("tail")
    <doc><table><tbody /></table></doc>
    tail
    """
def _dump_resource_to_tempfile(resource):
    """Write *resource* to a fresh temp file with ``ET.dump`` and return the path."""
    import os

    fd, path = tempfile.mkstemp()
    # Wrap the descriptor mkstemp() already opened instead of opening the path
    # a second time; otherwise that descriptor is never closed.
    with os.fdopen(fd, 'w') as handle:
        previous_stdout = sys.stdout
        sys.stdout = handle  # ET.dump() can only write to sys.stdout
        try:
            ET.dump(resource)
        finally:
            # Restore whatever stream was active before, even if dump() fails.
            sys.stdout = previous_stdout
    return path


def _reserve_tempfile():
    """Create an empty temp file, close its descriptor, and return the path."""
    import os

    fd, path = tempfile.mkstemp()
    os.close(fd)
    return path


def compare_resources(module, res1, res2):
    """Compare two XML resource definitions by diffing their normalized dumps.

    Both elements are dumped to temporary files, normalized with
    ``xmllint --format`` and compared with ``diff``. All temporary files are
    registered with ``module.add_cleanup_file``.

    :param module: object providing ``run_command`` and ``add_cleanup_file``.
    :param res1: first XML element (reported as "before").
    :param res2: second XML element (reported as "after").
    :returns: ``(rc, diff)`` where ``rc`` is the exit status of ``diff`` and
        ``diff`` is ``''`` when ``rc`` is 0, otherwise a dict with the keys
        ``before_header``, ``before``, ``after_header`` and ``after``.
    """
    # dump the XML resource definitions into temporary files
    n1_tmp_path = _dump_resource_to_tempfile(res1)
    module.add_cleanup_file(n1_tmp_path)
    n2_tmp_path = _dump_resource_to_tempfile(res2)
    module.add_cleanup_file(n2_tmp_path)

    # normalize the files and store results in new files - this also removes
    # some unimportant whitespace differences
    n3_tmp_path = _reserve_tempfile()
    module.add_cleanup_file(n3_tmp_path)
    n4_tmp_path = _reserve_tempfile()
    module.add_cleanup_file(n4_tmp_path)
    # The xmllint exit status is not checked (as before); a failed
    # normalization surfaces as a difference in the diff step below.
    module.run_command('xmllint --format --output ' + n3_tmp_path + ' ' + n1_tmp_path)
    module.run_command('xmllint --format --output ' + n4_tmp_path + ' ' + n2_tmp_path)

    # now compare files
    diff = ''
    rc, out, err = module.run_command('diff ' + n3_tmp_path + ' ' + n4_tmp_path)
    if rc != 0:
        # If there was a difference, return both normalized documents. Read
        # them as bytes so the same code works whether or not str is bytes,
        # and close the handles when done.
        with open(n3_tmp_path, 'rb') as before_file, open(n4_tmp_path, 'rb') as after_file:
            diff = {
                'before_header': '',
                'before': to_native(before_file.read()),
                'after_header': '',
                'after': to_native(after_file.read()),
            }
    return rc, diff
def create_xml_file(self, processed_object):
    """Build a ``<data>`` tree from *processed_object*, dump it and save it.

    Each key of *processed_object* becomes a child element whose text is
    ``str()`` of the corresponding value. The tree is dumped to stdout and
    written to ``results.xml`` in the current working directory.
    """
    data_root = ET.Element('data')
    for tag_name in processed_object:
        ET.SubElement(data_root, tag_name).text = str(processed_object[tag_name])
    ET.dump(data_root)
    ET.ElementTree(data_root).write('results.xml')
def bug_xmltoolkitX1():
    # Doctest-only function: it has no executable body, the docstring below
    # is the test. It dumps a small tree and then writes "tail" to stdout,
    # expecting the dumped XML to appear on the line before "tail".
    # NOTE(review): under Python 3, sys.stdout.write() returns the number of
    # characters written and a doctest would echo that value; the expected
    # output only matches where write() returns None - confirm the target
    # interpreter.
    """
    dump() doesn't flush the output buffer
    >>> tree = ET.XML("<doc><table><tbody/></table></doc>")
    >>> ET.dump(tree); sys.stdout.write("tail")
    <doc><table><tbody /></table></doc>
    tail
    """
def _write_xml(root, xml_file):
from xml.dom import minidom
#print(ElementTree.dump(root))
dom = minidom.parseString(ElementTree.tostring(root))
output = open(xml_file, 'w')
output.write(dom.toprettyxml(indent=" "))
output.close()
return xml_file
def bug_xmltoolkitX1():
    # Doctest-only function: it has no executable body, the docstring below
    # is the test. It dumps a small tree and then writes "tail" to stdout,
    # expecting the dumped XML to appear on the line before "tail".
    # NOTE(review): under Python 3, sys.stdout.write() returns the number of
    # characters written and a doctest would echo that value; the expected
    # output only matches where write() returns None - confirm the target
    # interpreter.
    """
    dump() doesn't flush the output buffer
    >>> tree = ET.XML("<doc><table><tbody/></table></doc>")
    >>> ET.dump(tree); sys.stdout.write("tail")
    <doc><table><tbody /></table></doc>
    tail
    """
def printToConsole(self):
    """Debugging aid only: dump the XML held in ``self.root`` to stdout."""
    document_root = self.root
    ET.dump(document_root)
def check(self, fileName, data):
    """Load (if needed) and validate workbook data in XML or CSV form.

    The format is taken from the file suffix. With no suffix it is inferred
    from *data*: a list means CSV, anything else XML. When *data* is falsy
    the content is read from *fileName*. With ``self.args.file_data`` set,
    the data is also printed.

    :param fileName: path whose suffix selects the format; also the file
        that is read when *data* is empty.
    :param data: already-loaded data (an XML element or a list of CSV
        cells), or a falsy value to load from *fileName*.
    :returns: ``(ok, data)``. For XML, ``ok`` is true when the root tag is
        ``pyslvs`` and it has an ``info`` child. For CSV, ``ok`` is true
        when the cells contain exactly 4 ``_info_``, 8 ``_table_`` and 2
        ``_design_`` markers. Any failure, or an unrecognised suffix,
        yields ``(False, [])``.
    """
    suffix = QFileInfo(fileName).suffix().lower()
    if suffix == '':
        # No extension: infer the format from the kind of data supplied.
        suffix = 'csv' if isinstance(data, list) else 'xml'
    print("Get: [{}]".format(fileName))
    try:
        if suffix == 'xml':
            # NOTE(review): for an Element, truthiness reflects whether it
            # has children, so a childless element is reloaded from the
            # file - confirm this is intended.
            if not data:
                data = ET.ElementTree(file=fileName).getroot()
            if self.args.file_data:
                ET.dump(data)
            n = data.tag == 'pyslvs' and data.find('info') is not None
        elif suffix == 'csv':
            if not data:
                with open(fileName, newline=str()) as stream:
                    reader = csv.reader(stream, delimiter=' ', quotechar='|')
                    for row in reader:
                        data += ' '.join(row).split('\t,')
            if self.args.file_data:
                print(data)
            n = (sum(1 for cell in data if cell == '_info_') == 4 and
                 sum(1 for cell in data if cell == '_table_') == 8 and
                 sum(1 for cell in data if cell == '_design_') == 2)
        else:
            # Unknown suffix: report failure explicitly instead of relying
            # on an unbound-variable error being swallowed below.
            return False, list()
        return n, data
    except Exception:
        # Best-effort validation: any load or parse problem means "not
        # valid". A bare except would also swallow KeyboardInterrupt and
        # SystemExit, so only Exception is caught.
        return False, list()
def __callback_esm_state(self, msg):
    """
    Given the ESM message, update the ESM state of the bearer it describes.

    The bearer's status entry is created on first sight, filled with the
    bearer id, type, QoS parameters and timestamp from ``msg.data``, logged,
    written to the profile, and an 'ESM_STATE' record is broadcast.

    :param msg: the NAS signaling message that carries ESM state
    """
    data = msg.data
    self.__cur_eps_id = data["EPS bearer ID"]

    if self.__cur_eps_id not in self.__esm_status:
        self.__esm_status[self.__cur_eps_id] = EsmStatus()
    status = self.__esm_status[self.__cur_eps_id]
    qos = status.qos

    status.eps_id = int(data["EPS bearer ID"])
    status.type = int(data["EPS bearer type"])
    # A reported type of 255 is stored as 1.
    # NOTE(review): the meaning of 255 is not visible here - confirm.
    if status.type == 255:
        status.type = 1

    qos.qci = data["QCI"]
    qos.max_bitrate_ulink = data["UL MBR"]
    qos.max_bitrate_dlink = data["DL MBR"]
    qos.guaranteed_bitrate_ulink = data["UL GBR"]
    # The downlink guaranteed bitrates were previously read from the
    # "DL MBR" keys, duplicating the max bitrate.
    # NOTE(review): assumes the message carries "DL GBR" / "DL GBR ext"
    # alongside the uplink keys - confirm against the message decoder.
    qos.guaranteed_bitrate_dlink = data["DL GBR"]
    qos.max_bitrate_ulink_ext = data["UL MBR ext"]
    qos.max_bitrate_dlink_ext = data["DL MBR ext"]
    qos.guaranteed_bitrate_ulink_ext = data["UL GBR ext"]
    qos.guaranteed_bitrate_dlink_ext = data["DL GBR ext"]

    status.timestamp = data["timestamp"]
    self.log_info(status.dump())

    self.profile.update(
        "LteNasProfile:" + self.__emm_status.profile_id() + ".eps.qos:" + bearer_type[status.type],
        {
            'qci': qos.qci,
            'max_bitrate_ulink': qos.max_bitrate_ulink,
            'max_bitrate_dlink': qos.max_bitrate_dlink,
            'guaranteed_bitrate_ulink': qos.guaranteed_bitrate_ulink,
            'guaranteed_bitrate_dlink': qos.guaranteed_bitrate_dlink,
            'max_bitrate_ulink_ext': qos.max_bitrate_ulink_ext,
            'max_bitrate_dlink_ext': qos.max_bitrate_dlink_ext,
            'guaranteed_bitrate_ulink_ext': qos.guaranteed_bitrate_ulink_ext,
            'guaranteed_bitrate_dlink_ext': qos.guaranteed_bitrate_dlink_ext,
        })

    # broadcast
    state = {
        'conn state': esm_state[int(data["EPS bearer state"]) - 1],
        'timestamp': str(data['timestamp']),
    }
    self.broadcast_info('ESM_STATE', state)