Python xml.dom.minidom 模块,parseString() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用xml.dom.minidom.parseString()。
def addTree(self, title, data, row=None, column=0, colspan=0, rowspan=0):
self.__verifyItem(self.n_trees, title, True)
self.__importAjtree()
if parseString is False:
self.warn("Unable to parse xml files. .addTree() not available")
return
xmlDoc = parseString(data)
frame = ScrollPane(
self.__getContainer(),
relief=RAISED,
borderwidth=2,
bg="white",
highlightthickness=0,
takefocus=1)
self.__positionWidget(frame, row, column, colspan, rowspan, "NSEW")
item = ajTreeData(xmlDoc.documentElement)
node = ajTreeNode(frame.getPane(), None, item)
self.n_trees[title] = node
# update() & expand() called in go() function
def parseDomainManager (self):
identifier = self.addTreeWidgetItem(self.domMgrItem, 'Identifier:', self.domManager._get_identifier())
profile = self.addTreeWidgetItem(self.domMgrItem, 'Profile:', self.domManager._get_domainManagerProfile())
self.domMgrPropsItem = self.addTreeWidgetItem(self.domMgrItem, 'Properties')
# Read the DMD file to get the SPD file, which can then be used to get the properties.
_xmlFile = self.fileMgr.open(str(self.domManager._get_domainManagerProfile()), True)
dmd = minidom.parseString(_xmlFile.read(_xmlFile.sizeOf()))
_xmlFile.close()
spdFile = dmd.getElementsByTagName('localfile')[0].getAttribute('name')
if not spdFile.startswith("/"):
spdFile = os.path.join(os.path.dirname(xmlfile), spdFile)
# Get the property name mapping.
prfFile = getPropertyFile(spdFile, self.fileMgr)
self.domMgrProps = parsePropertyFile(prfFile, self.fileMgr)
# Create entries for all of the properties.
try:
props = self.domManager.query([])
except:
props = []
self.buildPropertiesListView_old(self.domMgrPropsItem, props, self.domMgrProps)
def __init__(self, xml_string):
self.xml_unescape_table = {}
self.xml_map = {}
try:
self.xml = minidom.parseString(xml_string)
except:
print xml_string
self.xml_unescape_tabl = get_xml_unescape_table()
self.xml_map = get_xml_unescape_map()
for k, v in self.xml_map.items():
xml_string = xml_string.replace(k, v)
self.xml = minidom.parseString(xml_string)
self.bucket = get_tag_text(self.xml, 'Bucket', convert_to_bool = False)
self.object = get_tag_text(self.xml, 'Key', convert_to_bool = False)
if self.xml_map:
for k, v in self.xml_map.items():
self.object = self.object.replace(v, k)
self.object = unescape(self.object, self.xml_unescape_table)
self.key = get_tag_text(self.xml, 'Key', convert_to_bool = False)
self.upload_id = get_tag_text(self.xml, 'UploadId')
self.marker = get_tag_text(self.xml, 'Marker', convert_to_bool = False)
def __init__(self, xml_string):
try:
self.xml = minidom.parseString(xml_string)
except:
print xml_string
self.bucket = get_tag_text(self.xml, 'Bucket', convert_to_bool = False)
self.type = get_tag_text(self.xml, 'Type', convert_to_bool = False)
self.key = get_tag_text(self.xml, 'Key', convert_to_bool = False)
self.last_modified = get_tag_text(self.xml, 'LastModified', convert_to_bool = False)
self.etag = get_tag_text(self.xml, 'ETag', convert_to_bool = False)
self.content_type = get_tag_text(self.xml, 'Content-Type')
self.size = get_tag_text(self.xml, 'Size', convert_to_bool = False)
self.parts = []
parts = self.xml.getElementsByTagName('Part')
for p in parts:
self.parts.append(Part(p))
def get_broadcast_token(self, nick, uid):
""" Token required to start a broadcast.
:param nick: Client nick name
:type nick: str
:param uid: Client user identification
:type uid: str | int
:return: The broadcast token required to start a broadcast or None on failure.
:rtype: str | None
"""
_url = self._broadcast_token_url.format(self.room_name, nick, uid)
if self.is_greenroom:
_url.replace('site=tinychat', 'site=greenroom')
_response = util.web.http_get(url=_url, proxy=self.proxy)
log.debug('broadcast token response: %s' % _response)
if _response['content'] is not None:
_xml = parseString(_response['content'])
root = _xml.getElementsByTagName('response')[0]
result = root.getAttribute('result')
if result == 'PW':
return result
return root.getAttribute('token')
return None
def addTree(self, title, data, row=None, column=0, colspan=0, rowspan=0):
self.__verifyItem(self.n_trees, title, True)
self.__importAjtree()
if parseString is False:
self.warn("Unable to parse xml files. .addTree() not available")
return
xmlDoc = parseString(data)
frame = ScrollPane(
self.__getContainer(),
relief=RAISED,
borderwidth=2,
bg="white",
highlightthickness=0,
takefocus=1)
self.__positionWidget(frame, row, column, colspan, rowspan, "NSEW")
item = ajTreeData(xmlDoc.documentElement)
node = ajTreeNode(frame.getPane(), None, item)
self.n_trees[title] = node
# update() & expand() called in go() function
def get_xml(self, encoding='utf-8'):
"""
Returns the XML-code of this schedule.
The returned byte string contains the XML in the requested encoding. You save the byte sting to file in binary
mode (the file will encoded the requested encoding). Or you can convert the byte string to a string with
.decode(encoding).
:param str encoding: The encoding of the XML.
:rtype: bytes
"""
tree = Element(None)
self.generate_xml(tree)
xml_string = ElementTree.tostring(tree)
document = minidom.parseString(xml_string)
return document.toprettyxml(indent=' ', encoding=encoding)
# ------------------------------------------------------------------------------------------------------------------
def parse(self, xml, pvars):
#tmp = minidom.parseString(xml)
if sys.version_info >= (3, 0):
pl = plistlib.readPlistFromBytes(xml.encode());
else:
pl = plistlib.readPlistFromString(xml);
parsed= {}
pvars = self.getVars(pvars)
for k,v in pvars.items():
parsed[k] = pl[k] if k in pl else None
return parsed;
def _parse_igd_profile(profile_xml):
"""
Traverse the profile xml DOM looking for either
WANIPConnection or WANPPPConnection and return
the value found as well as the 'controlURL'.
"""
dom = parseString(profile_xml)
service_types = dom.getElementsByTagName('serviceType')
for service in service_types:
if _node_val(service).find('WANIPConnection') > 0 or \
_node_val(service).find('WANPPPConnection') > 0:
control_url = service.parentNode.getElementsByTagName(
'controlURL'
)[0].childNodes[0].data
upnp_schema = _node_val(service).split(':')[-2]
return control_url, upnp_schema
return False
def _parse_for_errors(soap_response):
if soap_response.status == 500:
response_data = soap_response.read()
try:
err_dom = parseString(response_data)
err_code = _node_val(err_dom.getElementsByTagName('errorCode')[0])
err_msg = _node_val(
err_dom.getElementsByTagName('errorDescription')[0]
)
except Exception, err:
logging.error("Unable to parse SOAP error: {0}, response: {1}".format(err, response_data))
return False
logging.error('SOAP request error: {0} - {1}'.format(err_code, err_msg))
raise Exception(
'SOAP request error: {0} - {1}'.format(err_code, err_msg)
)
return False
else:
return True
def test_valid_calls(endpoint, details):
"""Test all endpoints with valid calls."""
full_url = MALTEGO_SERVER + endpoint
logging.debug("Requesting %s" % full_url)
response = requests.post(
full_url,
data=build_maltego_response(details),
auth=(API_USERNAME, API_KEY),
verify=False
)
logging.debug("Response %s" % response.content)
assert response.status_code == 200
xmldoc = minidom.parseString(response.content)
messages = xmldoc.getElementsByTagName('UIMessage')
for message in messages:
value = message.attributes['MessageType'].value
assert value != 'FatalError'
def send_body(self, body):
"""Send the body
:param body:
:return: A tuple in the form
``(:class:`~xml.dom.minidom.Element`, String)``
"""
out = body.toxml()
response = \
self.request_session.post(
self.bosh_service.geturl(),
data=out)
if response.status_code == 200:
data = response.text
else:
data = ''
doc = minidom.parseString(data)
return (doc.documentElement, data)
def write(self, filename):
xml = ET.Element("phonebooks")
for book in self.phonebookList:
xml.append(book.getXML())
tree = ET.ElementTree(xml)
if False:
tree.write(filename, encoding="iso-8859-1", xml_declaration=True)
else:
rough_string = ET.tostring(tree.getroot(), encoding="iso-8859-1", method="xml")
reparsed = parseString(rough_string)
pretty = reparsed.toprettyxml(indent=" ", encoding="iso-8859-1").decode("iso-8859-1")
with open(filename, 'w', encoding="iso-8859-1") as outfile:
outfile.write(pretty)
# sid: Login session ID
# phonebookid: 0 for main phone book
# 1 for next phone book in list, etc...
def get_requestUrl(dl_url, server, **options):
""" Get the request url."""
stopWatch = stop_watch.localThreadStopWatch()
start_time = datetime.datetime.now()
stopWatch.start('get_request')
log.info( "Requesting file to download (this can take a while)..." )
# Get request id
m = utils_http.open_url(dl_url, **options)
responseStr = m.read()
dom = minidom.parseString(responseStr)
node = dom.getElementsByTagName('statusModeResponse')[0]
status = node.getAttribute('status')
if status == "2":
msg = node.getAttribute('msg')
log.error(msg)
get_req_url = None
else:
requestId = node.getAttribute('requestId')
# Get request url
get_req_url = server + '?action=getreqstatus&requestid=' + requestId
stopWatch.stop('get_request')
return get_req_url
def saveFile(filename, subdirs, filetype):
dir_path = getDirectoryPath(subdirs)
xmldom = parseString("".join(request.body))
if filename != "":
cfg = readconfig()
xmldir = cfg[filetype]
indented_xml = "".join(xmldom.toprettyxml(newl='\n'))
corrected_xml = remove_extra_newlines_char_xml(indented_xml)
print corrected_xml
with open(xmldir + dir_path + os.sep + filename, "w") as f:
f.write(corrected_xml)
output = {"success": True,
"path": filename}
else:
output = {"success": False,
"error": "Save called without a filename or content!"}
return output
def get_output_response(api_response):
if api_response is not None:
try:
output_response = parseString("".join(api_response.text))
except:
try:
JSON.loads(api_response.text)
except:
output_response = api_response.text.encode('ascii', 'ignore')
pNote("api_response Text: \n {0}".format(output_response))
else:
output_response = api_response.json()
pNote("api_response (JSON format): \n {0}".
format(JSON.dumps(output_response, indent=4)))
else:
pNote("api_response (XML format): \n {0}".
format(output_response.toprettyxml(newl='\n')))
else:
output_response = None
return output_response
def main():
""" This function basically creates the xml file by calling various other
functions, runs the file and then saves it.
"""
root = Element('data')
dir_path = os.path.dirname(os.path.realpath(sys.argv[0]))
rel_path = get_relative_path(dir_path, "data.xml")
tree = xml.etree.ElementTree.parse(rel_path)
input_root = tree.getroot()
nodes = get_firstlevel_children(input_root, "tag")
populate_xml(root, nodes)
temp_xml = 'temp.xml'
pretty_xml = minidom.parseString(xml.etree.ElementTree.tostring(root))\
.toprettyxml(indent=" ")
with open(temp_xml, "w") as config_file:
config_file.write(pretty_xml)
config_file.flush()
config_file.close()
save_file(temp_xml)
def getVmIsoList(self):
isoList = []
try:
vMdisk = self.conn.listStoragePools()
for vM in vMdisk:
pool = self.conn.storagePoolLookupByName(vM)
stgvols = pool.listVolumes()
for stgvolname in stgvols:
volData = dict()
stgvol = pool.storageVolLookupByName(stgvolname)
info = stgvol.info()
try:
volXml = stgvol.XMLDesc(0)
xml = minidom.parseString(volXml)
volData['vol_type'] = xml.getElementsByTagName('target')[0].getElementsByTagName('format')[0].getAttribute('type')
except:
volData['vol_type'] = 'unkonwn'
volData['vol_name'] = stgvol.name()
volData['vol_size'] = info[1] / 1024/ 1024/ 1024
volData['vol_available'] = info[2] / 1024/ 1024/ 1024
volData['vol_path'] = stgvol.path()
if volData['vol_type'].endswith('.iso') or volData['vol_path'].endswith('.iso'):isoList.append(volData)
return isoList
except libvirt.libvirtError:
return isoList
def delInstanceDisk(self,instance,volPath):
'''????'''
diskXml = None
raw_xml = instance.XMLDesc(0)
domXml = minidom.parseString(raw_xml)
for ds in domXml.getElementsByTagName('disk'):
try:
path = ds.getElementsByTagName('source')[0].getAttribute('file')
except:
continue
if path == volPath:diskXml = ds.toxml()
if diskXml:
try:
return instance.detachDeviceFlags(diskXml,3)
except libvirt.libvirtError,e:
return '??????????????{result}'.format(result=e.get_error_message())
else:return False
def parseSimpleTag(text,ignoreEntities=[]):
docText = u"<doc>%s</doc>" % text
xmldoc = minidom.parseString(docText.encode('utf8'))
docNode = xmldoc.childNodes[0]
text,unmergedEntities,relations = parseSimpleTag_helper(docNode,ignoreEntities=ignoreEntities)
missingSourceEntityID = [ e.sourceEntityID == '' for e in unmergedEntities ]
assert all(missingSourceEntityID) or (not any(missingSourceEntityID)), 'All entities or none (not some) should be given IDs'
assert (not any(missingSourceEntityID)) or len(relations) == 0, "Cannot include relations with no-ID entities"
if all(missingSourceEntityID):
for i,e in enumerate(unmergedEntities):
e.sourceEntityID = i+1
entities = mergeEntitiesWithMatchingIDs(unmergedEntities)
combinedData = kindred.Document(text,entities=entities,relations=relations)
return combinedData
def do_NOTIFY(self):
global needs_to_reload_zone_config
global raumfeld_handler
uuid = "uuid:"+self.path[1:]
friendly_name = RfCmd.map_udn_to_friendly_name(uuid)
content_length = int(self.headers['content-length'])
notification = self.rfile.read(content_length)
result = minidom.parseString(notification.decode('UTF-8'))
#print("\n\n#NOTIFY:\n" + result.toprettyxml())
notification_content = XmlHelper.xml_extract_dict(result,
['LastChange',
'Revision',
'SystemUpdateID',
'BufferFilled'])
if len(notification_content['LastChange']):
if '/Preferences/ZoneConfig/Rooms' in notification_content['LastChange']:
needs_to_reload_zone_config = True
last_change = minidom.parseString(notification_content['LastChange'])
uuid_store.set(uuid, friendly_name[0], friendly_name[1], last_change)
raumfeld_handler.set_subscription_values("uuid:" + self.path[1:], last_change)
#print("\n\n#NOTIFY LastChange: "+self.path+"\n"+last_change.toprettyxml())
self.send_response(200)
self.end_headers()
def get_services_from_location(location):
try:
(xml_headers, xml_data) = UpnpSoap.get(location)
if xml_data is not False:
xml_root = minidom.parseString(xml_data)
services_list = list()
for service in xml_root.getElementsByTagName("service"):
service_dict = XmlHelper.xml_extract_dict(service, ['serviceType',
'controlURL',
'eventSubURL',
'SCPDURL',
'serviceId'])
services_list.append(service_dict)
return services_list
except Exception as e:
print("Error get_subscription_urls:{0}".format(e))
return None
def _parse_cas_xml_response(cls, response_text):
cas_type = 'noResponse'
cas_data = {}
if not response_text:
return cas_type, cas_data
xml_document = parseString(response_text)
node_element = xml_document.documentElement
if node_element.nodeName != 'cas:serviceResponse':
raise Exception
for child in node_element.childNodes:
if child.nodeType != child.ELEMENT_NODE:
continue
cas_type = child.nodeName.replace("cas:", "")
cas_data = cls._parse_cas_xml_data(child)
break
return cas_type, cas_data
def get_broadcast_token(self, nick, uid):
""" Token required to start a broadcast.
:param nick: Client nick name
:type nick: str
:param uid: Client user identification
:type uid: str | int
:return: The broadcast token required to start a broadcast or None on failure.
:rtype: str | None
"""
_url = self._broadcast_token_url.format(self.room_name, nick, uid)
if self.is_greenroom:
_url.replace('site=tinychat', 'site=greenroom')
_response = util.web.http_get(url=_url, proxy=self.proxy)
log.debug('broadcast token response: %s' % _response)
if _response['content'] is not None:
_xml = parseString(_response['content'])
root = _xml.getElementsByTagName('response')[0]
result = root.getAttribute('result')
if result == 'PW':
return result
return root.getAttribute('token')
return None
def upload(self,filename, **params):
#x = flickr._prepare_params(params)
#args['api_key'] = self.__api_key
args = params
sig = flickr._get_api_sig(params=params)
args['api_key'] = flickr.API_KEY
args['api_sig'] = sig
args['auth_token'] = flickr.userToken()
f = file(filename, 'rb')
photo_data = f.read()
f.close()
# now make a "files" array to pass to uploader
files = [('photo', filename, photo_data)]
response = post_multipart('api.flickr.com', '/services/upload/', args, files)
# use get data since error checking is handled by it already
data = flickr._get_data(minidom.parseString(response))
photo = flickr.Photo(data.rsp.photoid.text)
return photo
def module_run(self):
filename = self.options['filename']
with codecs.open(filename, 'wb', encoding='utf-8') as outfile:
# build a list of table names
tables = [x.strip() for x in self.options['tables'].split(',')]
data_dict = {}
cnt = 0
for table in tables:
data_dict[table] = []
columns = [x[0] for x in self.get_columns(table)]
rows = self.query('SELECT "%s" FROM "%s" ORDER BY 1' % ('", "'.join(columns), table))
for row in rows:
row_dict = {}
for i in range(0,len(columns)):
row_dict[columns[i]] = row[i]
data_dict[table].append(row_dict)
cnt += 1
# write the xml to a file
reparsed = parseString(dicttoxml(data_dict))
outfile.write(reparsed.toprettyxml(indent=' '*4))
self.output('%d records added to \'%s\'.' % (cnt, filename))
def search_for_books(tag, index):
"""Search Amazon for Books """
amazon = bottlenose.Amazon(ACCESS_KEY, SECRET_KEY, AFFILIATE_ID)
results = amazon.ItemSearch(
SearchIndex = index,
Sort = "relevancerank",
Keywords = tag
)
parsed_result = xml.parseString(results)
all_items = []
attrs = ['Title','Author', 'URL']
for item in parsed_result.getElementsByTagName('Item'):
parse_item = {}
for attr in attrs:
parse_item[attr] = ""
try:
parse_item[attr] = item.getElementsByTagName(attr)[0].childNodes[0].data
except:
pass
all_items.append(parse_item)
return all_items
def gen_xml(rowList):
items = Element('items')
for row in rowList:
item = SubElement(items, 'item')
item.set('autocomplete', row.get('autocomplete') or '')
item.set('uid', row.get('uid') or '')
item.set('arg', row.get('title') or '')
item.set('valid', row.get('valid') or '')
title = SubElement(item, 'title')
title.text = row.get('title') or ''
subtitle = SubElement(item, 'subtitle')
subtitle.text = row.get('subtitle') or ''
icon = SubElement(item, 'icon')
icon.text = row.get('icon')
tree = minidom.parseString(etree.tostring(items))
return tree.toxml()
def glyphUpdateFromSVG(g, svgCode):
doc = xmlparseString(svgCode)
svg = doc.documentElement
paths = findPathNodes(svg)
if len(paths) == 0:
raise Exception('no <path> found in SVG')
path = paths[0]
if len(paths) != 1:
for p in paths:
id = p.getAttribute('id')
if id is not None and id.find('stroke') == -1:
path = p
break
tr = nodeTranslation(path)
d = path.getAttribute('d')
g.clearContours()
drawSVGPath(g, d, tr)
def svgGetPaths(svgCode):
doc = xmlparseString(svgCode)
svg = doc.documentElement
paths = findPathNodes(svg)
isFigmaSVG = svgCode.find('Figma</desc>') != -1
if len(paths) == 0:
return paths, (0,0)
paths2 = []
for path in paths:
id = path.getAttribute('id')
if not isFigmaSVG or (id is None or id.find('stroke') == -1):
tr = nodeTranslation(path)
d = path.getAttribute('d')
paths2.append((d, tr))
return paths2, isFigmaSVG
def glyphUpdateFromSVG(g, svgCode):
doc = xmlparseString(svgCode)
svg = doc.documentElement
paths = findPathNodes(svg)
if len(paths) == 0:
raise Exception('no <path> found in SVG')
path = paths[0]
if len(paths) != 1:
for p in paths:
id = p.getAttribute('id')
if id is not None and id.find('stroke') == -1:
path = p
break
tr = nodeTranslation(path)
d = path.getAttribute('d')
g.clearContours()
drawSVGPath(g, d, tr)
def main(input_xml, loghandle):
xmlhandle = minidom.parseString(input_xml)
documentNode = xmlhandle.childNodes[0].childNodes
tags = '<act>'
for articleNode in documentNode:
if articleNode.nodeType != Node.ELEMENT_NODE:
tags += articleNode.data
else:
if articleNode.tagName != 'article':
tags += xmlExtract.get_complete_tag(articleNode)
else:
tags += get_article_data(articleNode, loghandle)
tags += '</act>\n'
return tags
def _parse_repos(module):
"""parses the output of zypper --xmlout repos and return a parse repo dictionary"""
cmd = _get_cmd('--xmlout', 'repos')
from xml.dom.minidom import parseString as parseXML
rc, stdout, stderr = module.run_command(cmd, check_rc=False)
if rc == 0:
repos = []
dom = parseXML(stdout)
repo_list = dom.getElementsByTagName('repo')
for repo in repo_list:
opts = {}
for o in REPO_OPTS:
opts[o] = repo.getAttribute(o)
opts['url'] = repo.getElementsByTagName('url')[0].firstChild.data
# A repo can be uniquely identified by an alias + url
repos.append(opts)
return repos
# exit code 6 is ZYPPER_EXIT_NO_REPOS (no repositories defined)
elif rc == 6:
return []
else:
module.fail_json(msg='Failed to execute "%s"' % " ".join(cmd), rc=rc, stdout=stdout, stderr=stderr)
def parse_cp_xml_counter_output(data):
"""
Parse xml file print_counter.xml
data must be a single string, as returned by file.read() (notice the
difference with parse_text_output!)
On output, a dictionary with parsed values.
"""
dom = parseString(data)
parsed_data = {}
cardname = 'LAST_SUCCESSFUL_PRINTOUT'
card1 = [_ for _ in dom.childNodes if _.nodeName == 'PRINT_COUNTER'][0]
card2 = [_ for _ in card1.childNodes if _.nodeName == 'LAST_SUCCESSFUL_PRINTOUT'][0]
tagname = 'STEP'
parsed_data[cardname.lower().replace('-', '_')] = parse_xml_child_integer(tagname, card2)
return parsed_data
def parse_cp_xml_counter_output(data):
"""
Parse xml file print_counter.xml
data must be a single string, as returned by file.read() (notice the
difference with parse_text_output!)
On output, a dictionary with parsed values.
"""
dom = parseString(data)
parsed_data={}
cardname='LAST_SUCCESSFUL_PRINTOUT'
card1 = [ _ for _ in dom.childNodes if _.nodeName=='PRINT_COUNTER'][0]
card2 = [ _ for _ in card1.childNodes if _.nodeName=='LAST_SUCCESSFUL_PRINTOUT'][0]
tagname='STEP'
parsed_data[cardname.lower().replace('-','_')] = parse_xml_child_integer(tagname,card2)
return parsed_data
def convert_xml_to_azure_object(xmlstr, azure_type, include_id=True, use_title_as_id=True):
xmldoc = minidom.parseString(xmlstr)
return_obj = azure_type()
xml_name = azure_type._xml_name if hasattr(azure_type, '_xml_name') else azure_type.__name__
# Only one entry here
for xml_entry in _MinidomXmlToObject.get_children_from_path(xmldoc,
'entry'):
for node in _MinidomXmlToObject.get_children_from_path(xml_entry,
'content',
xml_name):
_MinidomXmlToObject._fill_data_to_return_object(node, return_obj)
for name, value in _MinidomXmlToObject.get_entry_properties_from_node(
xml_entry,
include_id=include_id,
use_title_as_id=use_title_as_id).items():
setattr(return_obj, name, value)
return return_obj
def convert_response_to_feeds(response, convert_func):
if response is None:
return None
feeds = _list_of(Feed)
_set_continuation_from_response_headers(feeds, response)
xmldoc = minidom.parseString(response.body)
xml_entries = _MinidomXmlToObject.get_children_from_path(xmldoc, 'feed', 'entry')
if not xml_entries:
# in some cases, response contains only entry but no feed
xml_entries = _MinidomXmlToObject.get_children_from_path(xmldoc, 'entry')
for xml_entry in xml_entries:
new_node = _MinidomXmlToObject._clone_node_with_namespaces(xml_entry, xmldoc)
feeds.append(convert_func(new_node.toxml('utf-8')))
return feeds
def data(self, svg):
if svg is None:
self._data = None
return
# parse into dom object
from xml.dom import minidom
x = minidom.parseString(svg)
# get svg tag (should be 1)
found_svg = x.getElementsByTagName('svg')
if found_svg:
svg = found_svg[0].toxml()
else:
# fallback on the input, trust the user
# but this is probably an error.
pass
svg = cast_unicode(svg)
self._data = svg
def get_info(cls, video_key):
req = request.Request('http://rutube.ru/api/video/%s/?format=xml' % video_key, method='GET')
try:
logger.debug('{0.method} {0.full_url}'.format(req))
response = request.urlopen(req, timeout=3)
except error.URLError:
return None
if response.status != 200:
return None
dom = minidom.parseString(response.read())
title = dom.getElementsByTagName('title').item(0)
description = dom.getElementsByTagName('description').item(0)
thumbnail = dom.getElementsByTagName('thumbnail_url').item(0)
embed = dom.getElementsByTagName('html').item(0)
return {
'title': title.firstChild.data,
'description': description.firstChild.data,
'preview_url': thumbnail.firstChild.data,
'embed': embed.firstChild.data
}
def read_hypergraph(string):
"""
Read a graph from a XML document. Nodes and hyperedges specified in the input will be added
to the current graph.
@type string: string
@param string: Input string in XML format specifying a graph.
@rtype: hypergraph
@return: Hypergraph
"""
hgr = hypergraph()
dom = parseString(string)
for each_node in dom.getElementsByTagName("node"):
hgr.add_node(each_node.getAttribute('id'))
for each_node in dom.getElementsByTagName("hyperedge"):
hgr.add_hyperedge(each_node.getAttribute('id'))
dom = parseString(string)
for each_node in dom.getElementsByTagName("node"):
for each_edge in each_node.getElementsByTagName("link"):
hgr.link(str(each_node.getAttribute('id')), str(each_edge.getAttribute('to')))
return hgr
def testGetElementsByTagNameNS(self):
d="""<foo xmlns:minidom='http://pyxml.sf.net/minidom'>
<minidom:myelem/>
</foo>"""
dom = parseString(d)
elems = dom.getElementsByTagNameNS("http://pyxml.sf.net/minidom",
"myelem")
self.confirm(len(elems) == 1
and elems[0].namespaceURI == "http://pyxml.sf.net/minidom"
and elems[0].localName == "myelem"
and elems[0].prefix == "minidom"
and elems[0].tagName == "minidom:myelem"
and elems[0].nodeName == "minidom:myelem")
dom.unlink()
def testGetEmptyNodeListFromElementsByTagNameNS(self):
doc = parseString('<doc/>')
self.get_empty_nodelist_from_elements_by_tagName_ns_helper(
doc, 'http://xml.python.org/namespaces/a', 'localname')
self.get_empty_nodelist_from_elements_by_tagName_ns_helper(
doc, '*', 'splat')
self.get_empty_nodelist_from_elements_by_tagName_ns_helper(
doc, 'http://xml.python.org/namespaces/a', '*')
doc = parseString('<doc xmlns="http://xml.python.org/splat"><e/></doc>')
self.get_empty_nodelist_from_elements_by_tagName_ns_helper(
doc, "http://xml.python.org/splat", "not-there")
self.get_empty_nodelist_from_elements_by_tagName_ns_helper(
doc, "*", "not-there")
self.get_empty_nodelist_from_elements_by_tagName_ns_helper(
doc, "http://somewhere.else.net/not-there", "e")
def testCloneDocumentDeep(self):
doc = parseString("<?xml version='1.0'?>\n"
"<!-- comment -->"
"<!DOCTYPE doc [\n"
"<!NOTATION notation SYSTEM 'http://xml.python.org/'>\n"
"]>\n"
"<doc attr='value'/>")
doc2 = doc.cloneNode(1)
self.confirm(not (doc.isSameNode(doc2) or doc2.isSameNode(doc)),
"testCloneDocumentDeep: document objects not distinct")
self.confirm(len(doc.childNodes) == len(doc2.childNodes),
"testCloneDocumentDeep: wrong number of Document children")
self.confirm(doc2.documentElement.nodeType == Node.ELEMENT_NODE,
"testCloneDocumentDeep: documentElement not an ELEMENT_NODE")
self.confirm(doc2.documentElement.ownerDocument.isSameNode(doc2),
"testCloneDocumentDeep: documentElement owner is not new document")
self.confirm(not doc.documentElement.isSameNode(doc2.documentElement),
"testCloneDocumentDeep: documentElement should not be shared")
if doc.doctype is not None:
# check the doctype iff the original DOM maintained it
self.confirm(doc2.doctype.nodeType == Node.DOCUMENT_TYPE_NODE,
"testCloneDocumentDeep: doctype not a DOCUMENT_TYPE_NODE")
self.confirm(doc2.doctype.ownerDocument.isSameNode(doc2))
self.confirm(not doc.doctype.isSameNode(doc2.doctype))
def testNormalizeCombineAndNextSibling(self):
doc = parseString("<doc/>")
root = doc.documentElement
root.appendChild(doc.createTextNode("first"))
root.appendChild(doc.createTextNode("second"))
root.appendChild(doc.createElement("i"))
self.confirm(len(root.childNodes) == 3
and root.childNodes.length == 3,
"testNormalizeCombineAndNextSibling -- preparation")
doc.normalize()
self.confirm(len(root.childNodes) == 2
and root.childNodes.length == 2
and root.firstChild.data == "firstsecond"
and root.firstChild is not root.lastChild
and root.firstChild.nextSibling is root.lastChild
and root.firstChild.previousSibling is None
and root.lastChild.previousSibling is root.firstChild
and root.lastChild.nextSibling is None
, "testNormalizeCombinedAndNextSibling -- result")
doc.unlink()
def testNormalizeDeleteWithPrevSibling(self):
doc = parseString("<doc/>")
root = doc.documentElement
root.appendChild(doc.createTextNode("first"))
root.appendChild(doc.createTextNode(""))
self.confirm(len(root.childNodes) == 2
and root.childNodes.length == 2,
"testNormalizeDeleteWithPrevSibling -- preparation")
doc.normalize()
self.confirm(len(root.childNodes) == 1
and root.childNodes.length == 1
and root.firstChild.data == "first"
and root.firstChild is root.lastChild
and root.firstChild.nextSibling is None
and root.firstChild.previousSibling is None
, "testNormalizeDeleteWithPrevSibling -- result")
doc.unlink()
def testNormalizeDeleteWithNextSibling(self):
doc = parseString("<doc/>")
root = doc.documentElement
root.appendChild(doc.createTextNode(""))
root.appendChild(doc.createTextNode("second"))
self.confirm(len(root.childNodes) == 2
and root.childNodes.length == 2,
"testNormalizeDeleteWithNextSibling -- preparation")
doc.normalize()
self.confirm(len(root.childNodes) == 1
and root.childNodes.length == 1
and root.firstChild.data == "second"
and root.firstChild is root.lastChild
and root.firstChild.nextSibling is None
and root.firstChild.previousSibling is None
, "testNormalizeDeleteWithNextSibling -- result")
doc.unlink()
def testNormalizeDeleteWithTwoNonTextSiblings(self):
doc = parseString("<doc/>")
root = doc.documentElement
root.appendChild(doc.createElement("i"))
root.appendChild(doc.createTextNode(""))
root.appendChild(doc.createElement("i"))
self.confirm(len(root.childNodes) == 3
and root.childNodes.length == 3,
"testNormalizeDeleteWithTwoSiblings -- preparation")
doc.normalize()
self.confirm(len(root.childNodes) == 2
and root.childNodes.length == 2
and root.firstChild is not root.lastChild
and root.firstChild.nextSibling is root.lastChild
and root.firstChild.previousSibling is None
and root.lastChild.previousSibling is root.firstChild
and root.lastChild.nextSibling is None
, "testNormalizeDeleteWithTwoSiblings -- result")
doc.unlink()
def testNormalizeDeleteAndCombine(self):
doc = parseString("<doc/>")
root = doc.documentElement
root.appendChild(doc.createTextNode(""))
root.appendChild(doc.createTextNode("second"))
root.appendChild(doc.createTextNode(""))
root.appendChild(doc.createTextNode("fourth"))
root.appendChild(doc.createTextNode(""))
self.confirm(len(root.childNodes) == 5
and root.childNodes.length == 5,
"testNormalizeDeleteAndCombine -- preparation")
doc.normalize()
self.confirm(len(root.childNodes) == 1
and root.childNodes.length == 1
and root.firstChild is root.lastChild
and root.firstChild.data == "secondfourth"
and root.firstChild.previousSibling is None
and root.firstChild.nextSibling is None
, "testNormalizeDeleteAndCombine -- result")
doc.unlink()
def partinfo(self, uuid=None, devname=None):
"""Read partition info including uuid and filesystem.
You can specify uuid or devname to get the identified partition info.
If no argument provided, all partitions will return.
We read info from /etc/blkid/blkid.tab instead of call blkid command.
REF: http://linuxconfig.org/how-to-retrieve-and-change-partitions-universally-unique-identifier-uuid-on-linux
"""
blks = {}
p = subprocess.Popen(shlex.split('/sbin/blkid'), stdout=subprocess.PIPE, close_fds=True)
p.stdout.read()
p.wait()
# OpenVZ may not have this file
if not os.path.exists('/etc/blkid/blkid.tab'): return None
with open('/etc/blkid/blkid.tab') as f:
for line in f:
dom = parseString(line).documentElement
_fstype = dom.getAttribute('TYPE')
_uuid = dom.getAttribute('UUID')
_devname = dom.firstChild.nodeValue.replace('/dev/', '')
partinfo = {
'name': _devname,
'fstype': _fstype,
'uuid': _uuid,
}
if uuid and uuid == _uuid:
return partinfo
elif devname and devname == _devname:
return partinfo
else:
blks[_devname] = partinfo
if uuid or devname:
return None
else:
return blks
def buildDevSeq(dasXML, fs):
_xmlFile = fs.open(str(dasXML), True)
buff = _xmlFile.read(_xmlFile.sizeOf())
_xmlFile.close()
das = minidom.parseString(buff)
ds = []
deviceAssignmentTypeNodeList = das.getElementsByTagName("deviceassignmenttype")
for node in deviceAssignmentTypeNodeList:
componentid = node.getElementsByTagName("componentid")[0].firstChild.data
assigndeviceid = node.getElementsByTagName("assigndeviceid")[0].firstChild.data
ds.append( CF.DeviceAssignmentType(str(componentid),str(assigndeviceid)) )
return ds