def generate_datafile(lists_of_systems, output_dir, filename):
    """Generate one XML input data file per list of systems.

    Args:
        lists_of_systems (list of list): Each inner list holds the system
            elements that are appended under one ``root`` element.
        output_dir (str): Directory in which the files are created.
        filename (str): Base name; the file for list number ``index`` is
            named ``<filename>_<index>.xml``.

    Returns:
        list of str: Paths of the files that were written. A list whose
        file could not be opened (``open_file`` returned None) is skipped.
    """
    result = []
    for index, list_of_sys in enumerate(lists_of_systems):
        output_filename = filename + "_" + str(index) + ".xml"
        output_file = os.path.join(output_dir, output_filename)
        fd = file_Utils.open_file(output_file, "w+")
        if fd is None:
            continue
        try:
            root = xml_Utils.create_element("root")
            for system in list_of_sys:
                root.append(system)
            fd.write(xml_Utils.convert_element_to_string(root))
        finally:
            # Always release the handle so the data is flushed and the
            # descriptor is not leaked (assumes open_file returns a
            # file-like object with close() -- confirm).
            fd.close()
        result.append(output_file)
    return result
def handle_elif(self, span, cond):
    '''Registers an elif branch on the innermost open if block.

    Args:
        span (tuple of int): Start and end line of the directive.
        cond (str): String representation of the branching condition.
    '''
    self._check_for_open_block(span, 'elif')
    openblock = self._open_blocks[-1]
    directive, _, spans = openblock[0:3]
    self._check_if_matches_last(directive, 'if', spans[-1], span, 'elif')
    conds, contents = openblock[3:5]
    # What has been collected so far is the body of the previous branch.
    finished_branch = self._curnode
    conds.append(cond)
    contents.append(finished_branch)
    spans.append(span)
    self._curnode = []
def handle_enddef(self, span, name):
    '''Closes the innermost open def block and attaches it to its parent.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name given in the enddef statement, or None if the
            enddef was specified without a name.

    Raises:
        FyppFatalError: If a name was given and it differs from the name
            of the def block being closed.
    '''
    self._check_for_open_block(span, 'enddef')
    opened = self._open_blocks.pop(-1)
    directive, fname, spans = opened[0:3]
    self._check_if_matches_last(directive, 'def', spans[-1], span, 'enddef')
    defname, argexpr, _ = opened[3:6]
    if not (name is None or name == defname):
        msg = ("wrong name in enddef directive "
               "(expected '{0}', got '{1}')".format(defname, name))
        raise FyppFatalError(msg, fname, span)
    spans.append(span)
    closed = (directive, fname, spans, defname, argexpr, self._curnode)
    # Step back to the enclosing node and store the finished block there.
    self._curnode = self._path.pop(-1)
    self._curnode.append(closed)
def handle_nextarg(self, span, name):
    '''Finishes the current argument of the innermost open call block.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str or None): Name of the argument following nextarg, or
            None if the next argument is positional.

    Raises:
        FyppFatalError: If a positional argument follows a keyword one.
    '''
    self._check_for_open_block(span, 'nextarg')
    callblock = self._open_blocks[-1]
    directive, fname, spans = callblock[0:3]
    self._check_if_matches_last(directive, 'call', spans[-1], span, 'nextarg')
    args, argnames = callblock[5:7]
    args.append(self._curnode)
    spans.append(span)
    if name is None:
        if argnames:
            msg = 'non-keyword argument following keyword argument'
            raise FyppFatalError(msg, fname, span)
    else:
        argnames.append(name)
    self._curnode = []
# Evaluates the callable expression `name`, invokes the result with the
# positional and keyword arguments obtained from self._get_call_arguments(),
# and returns a tuple (out, ieval, peval).
#   - out holds str(result) when the call returned something other than None.
#   - ieval / peval are set to [0] and [(span, fname)] when self._diverted is
#     false; span runs from the first line of spans[0] to the last line of
#     spans[-1].
# Any exception raised while evaluating or calling is re-raised as
# FyppFatalError with the original exception attached; afterwards
# self._update_predef_globals() is called for the first line of the call.
# NOTE(review): the line structure of this block was lost in this copy, so it
# is unclear whether the trailing '\n' is appended only for non-None results or
# only in non-diverted mode -- confirm against the indented source before
# restructuring.
def _get_called_content(self, fname, spans, name, argexpr, contents, argnames): posargs, kwargs = self._get_call_arguments(fname, spans, argexpr, contents, argnames) try: callobj = self._evaluate(name, fname, spans[0][0]) result = callobj(*posargs, **kwargs) except Exception as exc: msg = "exception occured when calling '{0}'".format(name) raise FyppFatalError(msg, fname, spans[0], exc) self._update_predef_globals(fname, spans[0][0]) span = (spans[0][0], spans[-1][1]) out = [] ieval = [] peval = [] if result is not None: out = [str(result)] if not self._diverted: ieval = [0] peval = [(span, fname)] if span[0] != span[1]: out.append('\n') return out, ieval, peval
def import_module(self, module):
    '''Imports a module and defines its top-level package in the evaluator.

    Note: Import only trustworthy modules! Module imports are global, so a
    malicious module which manipulates other global modules could affect
    code behaviour outside of the Evaluator as well.

    Args:
        module (str): Python module to import (dotted names allowed; the
            part before the first dot is the name that gets defined).

    Raises:
        FyppFatalError: If module could not be imported.
    '''
    toplevel = module.partition('.')[0]
    try:
        self.define(toplevel, __import__(module, self._scope))
    except Exception as exc:
        raise FyppFatalError(
            "failed to import module '{0}'".format(module), cause=exc)
# Marks the variable name(s) contained in `name` as global references.
# Each name returned by self._get_variable_names(name) is validated with
# self._check_variable_name(); if a local scope is active (self._locals is
# not None) and the name is already present in it, FyppFatalError is raised.
# Accepted names are added to self._globalrefs.
# NOTE(review): the line structure was lost in this copy, so it is unclear
# whether the name is recorded in self._globalrefs only while a local scope is
# active or unconditionally -- confirm against the indented source.
def addglobal(self, name): '''Define a given entity as global. Args: name (str): Name of the entity to make global. Raises: FyppFatalError: If entity name is invalid or if the current scope is a local scope and entity is already defined in it. ''' varnames = self._get_variable_names(name) for varname in varnames: self._check_variable_name(varname) if self._locals is not None: if varname in self._locals: msg = "variable '{0}' already defined in local scope"\ .format(varname) raise FyppFatalError(msg) self._globalrefs.add(varname)
# Builds a compiler-style, human-readable message for an exception.
# An exception that is not a FyppError is rendered as
# 'error: <str(exc)> [<ClassName>]' and returned immediately. For a FyppError
# with a file name, a '<file>:<line>: ' header is emitted first; the line part
# is 'first-last' (1-based first line) when the span covers more than one line.
# A non-None exc.cause is formatted recursively and appended on a new line.
# NOTE(review): the line structure was lost in this copy, so it is unclear
# whether the final '\n' is appended always or only when a cause is present --
# confirm against the indented source.
def _formatted_exception(exc): error_header_formstr = '{file}:{line}: ' error_body_formstr = 'error: {errormsg} [{errorclass}]' if not isinstance(exc, FyppError): return error_body_formstr.format( errormsg=str(exc), errorclass=exc.__class__.__name__) out = [] if exc.fname is not None: if exc.span[1] > exc.span[0] + 1: line = '{0}-{1}'.format(exc.span[0] + 1, exc.span[1]) else: line = '{0}'.format(exc.span[0] + 1) out.append(error_header_formstr.format(file=exc.fname, line=line)) out.append(error_body_formstr.format(errormsg=exc.msg, errorclass=exc.__class__.__name__)) if exc.cause is not None: out.append('\n' + _formatted_exception(exc.cause)) out.append('\n') return ''.join(out)
def test_bytes(self):
    """Check that ``bytes`` is the platform byte-string type and that
    b'' literals are instances of it."""
    if PY3:
        import builtins
        platform_bytes = builtins.bytes
    else:
        import __builtin__ as builtins
        platform_bytes = builtins.str
    self.assertIs(bytes, platform_bytes)

    for literal in (b'', b'\x00\xff'):
        self.assertIsInstance(literal, bytes)

    if PY3:
        # On Py3 byte strings must be decoded before comparing to text.
        self.assertEqual(b'\x00\xff'.decode("latin-1"), "\x00\xff")
    else:
        self.assertEqual(b'\x00\xff', "\x00\xff")
def write(self, x):
    """Forward ``x`` to ``old_f``, inserting the current time in square
    brackets in front of every newline."""
    stamped_newline = " [%s]\n" % str(datetime.now())
    old_f.write(x.replace("\n", stamped_newline))
def underline_print(string):
    """Return ``str(string)`` wrapped in the ANSI escape codes that switch
    underlining on (``ESC[4m``) and reset formatting (``ESC[0m``).

    Despite the name nothing is printed; the decorated text is returned.
    """
    return "\033[4m{0}\033[0m".format(str(string))
def str2bool(value):
    """
    Convert a textual boolean into a real boolean.

    Parameters
    ----------
    value : str
        The text to interpret. It is upper-cased and compared with
        ``asbytes('TRUE')`` and ``asbytes('FALSE')``.

    Returns
    -------
    boolval : bool
        The boolean represented by `value`.

    Raises
    ------
    ValueError
        If the text is neither 'True' nor 'False' (case independent).

    Examples
    --------
    >>> np.lib._iotools.str2bool('TRUE')
    True
    >>> np.lib._iotools.str2bool('false')
    False

    """
    upper = value.upper()
    if upper == asbytes('TRUE'):
        return True
    if upper == asbytes('FALSE'):
        return False
    raise ValueError("Invalid boolean")
def convert2type(value, data_type='str'):
    """Return ``value`` converted to the type named by ``data_type``.

    Supported type names are 'str', 'int' and 'float'; any other name
    raises KeyError. If the conversion itself fails, the problem is
    reported through print_error / print_exception and the original,
    unconverted value is returned.
    """
    converter = {'str': str, 'int': int, 'float': float}[data_type]
    try:
        return converter(value)
    except ValueError:
        print_error("'{}' should be of type {}, please correct".format(value, data_type))
    except Exception as exception:
        print_exception(exception)
    return value
def verify_resp_inorder(match_list, context_list, command, response,
                        varconfigfile=None, verify_on_list=None,
                        verify_list=None, remote_resp_dict=None,
                        verify_group=None):
    """In-order search over the system responses.

    Checks, per system, that the search strings are present in the
    response and appear in the requested order. ``verify_list`` may be a
    comma separated string; it is split into a list. ``response`` is
    accepted but not read by this function body.

    Returns:
        "ERROR" if the last evaluated system ended in "ERROR" (including
        systems for which no response details are available); otherwise
        the running ``and`` of the per-system results, starting from True.
    """
    testcase_Utils.pNote(
        "In-order verification requested for the command: "
        "'{0}' ".format(command), "debug")
    if varconfigfile:
        match_list = string_Utils.sub_from_varconfig(varconfigfile, match_list)
    if isinstance(verify_list, str):
        verify_list = verify_list.split(",")
    resp_details_dict = _get_resp_details(match_list, context_list,
                                          verify_on_list, verify_list,
                                          remote_resp_dict)
    status = True
    for system in resp_details_dict:
        details = resp_details_dict[system]
        if details:
            sys_status = verify_inorder_cmd_response(
                match_list, verify_list, system, command, details,
                verify_group)
        else:
            pNote("Verification can not be done for the system : "
                  "{0}".format(system), "error")
            sys_status = "ERROR"
        if sys_status == "ERROR":
            status = "ERROR"
        else:
            status = status and sys_status
    return status
def get_filepath_from_system(datafile, system_name, *args):
    """Resolve the values of the given tags to absolute file paths.

    The tag values are fetched with get_credentials() and resolved
    relative to the directory of ``datafile``. The returned list has one
    entry per tag, in order:

    * the absolute path, if the value is a string (or False) that
      resolves to an existing file,
    * None, if it cannot be resolved or the file does not exist (a
      warning is printed in the latter case),
    * the value itself, if it is any other non-string value.
    """
    credentials = get_credentials(datafile, system_name, args)
    start_directory = os.path.dirname(datafile)
    abspath_lst = []
    for tag in args:
        value = credentials[tag]
        if not (isinstance(value, str) or value is False):
            # Non-string values are handed back untouched.
            abspath_lst.append(value)
            continue
        abspath = file_Utils.getAbsPath(value, start_directory)
        if not abspath:
            abspath_lst.append(None)
        elif os.path.isfile(abspath):
            abspath_lst.append(abspath)
        else:
            print_warning("File '{0}' provided for tag '{1}' does not "
                          "exist".format(abspath, tag))
            abspath_lst.append(None)
    return abspath_lst
# Filters the given systems down to those that take part in iteration.
# For each node the "iter" flag is read with system.get("iter", None), falling
# back to the text of the direct child "iter"; the flag is passed through
# sub_from_env_var() and sub_from_data_repo(). A node whose flag, lower-cased,
# equals "no" is dropped. Any other node is kept, provided the name at the
# same index in system_name_list is non-empty; otherwise an error note is
# logged and the node is left out.
# Returns the tuple (iteration_sysnamelist, iteration_sysnodelist).
# NOTE(review): the line structure was lost in this copy, so it is unclear
# whether the two substitutions run for every node or only when the attribute
# is missing -- confirm against the indented source before restructuring.
def get_iteration_syslist(system_node_list, system_name_list): """ Takes a list of system nodes and system names and returns. 1. List of system names with iter=yes 2. List of system nodes with iter=yes """ iteration_sysnamelist = [] iteration_sysnodelist = [] for i in range(0, len(system_node_list)): system = system_node_list[i] iter_flag = system.get("iter", None) if iter_flag is None: iter_flag = xml_Utils.get_text_from_direct_child(system, "iter") iter_flag = sub_from_env_var(iter_flag) iter_flag = sub_from_data_repo(iter_flag) if str(iter_flag).lower() == "no": pass else: system_name = system_name_list[i] if not system_name: pNote("No name provided for system/susbsystem in datafile", "error") else: iteration_sysnamelist.append(system_name) iteration_sysnodelist.append(system) return iteration_sysnamelist, iteration_sysnodelist
def setUp(self):
    """Create the text and byte-string fixtures together with their
    ``str()`` / ``bytes()`` conversions."""
    self.s, self.b = TEST_UNICODE_STR, b'ABCDEFG'
    self.s2 = str(self.s)
    self.b2 = bytes(self.b)
def test_native_str(self):
    """
    native_str must behave exactly like the platform's built-in str.
    """
    if PY2:
        import __builtin__ as platform_builtins
    else:
        import builtins as platform_builtins
    builtin_str = platform_builtins.str

    for sample in (b'blah', u'blah', 'blah'):
        self.assertEqual(native_str(sample), builtin_str(sample))
        self.assertTrue(isinstance(native_str(sample), builtin_str))
def test_native(self):
    """native() must return an equal value whose type is the platform's
    own type for integers, byte strings, text and dicts."""
    big = int(10**20)     # long int
    native_big = native(big)
    self.assertEqual(big, native_big)
    self.assertEqual(type(native_big), long if PY2 else int)

    raw = bytes(b'ABC')
    native_raw = native(raw)
    self.assertEqual(raw, native_raw)
    self.assertEqual(type(native_raw),
                     type(b'Py2 byte-string') if PY2 else bytes)

    text = str(u'ABC')
    native_text = native(text)
    self.assertEqual(text, native_text)
    self.assertEqual(type(native_text), unicode if PY2 else str)

    mapping = dict({'a': 1, 'b': 2})
    native_mapping = native(mapping)
    self.assertEqual(mapping, native_mapping)
    self.assertEqual(type(native_mapping), type({}))
def test_raise_(self):
    """
    Exercise raise_() with an exception instance, with a class plus value,
    and with a class, value and traceback.

    The with_value() test currently fails on Py3
    """
    # 1. Re-raising an already constructed exception instance.
    def reraise_instance():
        try:
            raise ValueError("Apples!")
        except Exception as err:
            raise_(err)
    self.assertRaises(ValueError, reraise_instance)

    # 2. Exception class plus value.
    def class_and_value():
        raise_(IOError, "This is an error")
    self.assertRaises(IOError, class_and_value)
    try:
        class_and_value()
    except IOError as err:
        self.assertEqual(str(err), "This is an error")

    # 3. Exception class, value and an explicit traceback.
    def class_value_traceback():
        try:
            raise ValueError("An error")
        except Exception as err:
            tb = sys.exc_info()[2]
            raise_(IOError, str(err), tb)
    self.assertRaises(IOError, class_value_traceback)
    try:
        class_value_traceback()
    except IOError as err:
        self.assertEqual(str(err), "An error")
def test_ensure_new_type(self):
    """ensure_new_type() must return an equal object of the new-style
    str / bytes / int type for native text, byte strings and integers."""
    cases = (
        (u'abcd', str),
        (b'xyz', bytes),
        (10000000000000, int),
    )
    for original, newtype in cases:
        expected = newtype(original)
        self.assertEqual(ensure_new_type(original), expected)
        self.assertEqual(type(ensure_new_type(original)), newtype)
# Returns a multi-part description of the error: the class name followed by
# ': ', the file name and the 1-based line (or 'first-last' line range when the
# span covers more than one line) if self.fname is set, the message if it is
# non-empty, and str(self.cause) on a new line when a cause is attached.
# NOTE(review): the line structure was lost in this copy, so it is unclear
# whether the line information and the '\n' after it are emitted only when
# self.fname is set -- confirm against the indented source.
def __str__(self): msg = [self.__class__.__name__, ': '] if self.fname is not None: msg.append("file '" + self.fname + "'") if self.span[1] > self.span[0] + 1: msg.append(', lines {0}-{1}'.format( self.span[0] + 1, self.span[1])) else: msg.append(', line {0}'.format(self.span[0] + 1)) msg.append('\n') if self.msg: msg.append(self.msg) if self.cause is not None: msg.append('\n' + str(self.cause)) return ''.join(msg)
def parsefile(self, fobj):
    '''Parses file or a file like object.

    Args:
        fobj (str or file): Name of a file or a file like object. The
            special name STDIN selects standard input. A file opened here
            by name is closed again even if parsing fails; file like
            objects passed in by the caller are never closed.
    '''
    if isinstance(fobj, str):
        if fobj == STDIN:
            self._includefile(None, sys.stdin, STDIN, os.getcwd())
        else:
            inpfp = _open_input_file(fobj)
            try:
                self._includefile(None, inpfp, fobj, os.path.dirname(fobj))
            finally:
                # Do not leak the handle when parsing raises.
                inpfp.close()
    else:
        self._includefile(None, fobj, FILEOBJ, os.getcwd())
def parse(self, txt):
    '''Parses a string as if it were the content of an included file.

    The current file is set to STRING and the current directory to the
    empty string; the text is then parsed between an include and an
    endinclude event.

    Args:
        txt (str): Text to parse.
    '''
    self._curfile, self._curdir = STRING, ''
    self.handle_include(None, self._curfile)
    self._parse(txt)
    self.handle_endinclude(None, self._curfile)
def handle_include(self, span, fname):
    '''Hook invoked when the parser starts processing a new file.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the include directive
            or None if called the first time for the main input.
        fname (str): Name of the file.
    '''
    self._log_event('include', span, filename=fname)
def handle_endinclude(self, span, fname):
    '''Hook invoked when the parser has finished processing a file.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the include directive
            or None if called the first time for the main input.
        fname (str): Name of the file.
    '''
    self._log_event('endinclude', span, filename=fname)
def handle_def(self, span, name, args):
    '''Hook invoked when the parser encounters a def directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the macro to be defined.
        args (str): String with argument definition (or None)
    '''
    self._log_event('def', span, name=name, arguments=args)
def handle_enddef(self, span, name):
    '''Hook invoked when the parser encounters an enddef directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name found after the enddef directive.
    '''
    self._log_event('enddef', span, name=name)
def handle_del(self, span, name):
    '''Hook invoked when the parser encounters a del directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the variable to delete.
    '''
    self._log_event('del', span, name=name)
def handle_if(self, span, cond):
    '''Hook invoked when the parser encounters an if directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        cond (str): String representation of the branching condition.
    '''
    self._log_event('if', span, condition=cond)
def handle_elif(self, span, cond):
    '''Hook invoked when the parser encounters an elif directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        cond (str): String representation of the branching condition.
    '''
    self._log_event('elif', span, condition=cond)
def handle_call(self, span, name, argexpr):
    '''Hook invoked when the parser encounters a call directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the callable to call
        argexpr (str or None): Argument expression containing additional
            arguments for the call.
    '''
    self._log_event('call', span, name=name, argexpr=argexpr)
def handle_nextarg(self, span, name):
    '''Hook invoked when the parser encounters a nextarg directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str or None): Name of the argument following next or
            None if it should be the next positional argument.
    '''
    self._log_event('nextarg', span, name=name)
def handle_endcall(self, span, name):
    '''Hook invoked when the parser encounters an endcall directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name found after the endcall directive.
    '''
    self._log_event('endcall', span, name=name)
def handle_eval(self, span, expr):
    '''Hook invoked when the parser encounters an eval directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        expr (str): String representation of the Python expression to
            be evaluated.
    '''
    self._log_event('eval', span, expression=expr)
def handle_global(self, span, name):
    '''Hook invoked when the parser encounters a global directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the variable which should be made global.
    '''
    self._log_event('global', span, name=name)
def handle_stop(self, span, msg):
    '''Hook invoked when the parser finds a stop directive.

    This default implementation only logs the event; override it for
    actual use.

    Args:
        span (tuple of int): Start and end line of the directive.
        msg (str): Stop message.
    '''
    self._log_event('stop', span, msg=msg)
def handle_include(self, span, fname):
    '''Opens an include block when processing switches to a new file.

    The current node is pushed on the path, an 'include' block recording
    the including file is opened, and the number of open blocks is
    remembered so that unclosed directives can be detected at the end of
    the file.

    Args:
        span (tuple of int): Start and end line of the include directive
            or None if called the first time for the main input.
        fname (str): Name of the file to be included.
    '''
    including_file = self._curfile
    self._path.append(self._curnode)
    self._open_blocks.append(('include', including_file, [span], fname, None))
    self._nr_prev_blocks.append(len(self._open_blocks))
    self._curnode = []
    self._curfile = fname
def handle_endinclude(self, span, fname):
    '''Should be called when processing of a file finished.

    Closes the matching include block, stores the collected content in
    it, attaches it to the parent node and restores the including file as
    the current file.

    Args:
        span (tuple of int): Start and end line of the include directive
            or None if called the first time for the main input.
        fname (str): Name of the file which has been included.

    Raises:
        FyppFatalError: If a directive opened inside the file is still
            unclosed, or if the innermost open block does not match this
            endinclude event (wrong directive, span or file name).
    '''
    nprev_blocks = self._nr_prev_blocks.pop(-1)
    if len(self._open_blocks) > nprev_blocks:
        # Blocks opened inside this file were never closed. Use separate
        # local names here so that the fname argument is not overwritten.
        opendirective, _, openspans = self._open_blocks[-1][0:3]
        msg = '{0} directive still unclosed when reaching end of file'\
              .format(opendirective)
        raise FyppFatalError(msg, self._curfile, openspans[0])
    block = self._open_blocks.pop(-1)
    directive, blockfname, spans = block[0:3]
    if directive != 'include':
        msg = 'internal error: last open block is not \'include\' when '\
              'closing file \'{0}\''.format(fname)
        raise FyppFatalError(msg)
    if span != spans[0]:
        msg = 'internal error: span for include and endinclude differ ('\
              '{0} vs {1})'.format(span, spans[0])
        raise FyppFatalError(msg)
    oldfname, _ = block[3:5]
    if fname != oldfname:
        msg = 'internal error: mismatching file name in close_file event'\
              " (expected: '{0}', got: '{1}')".format(oldfname, fname)
        raise FyppFatalError(msg, fname)
    block = directive, blockfname, spans, fname, self._curnode
    self._curnode = self._path.pop(-1)
    self._curnode.append(block)
    # Continue in the file which contained the include directive.
    self._curfile = blockfname
def handle_if(self, span, cond):
    '''Opens a new if block with its first branching condition.

    Args:
        span (tuple of int): Start and end line of the directive.
        cond (str): String representation of the branching condition.
    '''
    ifblock = ('if', self._curfile, [span], [cond], [])
    self._path.append(self._curnode)
    self._curnode = []
    self._open_blocks.append(ifblock)
def handle_def(self, span, name, argexpr):
    '''Opens a new def block for a macro definition.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the macro to be defined.
        argexpr (str): Macro argument definition or None
    '''
    parent = self._curnode
    self._path.append(parent)
    self._curnode = []
    self._open_blocks.append(
        ('def', self._curfile, [span], name, argexpr, None))
def handle_call(self, span, name, argexpr):
    '''Opens a new call block.

    The span of the directive is stored twice in the block's span list.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the callable to call
        argexpr (str or None): Argument expression containing additional
            arguments for the call.
    '''
    callblock = ('call', self._curfile, [span, span], name, argexpr, [], [])
    self._path.append(self._curnode)
    self._curnode = []
    self._open_blocks.append(callblock)
# Closes the innermost open call block: verifies that it is a 'call' block,
# checks an explicitly given name against the name of the call (raising
# FyppFatalError on mismatch), stores the content collected so far as the last
# argument, appends the closing span, and attaches the finished block to the
# parent node taken from self._path.
# An empty first argument is removed from the argument list, together with its
# name when every argument is named.
# NOTE(review): the line structure was lost in this copy, so it is unclear
# which of the `del` statements depend on the name-count test and which only on
# the empty-first-argument test -- confirm against the indented source before
# restructuring.
def handle_endcall(self, span, name): '''Should be called to signalize an endcall directive. Args: span (tuple of int): Start and end line of the directive. name (str): Name of the endcall statement. Could be None, if endcall was specified without name. ''' self._check_for_open_block(span, 'endcall') block = self._open_blocks.pop(-1) directive, fname, spans = block[0:3] self._check_if_matches_last(directive, 'call', spans[0], span, 'endcall') callname, callargexpr, args, argnames = block[3:7] if name is not None and name != callname: msg = "wrong name in endcall directive "\ "(expected '{0}', got '{1}')".format(callname, name) raise FyppFatalError(msg, fname, span) args.append(self._curnode) # If nextarg or endcall immediately followed call, then first argument # is empty and should be removed (to allow for calls without arguments # and named first argument in calls) if args and not args[0]: if len(argnames) == len(args): del argnames[0] del args[0] del spans[1] spans.append(span) block = (directive, fname, spans, callname, callargexpr, args, argnames) self._curnode = self._path.pop(-1) self._curnode.append(block)
def handle_global(self, span, name):
    '''Adds a global node to the node currently being built.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the variable(s) to make global.
    '''
    node = ('global', self._curfile, span, name)
    self._curnode.append(node)
def handle_del(self, span, name):
    '''Adds a del node to the node currently being built.

    Args:
        span (tuple of int): Start and end line of the directive.
        name (str): Name of the variable(s) to delete.
    '''
    node = ('del', self._curfile, span, name)
    self._curnode.append(node)
def handle_eval(self, span, expr):
    '''Adds an eval node to the node currently being built.

    Args:
        span (tuple of int): Start and end line of the directive.
        expr (str): String representation of the Python expression to
            be evaluated.
    '''
    node = ('eval', self._curfile, span, expr)
    self._curnode.append(node)
def handle_text(self, span, txt):
    '''Adds a text node, i.e. text which goes to the output unaltered.

    Args:
        span (tuple of int): Start and end line of the text.
        txt (str): Text.
    '''
    node = ('txt', self._curfile, span, txt)
    self._curnode.append(node)
def render(self, tree, divert=False, fixposition=False):
    '''Renders a tree.

    The diversion and position-fixing modes are switched to the requested
    values for the duration of the rendering and restored afterwards,
    also when rendering raises an exception.

    Args:
        tree (fypp-tree): Tree to render.
        divert (bool): Whether output will be diverted and sent for
            further processing, so that no line numbering directives and
            postprocessing are needed at this stage. (Default: False)
        fixposition (bool): Whether file name and line position (variables
            _FILE_ and _LINE_) should be kept at their current values or
            should be updated continuously. (Default: False).

    Returns:
        str: Rendered string.
    '''
    saved_diverted = self._diverted
    saved_fixedposition = self._fixedposition
    self._diverted = divert
    self._fixedposition = fixposition
    try:
        output, eval_inds, eval_pos = self._render(tree)
        if not self._diverted and eval_inds:
            self._postprocess_eval_lines(output, eval_inds, eval_pos)
    finally:
        # Never leave the renderer in the temporary mode, otherwise a
        # failed nested render would corrupt later renderings.
        self._diverted = saved_diverted
        self._fixedposition = saved_fixedposition
    return ''.join(output)
def _handle_stop(self, fname, span, msgstr):
    '''Evaluates the stop message and raises a stop request carrying it.

    Args:
        fname (str): Name of the file containing the stop directive.
        span (tuple of int): Start and end line of the directive.
        msgstr (str): Expression yielding the stop message.

    Raises:
        FyppFatalError: If the message expression cannot be evaluated.
        FyppStopRequest: Always, once the message has been evaluated.
    '''
    try:
        evaluated = self._evaluate(msgstr, fname, span[0])
        msg = str(evaluated)
    except Exception as exc:
        failure = "exception occured when evaluating stop message '{0}'"\
                  .format(msgstr)
        raise FyppFatalError(failure, fname, span, exc)
    raise FyppStopRequest(msg, fname, span)