def isoratio_init(self, isos):
    '''
    Return the initial abundance ratio of two isotopes.

    The isotopes are given either as a flat list such as
    ['Fe', 56, 'Fe', 58] or, for compatibility, as a pair of
    'Element-A' strings such as ['Fe-56', 'Fe-58'].  Abundances are
    read from self.habu; the first isotope is the numerator and the
    second one the denominator.

    '''
    if len(isos) == 2:
        # expand ['Fe-56', 'Fe-58'] into the flat four-entry form
        flat = isos[0].split('-')
        second = isos[1].split('-')
        flat.append(second[0])
        flat.append(second[1])
        isos = flat

    def habu_key(element, mass):
        # key layout used by self.habu: lower-case symbol padded to two
        # characters followed by the mass number right-aligned in three
        return element.ljust(2).lower() + str(int(mass)).rjust(3)

    numerator = self.habu[habu_key(isos[0], isos[1])]
    denominator = self.habu[habu_key(isos[2], isos[3])]
    return old_div(numerator, denominator)
def iso_abundance(self, isos):
    '''
    Return the abundance of one or several isotopes from self.habu.

    Parameters
    ----------
    isos : string or list
        A single isotope name such as 'Si-28', or a list of names
        such as ['Si-28','Si-29','Si-30'].

    Returns
    -------
    The abundance stored in self.habu for a single name, or a list of
    abundances (same order as the input) when a list is given.

    '''
    def lookup(name):
        # key layout used by self.habu: lower-case symbol padded to two
        # characters followed by the mass number right-aligned in three
        element, mass = name.split('-')[:2]
        return self.habu[element.ljust(2).lower() + str(int(mass)).rjust(3)]

    # isinstance (rather than an exact type comparison) also accepts
    # list subclasses instead of mistaking them for a single name
    if isinstance(isos, list):
        return [lookup(name) for name in isos]
    return lookup(isos)
def make_list(default_symbol_list, len_list_to_print):
    '''
    Provide the list of symbols to use for the list of species/arrays
    to plot, cycling through the available symbols as often as needed.

    Parameters
    ----------
    default_symbol_list : list
        Symbols that the user choose to use.
    len_list_to_print : integer
        len of list of species/arrays to print.

    Returns
    -------
    list
        len_list_to_print symbols taken cyclically from
        default_symbol_list.

    Raises
    ------
    IndexError
        If symbols are requested but default_symbol_list is empty.

    '''
    n_symbols = len(default_symbol_list)
    if n_symbols == 0 and len_list_to_print > 0:
        # nothing to cycle through; keep raising IndexError for callers
        raise IndexError("default_symbol_list must not be empty")
    # the builtin modulo avoids the deprecated scipy root-namespace alias
    return [default_symbol_list[i % n_symbols]
            for i in range(len_list_to_print)]
def _get_callable_argspec_py2(func):
    '''Return (args, defaults, vararg) of a callable via inspect.getargspec.

    Args:
        func: Callable to inspect.

    Returns:
        tuple: List of argument names, dictionary mapping the names of
            arguments with a default to that default, and the name of the
            variable length positional argument (or None).

    Raises:
        FyppFatalError: If the callable takes a variable length keyword
            argument or uses tuple arguments.
    '''
    spec = inspect.getargspec(func)
    if spec.keywords is not None:
        msg = "variable length keyword argument '{0}' found"\
              .format(spec.keywords)
        raise FyppFatalError(msg)
    names = spec.args
    # getargspec reports tuple arguments as nested lists
    if any(isinstance(name, list) for name in names):
        msg = 'tuple argument(s) found'
        raise FyppFatalError(msg)
    defaults = {}
    if spec.defaults is not None:
        # defaults belong to the trailing arguments
        first_with_default = len(names) - len(spec.defaults)
        for offset, value in enumerate(spec.defaults):
            defaults[names[first_with_default + offset]] = value
    return names, defaults, spec.varargs
def _get_default_gfa_tag_datatype(obj):
  """Pick the GFA tag datatype used when the user did not specify one.

  Parameters:
    obj : an object of any Python class

  Returns:
    str : the identifier of a datatype (one of the keys of FIELD_MODULE)
          to be used for a tag with obj as value. Objects providing a
          truthy ``_default_gfa_tag_datatype`` attribute decide themselves;
          lists containing only ints or only floats map to "B"; otherwise
          the first matching entry of gfapy.Field._default_tag_datatypes
          is used, with "J" as fallback.
  """
  if getattr(obj, "_default_gfa_tag_datatype", None):
    return obj._default_gfa_tag_datatype()
  if isinstance(obj, list):
    only_ints = all(isinstance(elem, builtins.int) for elem in obj)
    if only_ints or all(isinstance(elem, builtins.float) for elem in obj):
      return "B"
  for python_class, datatype in gfapy.Field._default_tag_datatypes:
    if isinstance(obj, python_class):
      return datatype
  return "J"
def build_rpc_call(self, method, *params):
    """
    Assemble the dictionary for a JSON-RPC request

    The request takes the current value of self.req_id as its id; the
    counter is incremented afterwards.

    Args:
        method (str): Name of the RPC method
        params: Parameters for the RPC method (stored as the tuple of
                positional arguments)

    Returns:
        dict: Request dictionary with the keys 'id', 'method' and 'params'
    """
    request = {'id': self.req_id, 'method': method, 'params': params}
    self.req_id += 1
    return request
def build_rpc_notify(self, method, *params):
    """
    Assemble the dictionary for a JSON-RPC notification

    A notification carries None as its id.

    Args:
        method (str): Name of the notify method
        params: Parameters for the notify method (stored as the tuple of
                positional arguments)

    Returns:
        dict: Request dictionary with the keys 'id', 'method' and 'params'
    """
    return {'id': None, 'method': method, 'params': params}
def assign(item, assignment, **kwargs):
    """Yield `item` merged with a single key taken from `assignment`.

    Keyword options read from kwargs:
        assign: name of the key under which the value is stored
        one: if truthy, store only the first element of `assignment`,
            otherwise store all of its elements as a list
        dictize: if truthy, wrap the merged result in a DotDict
    """
    key = kwargs.get('assign')
    if kwargs.get('one'):
        value = next(assignment)
    else:
        value = list(assignment)
    merged = merge([item, {key: value}])
    if kwargs.get('dictize'):
        yield DotDict(merged)
    else:
        yield merged
def fit(self, x, y, dcoef='none'):
    '''
    Perform the least-squares fit.

    Parameters
    ----------
    x, y : list
        Matching data arrays that define a numerical function y(x),
        this is the data to be fitted.
    dcoef : list or string
        You can provide a different guess for the coefficients, or
        provide the string 'none' to use the initial guess.  The
        default is 'none'.

    Returns
    -------
    ierr
        Values between 1 and 4 signal success.

    Notes
    -----
    self.fcoef contains the fitted coefficients; self.x and self.y
    keep the fitted data.

    '''
    self.x = x
    self.y = y

    # Compare by value, not identity: "is not 'none'" only worked when the
    # caller's string happened to be the same interned object.  The
    # isinstance guard keeps array guesses out of an elementwise comparison.
    if isinstance(dcoef, str) and dcoef == 'none':
        coef = self.coef
    else:
        coef = dcoef

    fcoef = optimize.leastsq(self.residual, coef, args=(y, self.func, x))
    self.fcoef = fcoef[0].tolist()
    return fcoef[1]
def plot(self, ifig=1, data_label='data', fit_label='fit',
         data_shape='o', fit_shape='-'):
    '''
    Plot the data and the fitted function.

    Parameters
    ----------
    ifig : integer
        Figure window number.  The default is 1.
    data_label : string
        Legend for data.  The default is 'data'.
    fit_label : string
        Legend for fit.  If fit_label is 'fit', then substitute fit
        function type self.__name__.  The default is 'fit'.
    data_shape : character
        Shape for data.  The default is 'o'.
    fit_shape : character
        Shape for fit.  The default is '-'.

    '''
    # lengths and labels are compared by value; identity ("is") on ints
    # and strings depends on interpreter caching
    if len(self.coef) != len(self.fcoef):
        print("Warning: the fitted coefficient list is not same")
        print(" length as guessed list - still I will try ...")

    pl.figure(ifig)
    pl.plot(self.x, self.y, data_shape, label=data_label)

    if fit_label == 'fit':
        fit_label = self.__name__

    pl.plot(self.x, self.func(self.fcoef, self.x), fit_shape, label=fit_label)
    pl.legend()
def is_stable(self, species):
    '''
    Check whether a species given like 'He-3' is listed as stable.

    The element name is looked up in self.stable_names and the mass
    number is searched in the matching row of self.stable_el (all
    entries after the first one).  Returns True if found, otherwise
    False.

    Notes
    -----
    this method is designed to work with an se instance from
    nugridse.py.  In order to make it work with ppn.py some
    additional work is required.

    FH, April 20, 2013.

    '''
    name_parts = species.split('-')
    element = name_parts[0]
    try:
        mass_number = int(name_parts[1])
    except ValueError:
        # A mass number followed by letters (e.g. an isomer) is treated
        # as unstable.  This is not correct; it stems from species not
        # being identified by the three numbers A, Z and isomeric_state.
        mass_number = 999
    row = self.stable_names.index(element)
    return mass_number in self.stable_el[row][1:]
def strictly_monotonic(bb):
    '''
    bb is an index array which may have numerous double or triple
    occurrences of indices, such as for example the
    decay_index_pointer.  This method drops all negative entries,
    then all duplicates, and finally returns a sorted array of the
    remaining indices.

    '''
    # keep the non-negative entries; np.unique sorts and de-duplicates
    return np.unique(bb[bb >= 0])
def _func_getargvalues(*args, **kwargs):
    '''Return the received positional arguments as a list together with
    the dictionary of received keyword arguments.'''
    positional = list(args)
    return positional, kwargs
def _get_syspath_without_scriptdir():
    '''Return a copy of sys.path without the folder of the running script.

    The first entry is dropped only when it resolves to the directory of
    sys.argv[0]; sys.path itself is left untouched.
    '''
    entries = list(sys.path)
    script_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
    first_entry = os.path.abspath(entries[0])
    if first_entry != script_dir:
        return entries
    return entries[1:]
def __call__(self, line):
    '''Folds a line.

    Can be directly called to return the list of folded lines::

        linefolder = FortranLineFolder(maxlen=10)
        linefolder('  print *, "some Fortran line"')

    Lines are returned unchanged when folding is disabled (negative
    maximal length) or when they already fit.

    Args:
        line (str): Line to fold.

    Returns:
        list of str: Components of folded line. They should be
            assembled via ``\\n.join()`` to obtain the string
            representation.
    '''
    needs_folding = self._maxlen >= 0 and len(line) > self._maxlen
    if not needs_folding:
        return [line]
    prefix = self._prefix
    if self._inherit_indent:
        # continuation lines start with the indentation of the original
        leading_blanks = len(line) - len(line.lstrip())
        prefix = ' ' * leading_blanks + prefix
    return self._split_line(line, self._maxlen, prefix, self._suffix,
                            self._fold_position_finder)
def __call__(self, line):
    '''Returns the entire line without any folding.

    Args:
        line (str): Line to pass through.

    Returns:
        list of str: One-element list holding the unchanged line. The
            components should be assembled via ``\\n.join()`` to obtain
            the string representation.
    '''
    unfolded = [line]
    return unfolded
def pytest_addoption(parser):
    """Register the ``--candidates-*`` command line options in the 'nodev' group."""
    group = parser.getgroup('nodev')
    # (flag, keyword arguments) pairs, registered in this order
    option_specs = (
        ('--candidates-from-stdlib', dict(
            action='store_true',
            help="Collects candidates form the Python standard library.")),
        ('--candidates-from-all', dict(
            action='store_true',
            help="Collects candidates form the Python standard library and all installed packages. "
                 "Disabled by default, see the docs.")),
        ('--candidates-from-specs', dict(
            default=[], nargs='+',
            help="Collects candidates from installed packages. Space separated list of `pip` specs.")),
        ('--candidates-from-modules', dict(
            default=[], nargs='+',
            help="Collects candidates from installed modules. Space separated list of module names.")),
        ('--candidates-includes', dict(
            nargs='+',
            help="Space separated list of regexs matching full object names to include, "
                 "defaults to include all objects collected via `--candidates-from-*`.")),
        ('--candidates-excludes', dict(
            default=[], nargs='+',
            help="Space separated list of regexs matching full object names to exclude.")),
        ('--candidates-predicate', dict(
            default='builtins:callable',
            help="Full name of the predicate passed to `inspect.getmembers`, "
                 "defaults to `builtins.callable`.")),
        ('--candidates-fail', dict(
            action='store_true',
            help="Show candidates failures.")),
    )
    for flag, settings in option_specs:
        group.addoption(flag, **settings)
def pytest_pycollect_makeitem(collector, name, obj):
    """Generate test items for objects carrying a ``candidate`` marker with arguments.

    The first marker argument names the attribute to replace. The object is
    wrapped so that, when called with a candidate, that attribute is
    monkeypatched with the candidate before the original object runs.
    Returns None for objects without such a marker.
    """
    marker = getattr(obj, 'candidate', None)
    if not (marker and getattr(marker, 'args', [])):
        return None
    target_name = marker.args[0]

    def wrapper(candidate, monkeypatch, *args, **kwargs):
        if '.' not in target_name:
            # plain name: patch it in the module that defines obj
            monkeypatch.setattr(inspect.getmodule(obj), target_name, candidate, raising=False)
        else:
            # dotted name: pass the full import path to monkeypatch
            monkeypatch.setattr(target_name, candidate, raising=False)
        return obj(*args, **kwargs)

    # share the attribute dictionary (markers included) with the original
    wrapper.__dict__ = obj.__dict__
    return list(collector._genfunctions(name, wrapper))
def collect_stdlib_distributions():
    """Yield a conventional spec and the names of all top_level standard library modules.

    A single ``(spec, names)`` pair is yielded: ``spec`` has the form
    ``'Python==X.Y.Z'`` for the running interpreter and ``names`` lists the
    modules found in the standard library directory followed by the
    built-in module names.
    """
    # ``get_python_lib`` is only provided by ``distutils.sysconfig``, which
    # no longer exists on Python 3.12+. The standard library ``sysconfig``
    # module reports the same directory; it is imported under a private
    # alias so this function does not depend on what the module-level
    # ``sysconfig`` name is bound to.
    import sysconfig as _stdlib_sysconfig

    distribution_spec = 'Python==%d.%d.%d' % sys.version_info[:3]
    stdlib_path = _stdlib_sysconfig.get_path('stdlib')
    distribution_top_level = [name for _, name, _ in pkgutil.iter_modules(path=[stdlib_path])]
    distribution_top_level += list(sys.builtin_module_names)
    yield distribution_spec, distribution_top_level
def test_collect_stdlib_distributions():
    """The standard library is reported as exactly one distribution with many modules."""
    collected = list(collect.collect_stdlib_distributions())
    assert len(collected) == 1
    names = collected[0][1]
    assert len(names) > 10
def test_collect_distributions():
    """A known spec yields one distribution with one module; an unknown spec yields nothing."""
    found = list(collect.collect_distributions(['pytest-nodev']))
    assert len(found) == 1
    names = found[0][1]
    assert len(names) == 1

    missing = list(collect.collect_distributions(['non_existent_distribution']))
    assert len(missing) == 0
def test_import_distributions():
    """Importable top-level modules are reported, non-existent ones are left out."""
    importable = [('pytest-nodev', ['pytest_nodev'])]
    assert list(collect.import_distributions(importable)) == ['pytest_nodev']

    broken = [('pytest-nodev', ['non_existent_module'])]
    assert list(collect.import_distributions(broken)) == []
def test_generate_module_objects():
    """The generator run on its own module reports itself as a (name, object) pair."""
    generated = list(collect.generate_module_objects(collect))
    expected_item = ('generate_module_objects', collect.generate_module_objects)
    assert expected_item in generated
def test_generate_objects_from_modules():
    """Exactly one object is generated for one include pattern with 're' blacklisted."""
    import re

    modules = {
        'pytest_nodev.collection': collect,
        're': re,
    }
    include_patterns = ['pytest_nodev.collection:generate_objects_from_modules']
    generated = collect.generate_objects_from_modules(
        modules, include_patterns, module_blacklist_pattern='re')
    assert len(list(generated)) == 1
def rpc_call(self, method, *params):
    """
    Call a RPC function on the server

    The request built by build_rpc_call is serialized to JSON, sent with
    rpc_call_raw and the decoded reply is inspected.

    Args:
        method (str): The name of the RPC method to call on the server
        params: The parameters to pass to the RPC method

    Returns:
        JSON type: The 'result' value of the server reply

    Raises:
        RpcError: If the reply carries a non-empty 'error' entry
    """
    request = self.build_rpc_call(method, *params)
    raw_reply = self.rpc_call_raw(json.dumps(request))
    reply = json.loads(raw_reply)

    failed = 'error' in reply and reply['error']
    if failed:
        raise RpcError(reply['error'])

    return reply['result']
def rpc_notify(self, method, *params):
    """
    Call a RPC function on the server but tell it to send no response

    Args:
        method (str): The name of the RPC method to call on the server
        params: The parameters to pass to the RPC method
    """
    # A notification has to be built with build_rpc_notify: build_rpc_call
    # would attach a request id and advance the request counter.
    json_data = json.dumps(self.build_rpc_notify(method, *params))
    self.rpc_call_raw(json_data, True)
def __init__(self):
    """
    Constructor

    Starts with empty function and custom type registries, an empty
    service description, the three built-in describe functions, enabled
    named hash validation and the JSON <-> Python type name tables.
    """
    # registered RPC functions and custom types (list plus dictionary)
    self.functions = []
    self.functions_dict = {}
    self.custom_types = []
    self.custom_types_dict = {}

    self.description = {
        'name': '',
        'description': '',
        'version': '',
        'custom_fields': {},
    }

    # functions provided by the processor itself
    self.builtins = {
        '__describe_service': self.describe_service,
        '__describe_functions': self.describe_functions,
        '__describe_custom_types': self.describe_custom_types,
    }

    self.named_hash_validation = True

    # type name translation tables
    self.json2py = {
        'bool': 'bool',
        'int': 'int',
        'float': 'float',
        'string': 'str',
        'array': 'list',
        'hash': 'dict',
        'base64': 'str',
    }
    self.py2json = {
        'bool': 'bool',
        'int': 'int',
        'float': 'float',
        'str': 'string',
        'list': 'array',
        'dict': 'hash',
    }
def describe_functions(self):
    """
    Describe the functions exposed by this RPC service

    Returns:
        list: The to_dict() description of every entry of self.functions,
              in registration order
    """
    descriptions = [registered.to_dict() for registered in self.functions]
    return descriptions
def describe_custom_types(self):
    """
    Describe the custom types exposed by this RPC service

    Returns:
        list: The to_dict() description of every entry of
              self.custom_types, in registration order
    """
    descriptions = [registered.to_dict() for registered in self.custom_types]
    return descriptions
def check_request_types(self, func, params):
    """
    Check the types of the parameters passed by a JSON-RPC call

    Every declared parameter of the function is checked against the value
    at the same position of the request; validation errors are translated
    into JSON-RPC errors.

    Args:
        func (RpcFunction): Description of the called function
        params (list): Parameters passed in the request

    Raises:
        JsonRpcParamError: If the number of parameters does not match
        JsonRpcTypeError: If an enum or named hash parameter is invalid
        JsonRpcParamTypeError: If a primitive parameter type is invalid
    """
    expected_count = len(func.params)
    if len(params) != expected_count:
        raise JsonRpcParamError(func.name, expected_count, len(params))

    for position, declared in enumerate(func.params):
        value = params[position]

        try:
            self.check_param_type(func, declared, value, '')
        except InvalidEnumValueError as e:
            raise JsonRpcTypeError("%s: '%s' is not a valid value for parameter '%s' of enum type '%s'" %
                    (func.name, e.value, e.name, e.expected_type))
        except InvalidEnumTypeError as e:
            raise JsonRpcTypeError("%s: Enum parameter '%s' requires a value of type 'int' or 'string' but type was '%s'" %
                    (func.name, e.name, e.real_type))
        except InvalidNamedHashError as e:
            raise JsonRpcTypeError("%s: Named hash parameter '%s' of type '%s' requires a hash value but got '%s'" %
                    (func.name, e.name, e.expected_type, e.real_type))
        except InvalidPrimitiveTypeError as e:
            raise JsonRpcParamTypeError(func.name, e.name, e.expected_type, e.real_type)
def __init__(self, func='linear', coef=(1, 1)):
    '''
    Set up the fit function, its residual and the initial guess.

    Parameters
    ----------
    func : string or function, optional
        If func is a string, then it must be one of the two strings
        'linear' or 'powerlaw'.  If func is not a string it must be
        a custom function.  The default is "linear".
    coef : list, optional
        A guess for the list of coefficients.  For 'powerlaw' coef
        must have three entries.  For 'linear' coef must have two
        entries.  If you provide your own function, then provide as
        many coef entries as your function needs.  The default is
        (1, 1).

    Notes
    -----
    Sets self.func (called as func(coef, x)), self.residual, self.coef
    and self.__name__ (the name of the fit function).

    '''
    # Names and lengths are compared by value; identity ("is") on strings
    # and ints depends on interpreter caching and misclassifies equal
    # values that are distinct objects.
    if func == 'linear':
        print("Information: 'linear' fit needs coef list with 2 entries")
        print(" -> will use default: coef = "+str(coef))
        if len(coef) != 2:
            print("Warning: you want a linear fit but you have not")
            print(" provided a guess for coef with the")
            print(" right length (2).")
            print(" -> I will continue and assume coef=(1,1)")
            coef = (1,1)
        def ff(coef,x):
            return coef[0]*x + coef[1]
        self.__name__ = func
    elif func == 'powerlaw':
        print("Information: 'powerlaw' fit needs coef list with 3 entries")
        print(" -> will use default: coef = "+str(coef))
        if len(coef) != 3:
            print("Warning: you want a power law fit but you have")
            print(" not provided a guess for coef with the")
            print(" right length (3).")
            print(" -> I will continue and assume coef=(1,1,1)")
            coef = (1,1,1)
        def ff(coef,x):
            return coef[0]*x**coef[1] + coef[2]
        self.__name__ = func
    else:
        print("Information: You provide a fit function yourself. I trust")
        print(" you have provided a matching guess for the ")
        print(" coefficient list!")
        ff = func
        self.__name__ = func.__name__

    # The coefficients are determined by minimising a residual function
    #     residual(coef) = ydata - f(coef, xdata)
    def fres(coef,y,ff,x):
        return y-ff(coef,x)

    self.residual = fres
    self.coef = coef
    self.func = ff
def _process_abundance_vector(self, a, z, isomers, yps):
    '''
    This private method takes as input one vector definition and
    processes it, including sorting by charge number and mass number.

    Entries with z == 0 and isomers == 1 are skipped.  Entries with
    isomers == 1 and z != 0 are collected and sorted, first with
    self.compar and then with self.comparator.  All entries with
    isomers != 1 are reported separately as isomers; an abundance of
    exactly 0 is replaced by 1e-99 for them.

    Returns
    -------
    tuple
        (mass numbers, charge numbers, abundances, isotope names,
        element names, isomers) where isomers is a list of
        [name, abundance] pairs.

    '''
    # standard library replacement for a hand-written cmp -> key adapter
    from functools import cmp_to_key

    tmp = []
    isom = []
    for i in range(len(a)):
        if z[i] != 0 and isomers[i] == 1:
            # neither neutrons nor an isomer
            tmp.append([self.stable_names[int(z[i])] + '-' + str(int(a[i])),
                        yps[i], z[i], a[i]])
        elif isomers[i] != 1:
            # an isomer: the name gets the isomeric state appended
            isomer_name = (self.stable_names[int(z[i])] + '-' + str(int(a[i]))
                           + '-' + str(int(isomers[i] - 1)))
            isom.append([isomer_name, 1e-99 if yps[i] == 0 else yps[i]])

    # list.sort is stable, so the second sort keeps the order produced by
    # the first one among entries the second comparison treats as equal
    tmp.sort(key=cmp_to_key(self.compar))
    tmp.sort(key=cmp_to_key(self.comparator))

    isotope_to_plot = [entry[0] for entry in tmp]
    abunds = [entry[1] for entry in tmp]
    z_iso_to_plot = [int(entry[2]) for entry in tmp]
    a_iso_to_plot = [int(entry[3]) for entry in tmp]
    el_iso_to_plot = [self.stable_names[int(entry[2])] for entry in tmp]

    return a_iso_to_plot, z_iso_to_plot, abunds, isotope_to_plot, el_iso_to_plot, isom
def trajectory_SgConst(Sg=0.1, delta_logt_dex=-0.01):
    '''
    setup trajectories for constant radiation entropy.

    S_gamma/R where the radiation constant R = N_A*k
    (Dave Arnett, Supernova book, p. 212)
    This relates rho and T but the time scale for this
    is independent.

    The trajectory is plotted in figures 3, 5 and 6 and written to the
    file 'trajectory.input'.

    Parameters
    ----------
    Sg : float
        S_gamma/R, values between 0.1 and 10. reflect conditions in
        massive stars.  The default is 0.1.
    delta_logt_dex : float
        Sets interval between time steps in dex of logtimerev.  The
        default is -0.01.

    '''
    # reverse logarithmic time
    logtimerev = np.arange(5., -6., delta_logt_dex)
    logrho = np.linspace(0, 8.5, len(logtimerev))
    logT = (old_div(1., 3.))*(logrho + 21.9161 + np.log10(Sg))

    # The LaTeX labels are raw strings: sequences such as "\m" and "\l"
    # are invalid escapes in ordinary string literals.  The resulting
    # label text is unchanged.
    pl.close(3)
    pl.figure(3)
    pl.plot(logrho, logT, label=r'$S/\mathrm{N_Ak}=' + str(Sg) + '$')
    pl.legend(loc=2)
    pl.xlabel(r'$\log \rho$')
    pl.ylabel(r'$\log T$')

    pl.close(5)
    pl.figure(5)
    pl.plot(logtimerev, logrho)
    pl.xlabel(r'$\log (t_\mathrm{final}-t)$')
    pl.ylabel(r'$\log \rho$')
    pl.xlim(8, -6)

    pl.close(6)
    pl.figure(6)
    pl.plot(logtimerev)
    pl.ylabel(r'$\log (t_\mathrm{final}-t)$')
    pl.xlabel('cycle')

    # [t]   logtimerev yrs
    # [rho] cgs
    # [T]   K
    T9 = old_div(10**logT, 1.e9)
    data = [logtimerev, T9, logrho]

    # data: A list of 1D data vectors with time, T and rho
    # ageunit: If 1 ageunit = SEC, If 0 ageunit = YRS. If 2 ageunit =
    #          logtimerev in yrs; logtimerev is log of time until end
    att.writeTraj(filename='trajectory.input', data=data, ageunit=2,
                  tunit=1, rhounit=1, idNum=1)
def make_candidate_index(config):
    """Return the candidate index of this session, building and caching it on first use.

    The index is stored on ``config._candidate_index`` as a pair of
    parallel sequences (object names, objects) sorted by name, or
    ``[(), ()]`` when nothing was collected.

    Raises:
        ValueError: if ``--candidates-from-all`` is used while the environment
            variable ``PYTEST_NODEV_MODE`` is not set to ``FEARLESS``.
    """
    if config.getoption('candidates_from_all') and os.environ.get('PYTEST_NODEV_MODE') != 'FEARLESS':
        raise ValueError("Use of --candidates-from-all may be very dangerous, see the docs.")

    if hasattr(config, '_candidate_index'):
        return config._candidate_index

    # take over collect logging
    collect.logger.propagate = False
    collect.logger.setLevel(logging.DEBUG)  # FIXME: loglevel should be configurable
    collect.logger.addHandler(utils.EmitHandler(config._warn))

    # delegate interrupting hanging tests to pytest-timeout
    os.environ['PYTEST_TIMEOUT'] = os.environ.get('PYTEST_TIMEOUT', '1')

    # build the object index
    distributions = collections.OrderedDict()

    if config.getoption('candidates_from_stdlib') or config.getoption('candidates_from_all'):
        distributions.update(collect.collect_stdlib_distributions())

    if config.getoption('candidates_from_all'):
        distributions.update(collect.collect_installed_distributions())

    requested_specs = config.getoption('candidates_from_specs')
    distributions.update(collect.collect_distributions(requested_specs))

    if config.getoption('candidates_from_modules'):
        distributions['unknown distribution'] = config.getoption('candidates_from_modules')

    top_level_modules = collect.import_distributions(distributions.items())

    includes = config.getoption('candidates_includes')
    if not includes:
        if config.getoption('candidates_from_all'):
            includes = ['.']
        else:
            includes = sorted(top_level_modules)
    excludes = config.getoption('candidates_excludes')
    predicate = config.getoption('candidates_predicate')

    # NOTE: 'copy' is needed here because indexing may unexpectedly trigger a module load
    modules = sys.modules.copy()
    object_index = dict(
        collect.generate_objects_from_modules(modules, includes, excludes, predicate)
    )

    # store index
    config._candidate_index = list(zip(*sorted(object_index.items()))) or [(), ()]
    return config._candidate_index
def do_help(self, line):
    """Print the command overview, or the detailed help of a single command.

    Without an argument the overview of all commands is printed. For a
    known command its help text is printed ('help' itself prints nothing);
    any other argument is reported as an unknown command.
    """
    if not line:
        overview = (
            "list - List all RPC functions advertised by the server",
            "doc - Show the documentation of a RPC function",
            "type - Show the documentation of a custom RPC type",
            "types - List all custom RPC types advertised by the server",
            "exec - Execute an RPC call",
            "notify - Execute an RPC call but tell the server to send no response",
            "raw - Directly send a raw JSON-RPC message to the server",
            "quit - Quit this program",
            "help - Print this message. 'help [command]' prints a",
            " detailed help message for a command",
        )
        for text in overview:
            print(text)
        return

    # help text per command, one tuple entry per printed line
    details = {
        'list': ("List all RPC functions advertised by the server",),
        'doc': ("Show the documentation of an RPC function",
                "Example:",
                " doc echo"),
        'type': ("Shos the documentation of a custom RPC type",
                 "Example:",
                 " type PhoneType"),
        'types': ("List all custom RPC types advertised by the server",),
        'exec': ("Execute an RPC call",
                 "Examples:",
                 " exec echo \"Hello RPC server\"",
                 " exec add 4 8"),
        'notify': ("Execute an RPC call but tell the server to send no response",
                   "Example:",
                   " notify rpc_function"),
        'raw': ("Directly send a raw JSON-RPC message to the server",
                "Example:",
                ' raw {"method": "echo", "params": ["Hello Server"], "id": 1}'),
        'quit': ("Quit this program",),
        'help': (),
    }

    if line not in details:
        print("No help available for unknown command:", line)
        return

    for text in details[line]:
        print(text)
def check_param_type(self, func, param, value, path):
    """
    Check the type of a single parameter

    Args:
        func: Description of the called function (only passed on to the
              nested checks)
        param (dict): Declaration of the parameter with the keys 'name'
                      and 'type'
        value (any): Actual value that was passed by the caller
        path (str): Path to the variable in nested structures; the
                    parameter name is used if it is empty

    Raises:
        InvalidEnumValueError: If the enum type rejects the value
        InvalidEnumTypeError: If validating an enum value raises ValueError
        InvalidNamedHashError: If a named hash type gets a non-hash value
        InvalidPrimitiveTypeError: If a primitive or typed array value has
                                   the wrong type
    """
    actual = type(value).__name__
    declared = param['type']

    if not path:
        path = param['name']

    # workaround for Python 2.7
    if actual == 'unicode':
        actual = 'str'

    if declared[0].isupper():
        # custom type: dispatch on the class name of the registered type
        typeobj = self.custom_types_dict[declared]
        kind = type(typeobj).__name__

        if kind == 'JsonEnumType':
            try:
                if not typeobj.validate(value):
                    raise InvalidEnumValueError(path, declared, str(value))
            except ValueError:
                raise InvalidEnumTypeError(path, self.py2json[actual])
        elif kind == 'JsonHashType':
            if self.py2json[actual] != 'hash':
                raise InvalidNamedHashError(path, declared, self.py2json[actual])

            # validate named hashes if named hash validation is enabled
            if self.named_hash_validation and self.__is_named_hash_type(declared):
                self.check_named_hash(func, param, value, path)
        return

    if declared.startswith('array<'):
        # typed array: every element is checked against the element type
        if actual != 'list':
            raise InvalidPrimitiveTypeError(path, declared, self.py2json[actual])

        element_type = declared[len('array<'):-1]
        for index, element in enumerate(value):
            element_decl = {'name': '[%d]' % (index), 'type': element_type}
            self.check_param_type(func, element_decl, element, path + element_decl['name'])
        return

    # primitive type
    if actual != self.json2py[declared]:
        raise InvalidPrimitiveTypeError(path, declared, self.py2json[actual])
def test_rpcsh_expect_unix_socket(self):
    """Drive rpcsh against the example server over a UNIX domain socket.

    Checks the start-up banner, the function listing and the replies to a
    few 'exec' commands. All expected strings are pexpect regular
    expressions, hence the escaped parentheses.
    """
    banner = (
        'ReflectRPC Shell\r\n',
        '================\r\n\r\n',
        "Type 'help' for available commands\r\n\r\n",
        'RPC server: unix:///tmp/reflectrpc.sock\r\n\r\n',
        'Self-description of the Service:\r\n',
        '================================\r\n',
        'Example RPC Service \\(1.0\\)\r\n',
        'This is an example service for ReflectRPC\r\n',
        '\\(rpc\\) ',
    )
    listing = (
        'echo\\(message\\)\r\n',
        'add\\(a, b\\)\r\n',
        'sub\\(a, b\\)\r\n',
        'mul\\(a, b\\)\r\n',
        'div\\(a, b\\)\r\n',
        'enum_echo\\(phone_type\\)\r\n',
        'hash_echo\\(address\\)\r\n',
        'notify\\(value\\)\r\n',
        'is_authenticated\\(\\)\r\n',
        'get_username\\(\\)\r\n',
        'echo_ints\\(ints\\)\r\n',
    )
    exchanges = (
        ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
        ('exec add 5 6', 'Server replied: 11\r\n'),
        ('exec is_authenticated', 'Server replied: false\r\n'),
        ('exec get_username', 'Server replied: null\r\n'),
    )

    # Bound before the try block so that the clean-up cannot fail with a
    # NameError (hiding the real error) when start-up itself fails.
    server = None
    child = None
    try:
        server = ServerRunner('../examples/serverunixsocket.py', '/tmp/reflectrpc.sock')
        server.run()

        python = sys.executable
        child = pexpect.spawn('%s ../rpcsh unix:///tmp/reflectrpc.sock' % (python))

        for pattern in banner:
            child.expect(pattern)

        child.sendline('list')
        for signature in listing:
            child.expect(signature)

        for command, reply in exchanges:
            child.sendline(command)
            child.expect(reply)
    finally:
        try:
            if child is not None:
                child.close(True)
        finally:
            # stop the server even if closing the shell fails
            if server is not None:
                server.stop()
def test_rpcsh_expect_http(self):
    """Drive rpcsh against the example HTTP server on port 5500.

    Checks the start-up banner, the function listing and the replies to a
    few 'exec' commands. All expected strings are pexpect regular
    expressions, hence the escaped parentheses.
    """
    banner = (
        'ReflectRPC Shell\r\n',
        '================\r\n\r\n',
        "Type 'help' for available commands\r\n\r\n",
        'RPC server: localhost:5500\r\n\r\n',
        'Self-description of the Service:\r\n',
        '================================\r\n',
        'Example RPC Service \\(1.0\\)\r\n',
        'This is an example service for ReflectRPC\r\n',
        '\\(rpc\\) ',
    )
    listing = (
        'echo\\(message\\)\r\n',
        'add\\(a, b\\)\r\n',
        'sub\\(a, b\\)\r\n',
        'mul\\(a, b\\)\r\n',
        'div\\(a, b\\)\r\n',
        'enum_echo\\(phone_type\\)\r\n',
        'hash_echo\\(address\\)\r\n',
        'notify\\(value\\)\r\n',
        'is_authenticated\\(\\)\r\n',
        'get_username\\(\\)\r\n',
        'echo_ints\\(ints\\)\r\n',
    )
    exchanges = (
        ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
        ('exec add 5 6', 'Server replied: 11\r\n'),
        ('exec is_authenticated', 'Server replied: false\r\n'),
        ('exec get_username', 'Server replied: null\r\n'),
    )

    # Bound before the try block so that the clean-up cannot fail with a
    # NameError (hiding the real error) when start-up itself fails.
    server = None
    child = None
    try:
        server = ServerRunner('../examples/serverhttp.py', 5500)
        server.run()

        python = sys.executable
        child = pexpect.spawn('%s ../rpcsh localhost 5500 --http' % (python))

        for pattern in banner:
            child.expect(pattern)

        child.sendline('list')
        for signature in listing:
            child.expect(signature)

        for command, reply in exchanges:
            child.sendline(command)
            child.expect(reply)
    finally:
        try:
            if child is not None:
                child.close(True)
        finally:
            # stop the server even if closing the shell fails
            if server is not None:
                server.stop()
def test_rpcsh_expect_http_basic_auth(self):
    """Drive rpcsh against the example HTTP server with basic authentication.

    The shell is started for user 'testuser' and answers the password
    prompt; afterwards the banner, the function listing and the replies to
    a few 'exec' commands are checked. All expected strings are pexpect
    regular expressions, hence the escaped parentheses.
    """
    banner = (
        'ReflectRPC Shell\r\n',
        '================\r\n\r\n',
        "Type 'help' for available commands\r\n\r\n",
        'RPC server: localhost:5500\r\n\r\n',
        'Self-description of the Service:\r\n',
        '================================\r\n',
        'Example RPC Service \\(1.0\\)\r\n',
        'This is an example service for ReflectRPC\r\n',
        '\\(rpc\\) ',
    )
    listing = (
        'echo\\(message\\)\r\n',
        'add\\(a, b\\)\r\n',
        'sub\\(a, b\\)\r\n',
        'mul\\(a, b\\)\r\n',
        'div\\(a, b\\)\r\n',
        'enum_echo\\(phone_type\\)\r\n',
        'hash_echo\\(address\\)\r\n',
        'notify\\(value\\)\r\n',
        'is_authenticated\\(\\)\r\n',
        'get_username\\(\\)\r\n',
        'echo_ints\\(ints\\)\r\n',
    )
    exchanges = (
        ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
        ('exec add 5 6', 'Server replied: 11\r\n'),
        ('exec is_authenticated', 'Server replied: true\r\n'),
        ('exec get_username', 'Server replied: "testuser"\r\n'),
    )

    # Bound before the try block so that the clean-up cannot fail with a
    # NameError (hiding the real error) when start-up itself fails.
    server = None
    child = None
    try:
        server = ServerRunner('../examples/serverhttp_basic_auth.py', 5500)
        server.run()

        python = sys.executable
        child = pexpect.spawn('%s ../rpcsh localhost 5500 --http --http-basic-user testuser' % (python))
        child.expect('Password: ')
        child.sendline('123456')

        for pattern in banner:
            child.expect(pattern)

        child.sendline('list')
        for signature in listing:
            child.expect(signature)

        for command, reply in exchanges:
            child.sendline(command)
            child.expect(reply)
    finally:
        try:
            if child is not None:
                child.close(True)
        finally:
            # stop the server even if closing the shell fails
            if server is not None:
                server.stop()
def test_rpcsh_expect_tls(self):
    """Drive rpcsh against the example TLS server on port 5500.

    Checks the start-up banner, the function listing and the replies to a
    few 'exec' commands. All expected strings are pexpect regular
    expressions, hence the escaped parentheses.
    """
    banner = (
        'ReflectRPC Shell\r\n',
        '================\r\n\r\n',
        "Type 'help' for available commands\r\n\r\n",
        'RPC server: localhost:5500\r\n\r\n',
        'Self-description of the Service:\r\n',
        '================================\r\n',
        'Example RPC Service \\(1.0\\)\r\n',
        'This is an example service for ReflectRPC\r\n',
        '\\(rpc\\) ',
    )
    listing = (
        'echo\\(message\\)\r\n',
        'add\\(a, b\\)\r\n',
        'sub\\(a, b\\)\r\n',
        'mul\\(a, b\\)\r\n',
        'div\\(a, b\\)\r\n',
        'enum_echo\\(phone_type\\)\r\n',
        'hash_echo\\(address\\)\r\n',
        'notify\\(value\\)\r\n',
        'is_authenticated\\(\\)\r\n',
        'get_username\\(\\)\r\n',
        'echo_ints\\(ints\\)\r\n',
    )
    exchanges = (
        ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
        ('exec add 5 6', 'Server replied: 11\r\n'),
        ('exec is_authenticated', 'Server replied: false\r\n'),
        ('exec get_username', 'Server replied: null\r\n'),
    )

    # Bound before the try block so that the clean-up cannot fail with a
    # NameError (hiding the real error) when start-up itself fails.
    server = None
    child = None
    try:
        server = ServerRunner('../examples/servertls.py', 5500)
        server.run()

        python = sys.executable
        child = pexpect.spawn('%s ../rpcsh localhost 5500 --tls' % (python))

        for pattern in banner:
            child.expect(pattern)

        child.sendline('list')
        for signature in listing:
            child.expect(signature)

        for command, reply in exchanges:
            child.sendline(command)
            child.expect(reply)
    finally:
        try:
            if child is not None:
                child.close(True)
        finally:
            # stop the server even if closing the shell fails
            if server is not None:
                server.stop()
def test_rpcsh_expect_tls_client_auth(self):
    """Drive rpcsh against the example TLS server with client authentication.

    The shell is started with the example CA, client key and client
    certificate; afterwards the banner, the function listing and the
    replies to a few 'exec' commands are checked. All expected strings are
    pexpect regular expressions, hence the escaped parentheses.
    """
    banner = (
        'ReflectRPC Shell\r\n',
        '================\r\n\r\n',
        "Type 'help' for available commands\r\n\r\n",
        'RPC server: localhost:5500\r\n\r\n',
        'Self-description of the Service:\r\n',
        '================================\r\n',
        'Example RPC Service \\(1.0\\)\r\n',
        'This is an example service for ReflectRPC\r\n',
        '\\(rpc\\) ',
    )
    listing = (
        'echo\\(message\\)\r\n',
        'add\\(a, b\\)\r\n',
        'sub\\(a, b\\)\r\n',
        'mul\\(a, b\\)\r\n',
        'div\\(a, b\\)\r\n',
        'enum_echo\\(phone_type\\)\r\n',
        'hash_echo\\(address\\)\r\n',
        'notify\\(value\\)\r\n',
        'is_authenticated\\(\\)\r\n',
        'get_username\\(\\)\r\n',
        'echo_ints\\(ints\\)\r\n',
    )
    exchanges = (
        ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
        ('exec add 5 6', 'Server replied: 11\r\n'),
        ('exec is_authenticated', 'Server replied: true\r\n'),
        ('exec get_username', 'Server replied: "example-username"\r\n'),
    )

    # Bound before the try block so that the clean-up cannot fail with a
    # NameError (hiding the real error) when start-up itself fails.
    server = None
    child = None
    try:
        server = ServerRunner('../examples/servertls_clientauth.py', 5500)
        server.run()

        python = sys.executable
        child = pexpect.spawn('%s ../rpcsh localhost 5500 --tls --ca ../examples/certs/rootCA.crt --key ../examples/certs/client.key --cert ../examples/certs/client.crt' % (python))

        for pattern in banner:
            child.expect(pattern)

        child.sendline('list')
        for signature in listing:
            child.expect(signature)

        for command, reply in exchanges:
            child.sendline(command)
            child.expect(reply)
    finally:
        try:
            if child is not None:
                child.close(True)
        finally:
            # stop the server even if closing the shell fails
            if server is not None:
                server.stop()