Python xml.etree.ElementTree 模块,ParseError() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用xml.etree.ElementTree.ParseError()。
def _scanDir(self, workspace, dir):
self.__dir = dir
try:
info = ElementTree.fromstring(subprocess.check_output(
["svn", "info", "--xml", dir],
cwd=workspace, universal_newlines=True))
self.__url = info.find('entry/url').text
self.__revision = int(info.find('entry').get('revision'))
self.__repoRoot = info.find('entry/repository/root').text
self.__repoUuid = info.find('entry/repository/uuid').text
status = subprocess.check_output(["svn", "status", dir],
cwd=workspace, universal_newlines=True)
self.__dirty = status != ""
except subprocess.CalledProcessError as e:
raise BuildError("Svn audit failed: " + str(e))
except OSError as e:
raise BuildError("Error calling git: " + str(e))
except ElementTree.ParseError as e:
raise BuildError("Invalid XML received from svn")
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def parse_XML(self, xmlfile, params, skip_validation=False, queue=None,
submit_jobs=True, completion_mail=True, search_path="",
user_override_file=None, keep_temp=False, release_jobs=True,
force_conditional_steps=False, delay=None, email_address=None,
error_email_address=None, walltime_multiplier=1,
write_pipeline_files=False,
tool_exec_mode=ToolExecModes.BATCH_STANDARD):
try:
self._parse_XML(xmlfile, params, skip_validation, queue, submit_jobs,
completion_mail, search_path, user_override_file,
keep_temp, release_jobs, force_conditional_steps,
delay, email_address, error_email_address,
walltime_multiplier, write_pipeline_files,
tool_exec_mode)
except civet_exceptions.ParseError as e:
print("\nError parsing XML: {}".format(e), file=sys.stderr)
sys.exit(1)
except civet_exceptions.MissingFile as e:
print(e, file=sys.stderr)
sys.exit(1)
def file(self, e):
atts = e.attrib
id = atts['id']
# Ensure that the id is unique.
if id in self.options:
raise civet_exceptions.ParseError("{}: file id duplicates an option"
"name: ".format(os.path.basename(self.xml_file), self.id))
if id in self.tool_files:
raise civet_exceptions.ParseError("{}: file id is a duplicate: {}".format(os.path.basename(self.xml_file), self.id))
PipelineFile.parse_xml(e, self.tool_files)
# Track all the tool temporary files, so that we can
# delete them at the end of the tool's execution.
if self.tool_files[id].is_temp:
self.tempfile_ids.append(id)
def xml_to_dict(data):
try:
root = ET.fromstring(data)
except ET.ParseError:
root = []
d = {}
for item in root:
dd = {}
for subitem in item:
m = {}
m['text'] = subitem.text
m.update(subitem.attrib)
dd[subitem.tag] = m
d[dd['title']['text']] = dd
return d
def iron_claw_warrior_xml_files(filepath):
"""Validate Warrior xml files (Testcase/Testsuite/Project) against
their xsd schema files """
try:
root = xml_Utils.getRoot(filepath)
except ElementTree.ParseError, err:
print_error("PARSING ERROR:{0}".format(err))
return False
ironclaw_object = IronClaw()
if root.tag == 'Testcase':
result = ironclaw_object.testcase_prerun(filepath)
if root.tag == 'TestSuite':
result = ironclaw_object.testsuite_prerun(filepath, root)
if root.tag == 'Project':
result = ironclaw_object.project_prerun(filepath, root)
return result
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of Backend object containg information about backends from the XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.backends = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
for child in xml_root.getchildren():
self.ProcessBackendNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.backends
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of Backend object containg information about backends from the XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.backends = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
for child in xml_root.getchildren():
self.ProcessBackendNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.backends
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def handleXMLContentFromMessage(content):
rootNode = None
if len(content) < 5:
print ('It is not valid xml content:' + str(content))
return rootNode
else:
if content[0:5] != '<msg>':
content = '\n'.join(content.split('\n',2)[1:])
try:
rootNode = ET.fromstring(content)
except ET.ParseError as args:
#print ('It is not valid xml content (' , args,'):\n',content)
rootNode = None
return rootNode
def get_status_xml(self, command):
"""Get status XML via HTTP and return it as XML ElementTree."""
# Get XML structure via HTTP get
res = requests.get("http://{host}{command}".format(
host=self._host, command=command), timeout=self.timeout)
# Continue with XML processing only if HTTP status code = 200
if res.status_code == 200:
try:
# Return XML ElementTree
return ET.fromstring(res.text)
except ET.ParseError:
_LOGGER.error(
"Host %s returned malformed XML for: %s",
self._host, command)
raise ValueError
else:
_LOGGER.error((
"Host %s returned HTTP status code %s "
"when trying to receive data"), self._host, res.status_code)
raise ValueError
def guess_format(config):
try:
json.loads(config)
return 'json'
except ValueError:
pass
try:
ElementTree.fromstring(config)
return 'xml'
except ElementTree.ParseError:
pass
if config.startswith('set') or config.startswith('delete'):
return 'set'
return 'text'
def _get_py3_cls():
"""Python 3.3 hides the pure Python code but defusedxml requires it.
The code is based on test.support.import_fresh_module().
"""
pymodname = "xml.etree.ElementTree"
cmodname = "_elementtree"
pymod = sys.modules.pop(pymodname, None)
cmod = sys.modules.pop(cmodname, None)
sys.modules[cmodname] = None
pure_pymod = importlib.import_module(pymodname)
if cmod is not None:
sys.modules[cmodname] = cmod
else:
sys.modules.pop(cmodname)
sys.modules[pymodname] = pymod
_XMLParser = pure_pymod.XMLParser
_iterparse = pure_pymod.iterparse
ParseError = pure_pymod.ParseError
return _XMLParser, _iterparse, ParseError
def error(xml):
"""
Test error handling.
>>> issubclass(ET.ParseError, SyntaxError)
True
>>> error("foo").position
(1, 0)
>>> error("<tag>&foo;</tag>").position
(1, 5)
>>> error("foobar<").position
(1, 6)
"""
try:
ET.XML(xml)
except ET.ParseError:
return sys.exc_info()[1]
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def find_file_in_zip(zip_file):
'''Returns the twb/tds file from a Tableau packaged file format. Packaged
files can contain cache entries which are also valid XML, so only look for
files with a .tds or .twb extension.
'''
candidate_files = filter(lambda x: x.split('.')[-1] in ('twb', 'tds'),
zip_file.namelist())
for filename in candidate_files:
with zip_file.open(filename) as xml_candidate:
try:
ET.parse(xml_candidate)
return filename
except ET.ParseError:
# That's not an XML file by gosh
pass
def check_encoding(encoding):
"""
>>> check_encoding("ascii")
>>> check_encoding("us-ascii")
>>> check_encoding("iso-8859-1")
>>> check_encoding("iso-8859-15")
>>> check_encoding("cp437")
>>> check_encoding("mac-roman")
>>> check_encoding("gbk")
Traceback (most recent call last):
ValueError: multi-byte encodings are not supported
>>> check_encoding("cp037")
Traceback (most recent call last):
ParseError: unknown encoding: line 1, column 30
"""
ET.XML("<?xml version='1.0' encoding='%s'?><xml />" % encoding)
def error(xml):
"""
Test error handling.
>>> issubclass(ET.ParseError, SyntaxError)
True
>>> error("foo").position
(1, 0)
>>> error("<tag>&foo;</tag>").position
(1, 5)
>>> error("foobar<").position
(1, 6)
"""
try:
ET.XML(xml)
except ET.ParseError:
return sys.exc_value
def check_encoding(encoding):
"""
>>> check_encoding("ascii")
>>> check_encoding("us-ascii")
>>> check_encoding("iso-8859-1")
>>> check_encoding("iso-8859-15")
>>> check_encoding("cp437")
>>> check_encoding("mac-roman")
>>> check_encoding("gbk")
Traceback (most recent call last):
ValueError: multi-byte encodings are not supported
>>> check_encoding("cp037")
Traceback (most recent call last):
ParseError: unknown encoding: line 1, column 30
"""
ET.XML("<?xml version='1.0' encoding='%s'?><xml />" % encoding)
def error(xml):
"""
Test error handling.
>>> issubclass(ET.ParseError, SyntaxError)
True
>>> error("foo").position
(1, 0)
>>> error("<tag>&foo;</tag>").position
(1, 5)
>>> error("foobar<").position
(1, 6)
"""
try:
ET.XML(xml)
except ET.ParseError:
return sys.exc_value
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def package_analysis(iface, scope):
FNULL = open(os.devnull, "w")
output = open(scope.id + ".xml", "w")
try:
subprocess.call(["cppcheck", "--xml-version=2", "--enable=all",
"--rule-file=" + iface.get_file("rules.xml"),
scope.path
], stdout=FNULL, stderr=output)
finally:
FNULL.close()
output.close()
files = file_mapping(scope)
try:
xml = ET.parse(scope.id + ".xml").getroot()
errors = xml.find("errors")
for error in errors:
handle_report(iface, files, error)
except ET.ParseError as e:
pass
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def _process_ilimodels(self, file, netloc):
'''
Parses ilimodels.xml provided in ``file`` and updates the local repositories cache.
'''
try:
root = ET.parse(file).getroot()
except ET.ParseError as e:
QgsMessageLog.logMessage(self.tr('Could not parse ilimodels file `{file}` ({exception})'.format(file=file, exception=str(e))), self.tr('Projectgenerator'))
return
self.repositories[netloc] = list()
repo_models = list()
for repo in root.iter('{http://www.interlis.ch/INTERLIS2.3}IliRepository09.RepositoryIndex'):
for model_metadata in repo.findall('ili23:IliRepository09.RepositoryIndex.ModelMetadata', self.ns):
model = dict()
model['name'] = model_metadata.find('ili23:Name', self.ns).text
model['version'] = model_metadata.find('ili23:Version', self.ns).text
repo_models.append(model)
self.repositories[netloc] = sorted(repo_models, key=lambda m: m['version'], reverse=True)
self.models_changed.emit()
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of Backend object containg information about backends from the XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.backends = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
for child in xml_root.getchildren():
self.ProcessBackendNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.backends
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def get_device_name(ip_addr):
""" get the device friendly name for an IP address """
try:
conn = httplib.HTTPConnection(ip_addr + ":8008")
conn.request("GET", "/ssdp/device-desc.xml")
resp = conn.getresponse()
if resp.status == 200:
status_doc = resp.read()
try:
xml = ElementTree.fromstring(status_doc)
device_element = xml.find("{urn:schemas-upnp-org:device-1-0}" + "device")
return device_element.find("{urn:schemas-upnp-org:device-1-0}" + "friendlyName").text
except ElementTree.ParseError:
return ""
else:
return ""
except:
# unable to get a name - this might be for many reasons
# e.g. a non chromecast device on the network that responded to the search
return ""
def check_encoding(encoding):
"""
>>> check_encoding("ascii")
>>> check_encoding("us-ascii")
>>> check_encoding("iso-8859-1")
>>> check_encoding("iso-8859-15")
>>> check_encoding("cp437")
>>> check_encoding("mac-roman")
>>> check_encoding("gbk")
Traceback (most recent call last):
ValueError: multi-byte encodings are not supported
>>> check_encoding("cp037")
Traceback (most recent call last):
ParseError: unknown encoding: line 1, column 30
"""
ET.XML("<?xml version='1.0' encoding='%s'?><xml />" % encoding)
def error(xml):
"""
Test error handling.
>>> issubclass(ET.ParseError, SyntaxError)
True
>>> error("foo").position
(1, 0)
>>> error("<tag>&foo;</tag>").position
(1, 5)
>>> error("foobar<").position
(1, 6)
"""
try:
ET.XML(xml)
except ET.ParseError:
return sys.exc_value
def feed(url, delay=1, cached=False, headers={'User-Agent': 'Grasp.py'}):
s = download(url, headers=headers, delay=delay, cached=cached)
for f in (rss, atom):
try:
for r in f(s):
yield r
except ElementTree.ParseError as e: # HTML?
pass
except:
pass
# for story in feed('http://feeds.washingtonpost.com/rss/world'):
# print(story)
#---- MAIL ----------------------------------------------------------------------------------------
# The mail() function sends a HTML-formatted e-mail from a Gmail account.
def error(xml):
"""
Test error handling.
>>> issubclass(ET.ParseError, SyntaxError)
True
>>> error("foo").position
(1, 0)
>>> error("<tag>&foo;</tag>").position
(1, 5)
>>> error("foobar<").position
(1, 6)
"""
try:
ET.XML(xml)
except ET.ParseError:
return sys.exc_value
def onResolveEmbedded(self):
item = self.currentItem()
QApplication.setOverrideCursor(Qt.WaitCursor)
uri = item.data(1, Qt.UserRole)
try:
f = remote_open_from_qgis(uri)
try:
doc, ns_map = xml_parse(f)
except ET.ParseError:
# probably not an XML
QApplication.restoreOverrideCursor()
QMessageBox.warning(self, "XML parsing error", "The external resource is not a well formed XML")
return
ns_imap = {}
for k, v in ns_map.items():
ns_imap[v] = k
fill_tree_with_element(self, item.parent(), doc.getroot(), ns_imap, get_custom_viewers(), ns_map)
except RuntimeError as e:
QApplication.restoreOverrideCursor()
QMessageBox.warning(self, "Network error", e.args[0])
return
finally:
QApplication.restoreOverrideCursor()
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of Backend object containg information about backends from the XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.backends = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
for child in xml_root.getchildren():
self.ProcessBackendNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.backends
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of Backend object containg information about backends from the XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.backends = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
for child in xml_root.getchildren():
self.ProcessBackendNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.backends
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def test_load_ui_invalidxml():
"""Tests to see if loadUi successfully fails on invalid ui files"""
import sys
invalid_xml = os.path.join(self.tempdir, "invalid.ui")
with io.open(invalid_xml, "w", encoding="utf-8") as f:
f.write(u"""
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0" garbage
</ui>
""")
from xml.etree import ElementTree
from Qt import QtWidgets, QtCompat
app = QtWidgets.QApplication(sys.argv)
assert_raises(ElementTree.ParseError, QtCompat.loadUi, invalid_xml)
app.exit()
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def validatenmapxml(phase1):
for file in os.listdir(phase1):
file = os.path.join(phase1, file)
files_to_parse = ParseConfig().files_to_parse
try:
ET.parse(file)
logging.debug("Adding {f} to list of files to parse"
.format(f=file))
files_to_parse.append(file)
except ParseError:
logging.warning("{f} is malformed or not an xml".format(f=file))
print("{e} {f} is malformed or not an xml".format(e=cterr, f=file))
pass
except IOError:
# File is a directory.
pass
return files_to_parse
def location(self):
"""
????????????
"""
try:
ret = ETree.fromstring(self.raw['OriContent']).find('location').attrib
try:
ret['x'] = float(ret['x'])
ret['y'] = float(ret['y'])
ret['scale'] = int(ret['scale'])
ret['maptype'] = int(ret['maptype'])
except (KeyError, ValueError):
pass
return ret
except (TypeError, KeyError, ValueError, ETree.ParseError):
pass
# chats
def __parse(self, file):
print "[Bonjour.__parse] parsing %s%s" %(self.AVAHI_SERVICES_DIR, file)
try:
config = cet_parse(self.AVAHI_SERVICES_DIR + file).getroot()
except ParseError: #parsing failed, skip the file
return
name = config.find('name').text
service = config.find('service')
type = service.find('type').text
port = service.find('port').text
text = service.findall('txt-record')
textList = []
if text != None:
for txt in text:
textList.append(txt.text)
service = self.buildServiceFull(file, name, type, port, textList)
self.registerService(service)
def __init__(self, response, _message=None):
status = response.status
reason = response.reason
body = response.body.read()
try:
detail = XML(body).findtext("./messages/msg")
except ParseError as err:
detail = body
message = "HTTP %d %s%s" % (
status, reason, "" if detail is None else " -- %s" % detail)
Exception.__init__(self, _message or message)
self.status = status
self.reason = reason
self.headers = response.headers
self.body = body
self._response = response
def fetch_user_mal(self, name, url, cmd):
with aiohttp.ClientSession() as session:
async with session.get(url.format(name)) as response:
data = await response.text()
try:
root = ET.fromstring(data)
except ET.ParseError:
return '', ''
else:
if len(root) == 0:
return '', ''
collection = {x.find('series_title').text for x in root.findall(cmd)}
entry = root.find('myinfo')
if cmd == "anime":
info = [entry.find(x).text for x in ['user_watching', 'user_completed',
'user_onhold', 'user_dropped',
'user_days_spent_watching']]
return collection, info
else:
info = [entry.find(x).text for x in ['user_reading', 'user_completed',
'user_onhold', 'user_dropped',
'user_days_spent_watching']]
return collection, info
def get_filename(cls, item):
# we reparse formatted item to get filename from <meta name="filename"> element
# this way we are sure that we have the exact same filename
try:
xml = ET.fromstring(item['formatted_item'])
except (KeyError, ET.ParseError) as e:
filename = None
logger.error("Error on parsing, can't get filename: {}".format(e))
else:
filename = xml.find('head/meta[@name="filename"]').attrib['content']
if not filename:
return super(NTBPublishService, cls).get_filename(item)
return filename
def parseDemultiplexConfig( demultiplexConfigFile ):
"""Parses the DemultiplexConfig file for the version of casava used
Output is the details of the software in the format:
[softwareName, softwareVersion, softwareArguements]
[bcl2fastq, 1.8.4, "cluster/gsu/data/hiseq/140506_SN7001398_0099_BC4362ACXX/Data/Intensities/BaseCalls --output-dir /cluster/gsu/data/processed/hiseq/140506_SN7001398_0099_BC4362ACXX/ --sample-sheet /homes/gsupipe-x/SampleSheets/140506_SN7001398_0099_BC4362ACXX/SampleSheet_revComp_edited.csv --use-bases-mask Y*,I8n*,Y*'"]
"""
casavaDetails = []
try:
print "Parsing Run Parameters"
demultiplexConfigXML = ElementTree.parse(demultiplexConfigFile)
except IOError:
print "Cannot load information from %s" % demultiplexConfigFile
raise
except ElementTree.ParseError:
print "Invalid XML in %s" % demultiplexConfigFile
raise
for softwareDetails in demultiplexConfigXML.iterfind("Software"):
versionString = softwareDetails.attrib["Version"]
commandArgs = softwareDetails.attrib["CmdAndArgs"]
casavaDetails.append( versionString.split("-")[0] )
casavaDetails.append( versionString.split("-")[1] )
casavaDetails.append( commandArgs )
return casavaDetails
# for line in demultiplexConfigFile:
def parse_new_asx(data):
# Copied from mopidy.audio.playlists
try:
for _, element in elementtree.iterparse(data):
element.tag = element.tag.lower() # normalize
for ref in element.findall('entry/ref[@href]'):
yield fix_asf_uri(ref.get('href', '').strip())
for entry in element.findall('entry[@href]'):
yield fix_asf_uri(entry.get('href', '').strip())
except elementtree.ParseError:
return