Python xml.etree.ElementTree 模块,fromstring() 实例源码
我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用xml.etree.ElementTree.fromstring()。
def _set_boot_device(conn, domain, device):
"""Set the boot device.
:param conn: active libvirt connection.
:param domain: libvirt domain object.
:raises: LibvirtError if failed update domain xml.
"""
parsed = ET.fromstring(domain.XMLDesc())
os = parsed.find('os')
boot_list = os.findall('boot')
# Clear boot list
for boot_el in boot_list:
os.remove(boot_el)
boot_el = ET.SubElement(os, 'boot')
boot_el.set('dev', device)
try:
conn.defineXML(ET.tostring(parsed))
except libvirt.libvirtError as e:
raise isd_exc.LibvirtError(err=e)
def choose_aws_role(assertion):
""" Choose AWS role from SAML assertion """
aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
roles = []
role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])
root = ET.fromstring(base64.b64decode(assertion))
for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
if saml2attribute.get('Name') == aws_attribute_role:
for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
roles.append(role_tuple(*saml2attributevalue.text.split(',')))
for index, role in enumerate(roles):
role_name = role.role_arn.split('/')[1]
print("%d: %s" % (index+1, role_name))
role_choice = input('Please select the AWS role: ')-1
return roles[role_choice]
def getFileURLs(file_ids):
'''
Retrieve the ftp location for a list of file IDs
@param file_ids: List of file IDs
@return List of ftp locations
'''
info_url='http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/getFileUrls?fileIds='
for file_id in file_ids:
info_url += str(file_id) + ','
info_url = info_url[:-1]
url = urlopen(info_url)
tree = ET.fromstring(url.read().decode())
url.close()
return [ child.text for child in tree ]
def checkCobertura(config, checkoutSteps, buildSteps, packageSteps, **kwargs):
found = False
for s in checkoutSteps:
if s.getPackage().getName().endswith("unittests"): found = True
for s in buildSteps:
if s.getPackage().getName().endswith("unittests"): found = True
for s in packageSteps:
if s.getPackage().getName().endswith("unittests"): found = True
if found:
root = ElementTree.fromstring(config)
publishers = root.find("publishers")
if publishers.find("hudson.plugins.cobertura.CoberturaPublisher") is None:
publishers.append(PLUGIN)
config = ElementTree.tostring(root, encoding="UTF-8")
return config
def _scanDir(self, workspace, dir):
self.__dir = dir
try:
info = ElementTree.fromstring(subprocess.check_output(
["svn", "info", "--xml", dir],
cwd=workspace, universal_newlines=True))
self.__url = info.find('entry/url').text
self.__revision = int(info.find('entry').get('revision'))
self.__repoRoot = info.find('entry/repository/root').text
self.__repoUuid = info.find('entry/repository/uuid').text
status = subprocess.check_output(["svn", "status", dir],
cwd=workspace, universal_newlines=True)
self.__dirty = status != ""
except subprocess.CalledProcessError as e:
raise BuildError("Svn audit failed: " + str(e))
except OSError as e:
raise BuildError("Error calling git: " + str(e))
except ElementTree.ParseError as e:
raise BuildError("Invalid XML received from svn")
def testSetGitShallowClone(self):
self.executeBobJenkinsCmd("set-options -o scm.git.shallow=42 myTestJenkins")
self.executeBobJenkinsCmd("push -q myTestJenkins")
send = self.jenkinsMock.getServerData()
config = ElementTree.fromstring(send[0][1])
for clone in config.iter('hudson.plugins.git.extensions.impl.CloneOption'):
found = 0
for a in clone.getiterator():
if a.tag == 'shallow':
assert(a.text == 'true')
found += 1
if a.tag == 'depth':
assert(a.text == '42')
found += 1
assert(found == 2)
self.executeBobJenkinsCmd("set-options -o scm.git.shallow=-1 myTestJenkins")
with self.assertRaises(Exception) as c:
self.executeBobJenkinsCmd("push -q myTestJenkins")
assert(type(c.exception) == BuildError)
def testShortDescription(self):
self.createComplexRecipes()
self.executeBobJenkinsCmd("set-options --shortdescription myTestJenkinsComplex")
self.executeBobJenkinsCmd("push -q myTestJenkinsComplex")
send = self.jenkinsMock.getServerData()
result_set = set()
try:
for i in send:
if i[0] == '/createItem?name=dependency-two':
for items in ElementTree.fromstring(i[1]).iter('description'):
for line in [x for x in items.itertext()][0].splitlines():
if line.startswith('<li>') and line.endswith('</li>'):
result_set.add(line[4:-5])
except:
print("Malformed Data Recieved")
self.assertEqual(result_set, {'root/dependency-one/dependency-two'})
def FromXml(self, xml):
if not xml:
raise WxPayException("xml?????")
# ??ElementTree
try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET
# ??xml
xml2dict = {}
xml_tree = ET.fromstring(xml)
for child_node in xml_tree:
xml2dict[child_node.tag] = child_node.text
self.values = xml2dict
return self.values
# ?????????url??
def _get_defects(self, xml_string):
'''evaluate the xml string returned by cppcheck (on sdterr) and use it to create
a list of defects.
'''
defects = []
for error in ElementTree.fromstring(xml_string).iter('error'):
defect = {}
defect['id'] = error.get('id')
defect['severity'] = error.get('severity')
defect['msg'] = str(error.get('msg')).replace('<','<')
defect['verbose'] = error.get('verbose')
for location in error.findall('location'):
defect['file'] = location.get('file')
defect['line'] = str(int(location.get('line')) - 1)
defects.append(defect)
return defects
def _create_html_index(self, files):
name = self.generator.get_name()
root = ElementTree.fromstring(CPPCHECK_HTML_FILE)
title = root.find('head/title')
title.text = 'cppcheck - report - %s' % name
body = root.find('body')
for div in body.findall('div'):
if div.get('id') == 'page':
page = div
break
for div in page.findall('div'):
if div.get('id') == 'header':
h1 = div.find('h1')
h1.text = 'cppcheck report - %s' % name
if div.get('id') == 'content':
content = div
self._create_html_table(content, files)
s = ElementTree.tostring(root, method='html')
s = CCPCHECK_HTML_TYPE + s
node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
node.write(s)
return node
def _create_html_table(self, content, files):
table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
for name, val in files.items():
f = val['htmlfile']
s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
row = ElementTree.fromstring(s)
table.append(row)
errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint)
for e in errors:
if not e.has_key('line'):
s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
else:
attr = ''
if e['severity'] == 'error':
attr = 'class="error"'
s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
row = ElementTree.fromstring(s)
table.append(row)
content.append(table)
def _get_defects(self, xml_string):
'''evaluate the xml string returned by cppcheck (on sdterr) and use it to create
a list of defects.
'''
defects = []
for error in ElementTree.fromstring(xml_string).iter('error'):
defect = {}
defect['id'] = error.get('id')
defect['severity'] = error.get('severity')
defect['msg'] = str(error.get('msg')).replace('<','<')
defect['verbose'] = error.get('verbose')
for location in error.findall('location'):
defect['file'] = location.get('file')
defect['line'] = str(int(location.get('line')) - 1)
defects.append(defect)
return defects
def _create_html_index(self, files):
name = self.generator.get_name()
root = ElementTree.fromstring(CPPCHECK_HTML_FILE)
title = root.find('head/title')
title.text = 'cppcheck - report - %s' % name
body = root.find('body')
for div in body.findall('div'):
if div.get('id') == 'page':
page = div
break
for div in page.findall('div'):
if div.get('id') == 'header':
h1 = div.find('h1')
h1.text = 'cppcheck report - %s' % name
if div.get('id') == 'content':
content = div
self._create_html_table(content, files)
s = ElementTree.tostring(root, method='html')
s = CCPCHECK_HTML_TYPE + s
node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
node.write(s)
return node
def _create_html_table(self, content, files):
table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
for name, val in files.items():
f = val['htmlfile']
s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
row = ElementTree.fromstring(s)
table.append(row)
errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint)
for e in errors:
if not e.has_key('line'):
s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
else:
attr = ''
if e['severity'] == 'error':
attr = 'class="error"'
s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
row = ElementTree.fromstring(s)
table.append(row)
content.append(table)
def _create_html_index(self, files):
name = self.generator.get_name()
root = ElementTree.fromstring(CPPCHECK_HTML_FILE)
title = root.find('head/title')
title.text = 'cppcheck - report - %s' % name
body = root.find('body')
for div in body.findall('div'):
if div.get('id') == 'page':
page = div
break
for div in page.findall('div'):
if div.get('id') == 'header':
h1 = div.find('h1')
h1.text = 'cppcheck report - %s' % name
if div.get('id') == 'content':
content = div
self._create_html_table(content, files)
s = ElementTree.tostring(root, method='html')
s = CCPCHECK_HTML_TYPE + s
node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
node.write(s)
return node
def _create_html_table(self, content, files):
table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
for name, val in files.items():
f = val['htmlfile']
s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
row = ElementTree.fromstring(s)
table.append(row)
errors = sorted(val['errors'], key=lambda e: int(e['line']) if e.has_key('line') else sys.maxint)
for e in errors:
if not e.has_key('line'):
s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
else:
attr = ''
if e['severity'] == 'error':
attr = 'class="error"'
s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
row = ElementTree.fromstring(s)
table.append(row)
content.append(table)
def get_xml_trees(db, host, pid, usernames, graceful=False):
""" Params: db, host, paragraph id, the list of usernames wanted,
Optional:
graceful: True if no excpetions are to be raised
excpetion raised if a user did not submit an annotation for the passage
returns a list of xml roots elements
"""
cur = get_cursor(db, host)
xmls = []
for username in usernames:
username = str(username) # precaution for cases bad input e.g. 101
cur_uid = get_uid(db, host, username)
cur.execute("SELECT xml FROM xmls WHERE paid=" + PLACE_HOLDER +
" AND uid=" + PLACE_HOLDER + "ORDER BY ts DESC",
(pid, cur_uid))
raw_xml = cur.fetchone()
if not raw_xml and not graceful:
raise Exception("The user " + username +
" did not submit an annotation for this passage")
else:
xmls.append(fromstring(raw_xml[0]))
return xmls
def get_by_xid(db, host, xid, graceful=False):
"""Returns the passages that correspond to the xid
Optional:
graceful: True if no excpetions are to be raised
excpetion raised if xid does no exist
"""
# precaution for bad input e.g. 101->'101'
xid = str(xid)
cur = get_cursor(db, host)
cur.execute("SELECT xml FROM xmls WHERE id" + "=" +
PLACE_HOLDER, (int(xid),))
raw_xml = cur.fetchone()
if not raw_xml and not graceful:
raise Exception("The xid " + xid + " does not exist")
elif raw_xml:
return fromstring(raw_xml[0])
def get_mac_address(conn, domain_name):
"""Get MAC address from domain XML."""
domain = conn.lookupByName(domain_name)
domain_xml = domain.XMLDesc()
domain_tree = etree.fromstring(domain_xml)
devices = domain_tree.find('devices')
interfaces = devices.iterfind('interface')
for interface in interfaces:
source = interface.find('source')
if source is None or source.get('network') != 'vagrant-private-dhcp':
continue
mac_element = interface.find('mac')
mac_address = mac_element.get('address')
return mac_address
raise NoPrivateDHCPInterfaceException()
def __init__(self, path, *args, **kwargs):
cls = self.__class__
super(cls, self).__init__(path, *args, **kwargs)
while True:
sess = PeektaggedSectionHeader(self.fh)
if sess.tag == 'pkts':
break
sess.payload = self.fh.read(sess.len)
if sess.tag == PEEKTAGGED_FILE_MAGIC:
root = ET.fromstring(sess.payload)
if root.tag != 'VersionInfo':
raise PeektaggedException("Corrupted version info")
for child in root:
setattr(self, child.tag, child.text)
elif sess.tag == 'sess':
root = ET.fromstring(sess.payload)
for child in root:
if child.tag == 'PacketCount':
setattr(self, 'total_packets', int(child.text))
break
def CreateClassFromXMLString(target_class, xml_string, string_encoding=None):
"""Creates an instance of the target class from the string contents.
Args:
target_class: class The class which will be instantiated and populated
with the contents of the XML. This class must have a _tag and a
_namespace class variable.
xml_string: str A string which contains valid XML. The root element
of the XML string should match the tag and namespace of the desired
class.
string_encoding: str The character encoding which the xml_string should
be converted to before it is interpreted and translated into
objects. The default is None in which case the string encoding
is not changed.
Returns:
An instance of the target class with members assigned according to the
contents of the XML - or None if the root XML tag and namespace did not
match those of the target class.
"""
encoding = string_encoding or XML_STRING_ENCODING
if encoding and isinstance(xml_string, unicode):
xml_string = xml_string.encode(encoding)
tree = ElementTree.fromstring(xml_string)
return _CreateClassFromElementTree(target_class, tree)
def parse(xml_string, target_class=None, version=1, encoding=None):
"""Parses the XML string according to the rules for the target_class.
Args:
xml_string: str or unicode
target_class: XmlElement or a subclass. If None is specified, the
XmlElement class is used.
version: int (optional) The version of the schema which should be used when
converting the XML into an object. The default is 1.
encoding: str (optional) The character encoding of the bytes in the
xml_string. Default is 'UTF-8'.
"""
if target_class is None:
target_class = XmlElement
if isinstance(xml_string, unicode):
if encoding is None:
xml_string = xml_string.encode(STRING_ENCODING)
else:
xml_string = xml_string.encode(encoding)
tree = ElementTree.fromstring(xml_string)
return _xml_element_from_tree(tree, target_class, version)
def doSomethingWithResult(response):
if response is None:
return "KO"
else:
tree = ET.fromstring(response.text)
status = "Free"
# arrgh, namespaces!!
elems=tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}BusyType")
for elem in elems:
status=elem.text
tree2=ET.fromstring(response.request.body)
elems2=tree2.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}Address")
for e in elems2:
room=e.text
#sys.stderr.write(str(now.isoformat())+": Get BusyType for room "+str(rooms[room])+": len: "+str(len(elems))+" => "+str(elems[0])+"\n")
sys.stderr.write(str(now.isoformat())+": Status for room "+str(rooms[room])+": "+str(room)+" => "+status+"\n")
result.append((status, rooms[room], room))
def snabb_state(query_output):
root = ET.fromstring(query_output)
print ("<snabb>")
for instance in root:
# In each instance, we need to query the id, pci, pid.
print ("<instance>")
for child in instance:
PRINT_TAG(child, "id")
PRINT_TAG(child,"pid")
PRINT_TAG(child,"next_hop_mac_v4")
PRINT_TAG(child,"next_hop_mac_v6")
#child = None
#for child in instance:
if child.tag == "pci":
for pcis in child:
for pci_child in pcis:
PRINT_TAG(pci_child,"rxpackets")
PRINT_TAG(pci_child,"txpackets")
PRINT_TAG(pci_child,"rxdrop")
PRINT_TAG(pci_child,"txdrop")
print "</instance>"
print ("</snabb>")
return
def snabb_statistics(output,argv):
root = ET.fromstring(output)
print "<snabb>"
found = 0
instance_id = ""
for i in range(0,len(argv)):
if argv[i] == "id":
instance_id = argv[i+1]
break
for instance in root:
if instance_id != '' and instance.findall("./id")[0].text != argv[2]:
pass
else:
found += 1
stats_per_instance(instance)
if found == 0:
print "<statistics><id_error>no instance found</id_error></statistics>"
print "</snabb>"
return
def snabb_statistics(output,argv):
root = ET.fromstring(output)
print "<snabb>"
found = 0
instance_id = ""
for i in range(0,len(argv)):
if argv[i] == "id":
instance_id = argv[i+1]
break
for instance in root:
if instance_id != '' and instance.findall("./id")[0].text != argv[2]:
pass
else:
found += 1
stats_per_instance(instance)
if found == 0:
print "<statistics><id_error>no instance found</id_error></statistics>"
print "</snabb>"
return
def _http_get_and_parse(self, cmd, payload = {}):
"""
Call http_get and parse the returned Foscam data into a hash. The data
all looks like: var id='000C5DDC9D6C';
"""
ret = {}
data = self._http_get(cmd,payload)
self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: data=%s" % (self.name,data))
if data is False:
code = -1
else:
# Return code is good, unless CGI result changes it later
code = 0
root = ET.fromstring(data)
for child in root.iter():
if child.tag == 'result':
code = int(child.text)
elif child.tag != 'CGI_Result':
ret[child.tag] = child.text
self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: code=%d, ret=%s" % (self.name,code,ret))
return code,ret
def ip(self, name):
leases = {}
conn = self.conn
for network in conn.listAllNetworks():
for lease in network.DHCPLeases():
ip = lease['ipaddr']
mac = lease['mac']
leases[mac] = ip
try:
vm = conn.lookupByName(name)
xml = vm.XMLDesc(0)
root = ET.fromstring(xml)
except:
return None
for element in root.getiterator('{kvirt}info'):
e = element.find('{kvirt}ip')
if e is not None:
return e.text
nic = root.getiterator('interface')[0]
mac = nic.find('mac').get('address')
if vm.isActive() and mac in leases:
return leases[mac]
else:
return None
def volumes(self, iso=False):
isos = []
templates = []
default_templates = [os.path.basename(t) for t in defaults.TEMPLATES.values() if t is not None]
conn = self.conn
for storage in conn.listStoragePools():
storage = conn.storagePoolLookupByName(storage)
storage.refresh(0)
storagexml = storage.XMLDesc(0)
root = ET.fromstring(storagexml)
for element in root.getiterator('path'):
storagepath = element.text
break
for volume in storage.listVolumes():
if volume.endswith('iso'):
isos.append("%s/%s" % (storagepath, volume))
elif volume.endswith('qcow2') or volume.endswith('qc2') or volume in default_templates:
templates.append("%s/%s" % (storagepath, volume))
if iso:
return sorted(isos, key=lambda s: s.lower())
else:
return sorted(templates, key=lambda s: s.lower())
def _uploadimage(self, name, pool='default', origin='/tmp', suffix='.ISO'):
name = "%s%s" % (name, suffix)
conn = self.conn
poolxml = pool.XMLDesc(0)
root = ET.fromstring(poolxml)
for element in root.getiterator('path'):
poolpath = element.text
break
imagepath = "%s/%s" % (poolpath, name)
imagexml = self._xmlvolume(path=imagepath, size=0, diskformat='raw')
pool.createXML(imagexml, 0)
imagevolume = conn.storageVolLookupByPath(imagepath)
stream = conn.newStream(0)
imagevolume.upload(stream, 0, 0)
with open("%s/%s" % (origin, name)) as ori:
stream.sendAll(self.handler, ori)
stream.finish()
def remove_cloudinit(self, name):
conn = self.conn
try:
vm = conn.lookupByName(name)
xml = vm.XMLDesc(0)
root = ET.fromstring(xml)
except:
print("VM %s not found" % name)
return {'result': 'failure', 'reason': "VM %s not found" % name}
for element in root.getiterator('disk'):
disktype = element.get('device')
if disktype == 'cdrom':
source = element.find('source')
path = source.get('file')
if source is None:
break
volume = conn.storageVolLookupByPath(path)
volume.delete(0)
element.remove(source)
newxml = ET.tostring(root)
conn.defineXML(newxml)
def vm_ports(self, name):
conn = self.conn
networks = []
try:
vm = conn.lookupByName(name)
except:
common.pprint("VM %s not found" % name, color='red')
return networks
xml = vm.XMLDesc(0)
root = ET.fromstring(xml)
for element in root.getiterator('interface'):
networktype = element.get('type')
if networktype == 'bridge':
network = element.find('source').get('bridge')
else:
network = element.find('source').get('network')
networks.append(network)
return networks
def _get_bridge(self, name):
conn = self.conn
bridges = [interface.name() for interface in conn.listAllInterfaces()]
if name in bridges:
return name
try:
net = self.conn.networkLookupByName(name)
except:
return None
netxml = net.XMLDesc(0)
root = ET.fromstring(netxml)
bridge = root.getiterator('bridge')
if bridge:
attributes = bridge[0].attrib
bridge = attributes.get('name')
return bridge
def _le_xml(self, arquivo):
if arquivo is None:
return False
if not isinstance(arquivo, basestring):
arquivo = etree.tounicode(arquivo)
if arquivo is not None:
if isinstance(arquivo, basestring):
if NAMESPACE_NFSE in arquivo:
arquivo = por_acentos(arquivo)
if u'<' in arquivo:
self._xml = etree.fromstring(tira_abertura(arquivo))
else:
arq = open(arquivo)
txt = ''.join(arq.readlines())
txt = tira_abertura(txt)
arq.close()
self._xml = etree.fromstring(txt)
else:
self._xml = etree.parse(arquivo)
return True
return False
def validar(self):
arquivo_esquema = self.caminho_esquema + self.arquivo_esquema
# Aqui é importante remover a declaração do encoding
# para evitar erros de conversão unicode para ascii
xml = tira_abertura(self.xml).encode(u'utf-8')
esquema = etree.XMLSchema(etree.parse(arquivo_esquema))
if not esquema.validate(etree.fromstring(xml)):
for e in esquema.error_log:
if e.level == 1:
self.alertas.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', ''))
elif e.level == 2:
self.erros.append(e.message.replace('{http://www.portalfiscal.inf.br/nfe}', ''))
return esquema.error_log
def xml_to_dict(data):
try:
root = ET.fromstring(data)
except ET.ParseError:
root = []
d = {}
for item in root:
dd = {}
for subitem in item:
m = {}
m['text'] = subitem.text
m.update(subitem.attrib)
dd[subitem.tag] = m
d[dd['title']['text']] = dd
return d
def test_check_host_status_by_cibadmin(
self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
mock_get_node_state_tag_list, mock_check_if_status_changed):
mock_get_cib_xml.return_value = STATUS_TAG_XML
mock_set_cib_xml.return_value = None
mock_have_quorum.return_value = 1
status_tag = ElementTree.fromstring(STATUS_TAG_XML)
node_state_tag_list = status_tag.getchildren()
mock_get_node_state_tag_list.return_value = node_state_tag_list
mock_check_if_status_changed.return_value = None
obj = handle_host.HandleHost()
ret = obj._check_host_status_by_cibadmin()
self.assertEqual(0, ret)
mock_get_cib_xml.assert_called_once_with()
mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
mock_have_quorum.assert_called_once_with()
mock_get_node_state_tag_list.assert_called_once_with()
mock_check_if_status_changed.assert_called_once_with(
node_state_tag_list)
def test_check_host_status_by_cibadmin_no_quorum(
self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
mock_get_node_state_tag_list, mock_check_if_status_changed):
mock_get_cib_xml.return_value = STATUS_TAG_XML
mock_set_cib_xml.return_value = None
mock_have_quorum.return_value = 0
status_tag = ElementTree.fromstring(STATUS_TAG_XML)
node_state_tag_list = status_tag.getchildren()
mock_get_node_state_tag_list.return_value = node_state_tag_list
mock_check_if_status_changed.return_value = None
obj = handle_host.HandleHost()
ret = obj._check_host_status_by_cibadmin()
self.assertEqual(0, ret)
mock_get_cib_xml.assert_called_once_with()
mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
mock_have_quorum.assert_called_once_with()
mock_get_node_state_tag_list.assert_called_once_with()
mock_check_if_status_changed.assert_called_once_with(
node_state_tag_list)
def testParseClassNode(self):
example_xml_string = (
'<class name="Class1" extends="java.lang.Object">'
'<method name="method1">'
'</method>'
'<method name="method2">'
'</method>'
'</class>')
actual = dexdump._ParseClassNode(
ElementTree.fromstring(example_xml_string))
expected = {
'methods': ['method1', 'method2'],
'superclass': 'java.lang.Object',
}
self.assertEquals(expected, actual)
def del_tags_from_xml(xml, tag_list=[]):
"""
It deletes the tags either by their names or xpath
Arguments:
1.xml: It takes xml file path or xml string as input
2.tag_list: It contains list of tags which needs to be removed
Returns:
It returns xml string
"""
if os.path.exists(xml):
tree = ElementTree.parse(xml)
root = tree.getroot()
else:
root = ElementTree.fromstring(xml)
for tag in tag_list:
if 'xpath=' in tag:
tag = tag.strip('xpath=')
req_tags = getChildElementsListWithSpecificXpath(root, tag)
else:
req_tags = getChildElementsListWithSpecificXpath(root, ".//{0}".format(tag))
recursive_delete_among_children(root, req_tags)
xml_string = ElementTree.tostring(root, encoding='utf-8', method='xml')
return xml_string
def _update_interfaces(self):
self._interfaces = {}
root = etree.fromstring(self._introspect_xml)
for e in root.iter('interface'):
name = e.attrib['name']
self._interfaces[name] = Interface(
self, self._service, self._address, self._path, e,
self._camel_convert, self._timeout_ms,
self._changed_coroutines.get(name, None)
)
for e in root.iter('node'):
try:
self._nodes.append(e.attrib['name'])
except KeyError:
pass
def do_discover_reports(sdk_client):
url = 'https://adwords.google.com/api/adwords/reportdownload/{}/reportDefinition.xsd'.format(VERSION) #pylint: disable=line-too-long
xsd = request_xsd(url)
root = ET.fromstring(xsd)
nodes = list(root.find(".//*[@name='ReportDefinition.ReportType']/*"))
stream_names = [p.attrib['value'] for p in nodes if p.attrib['value'] in VERIFIED_REPORTS] #pylint: disable=line-too-long
streams = []
LOGGER.info("Starting report discovery")
for stream_name in stream_names:
schema, mdata = create_schema_for_report(stream_name, sdk_client)
streams.append({'stream': stream_name,
'tap_stream_id': stream_name,
'metadata' : metadata.to_list(mdata),
'schema': schema})
LOGGER.info("Report discovery complete")
return streams
def mountIso(self,instance,dev, image):
tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0))
for disk in tree.findall('devices/disk'):
if disk.get('device') == 'cdrom':
for elm in disk:
if elm.tag == 'target':
if elm.get('dev') == dev:
src_media = ElementTree.Element('source')
src_media.set('file', image)
disk.append(src_media)
if instance.state()[0] == 1:
xml_disk = ElementTree.tostring(disk)
try:
instance.attachDevice(xml_disk)
except libvirt.libvirtError,e:
return '??????????{result}'.format(result=e.get_error_message())
xmldom = self.getInsXMLDesc(instance,1)
if instance.state()[0] == 5:
xmldom = ElementTree.tostring(tree)
try:
return self.defineXML(xmldom)
except libvirt.libvirtError,e:
return '??????????{result}'.format(result=e.get_error_message())
def getNetUsage(self,instance):
devices = []
dev_usage = []
tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1))
if instance.state()[0] == 1:
tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1))
for target in tree.findall("devices/interface/target"):
devices.append(target.get("dev"))
for i, dev in enumerate(devices):
rx_use_ago = instance.interfaceStats(dev)[0]
tx_use_ago = instance.interfaceStats(dev)[4]
time.sleep(1)
rx_use_now = instance.interfaceStats(dev)[0]
tx_use_now = instance.interfaceStats(dev)[4]
rx_diff_usage = (rx_use_now - rx_use_ago) * 8
tx_diff_usage = (tx_use_now - tx_use_ago) * 8
dev_usage.append({'dev': i, 'rx': rx_diff_usage, 'tx': tx_diff_usage})
else:
for i, dev in enumerate(self.get_net_device(instance)):
dev_usage.append({'dev': i, 'rx': 0, 'tx': 0})
return dev_usage
def delInstanceCdrom(self,instance,cdrom):
'''????'''
raw_xml = instance.XMLDesc(0)
root = ElementTree.fromstring(raw_xml)
for dk in root.findall('./devices'):
devs = dk.getchildren()
for dev in devs:
if dev.tag == 'disk'and dev.get('device')=='cdrom':
for iter in dev:
if iter.tag == 'target' and iter.get('dev') == cdrom:
devs.remove(dev)
diskXml = ElementTree.tostring(root)
try:
return self.defineXML(diskXml)
except libvirt.libvirtError,e:
return '??????????????{result}'.format(result=e.get_error_message())
def addInstanceInterface(self,instance,brName):
netk = self.getNetwork(brName)
if netk:
xml = netk.XMLDesc(0)
tree = ElementTree.fromstring(xml)
try:
mode = tree.find('virtualport').get('type')
except:
mode = 'brctl'
model = tree.find('forward').get('mode')
interXml = Const.CreateNetcard(nkt_br=brName, ntk_name=brName +'-'+CommTools.radString(length=4), data={'type':model,'mode':mode})
try:
return instance.attachDeviceFlags(interXml,3)#??????????flags?3??????????????
except libvirt.libvirtError,e:
return '??????????????{result}'.format(result=e.get_error_message())
else:return False
def setInterfaceBandwidth(self,instance,port,bandwidth):
'''????'''
domXml = instance.XMLDesc(0)
root = ElementTree.fromstring(domXml)
try:
for dev in root.findall('.//devices/'):
if dev.tag == 'interface':
for iter in dev:
if iter.tag == 'target' and iter.get('dev') == port:
bwXml = ElementTree.SubElement(dev,'bandwidth')
inbdXml = ElementTree.Element('inbound')
inbdXml.set('average',str(int(bandwidth)*1024))
inbdXml.set('peak',str(int(bandwidth)*1024))
inbdXml.set('burst','1024')
outbdXml = ElementTree.Element('outbound')
outbdXml.set('average',str(int(bandwidth)*1024))
outbdXml.set('peak',str(int(bandwidth)*1024))
outbdXml.set('burst','1024')
bwXml.append(inbdXml)
bwXml.append(outbdXml)
domXml = ElementTree.tostring(root)
except Exception,e:
return {"status":"faild",'data':e}
if self.defineXML(domXml):return {"status":"success",'data':None}
def setUpClass(cls):
xml = """
<temporal>
<fluent_changes>
<fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="0" new_value="off"/>
<fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3890" new_value="on" old_value="off"/>
<fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3990" new_value="off" old_value="on"/>
</fluent_changes>
<actions>
<event action="makecall_START" energy="60.281" frame="3718"/>
<event action="makecall_END" energy="60.281" frame="3767"/>
</actions>
</temporal>"""
cls.parsexml = ET.fromstring(xml)