def _extract_scripts_from_project(setup_filename='setup.py'): """Parse setup.py and return scripts""" if not os.path.isfile(setup_filename): return '' mock_setup = textwrap.dedent('''\ def setup(*args, **kwargs): __setup_calls__.append((args, kwargs)) ''') parsed_mock_setup = ast.parse(mock_setup, filename=setup_filename) with open(setup_filename, 'rt') as setup_file: parsed = ast.parse(setup_file.read()) for index, node in enumerate(parsed.body[:]): if (not isinstance(node, ast.Expr) or not isinstance(node.value, ast.Call) or node.value.func.id != 'setup'): continue parsed.body[index:index] = parsed_mock_setup.body break fixed = ast.fix_missing_locations(parsed) codeobj = compile(fixed, setup_filename, 'exec') local_vars = {} global_vars = {'__setup_calls__': []} exec(codeobj, global_vars, local_vars) _, kwargs = global_vars['__setup_calls__'][0] return ','.join([os.path.basename(f) for f in kwargs.get('scripts', [])])
def transform(value, transformers, file=None): """ :param value: :param transformers: :return: """ result = value if transformers: node = to_ast(value, file=file) if node: for transformer in transformers: node = transformer.visit(node) node = ast.fix_missing_locations(node) result = from_ast(node, value) return result
def get_client_calls_for_app(source_code): # type: (str) -> APICallT """Return client calls for a chalice app. This is similar to ``get_client_calls`` except it will automatically traverse into chalice views with the assumption that they will be called. """ parsed = parse_code(source_code) parsed.parsed_ast = AppViewTransformer().visit(parsed.parsed_ast) ast.fix_missing_locations(parsed.parsed_ast) t = SymbolTableTypeInfer(parsed) binder = t.bind_types() collector = APICallCollector(binder) api_calls = collector.collect_api_calls(parsed.parsed_ast) return api_calls
def interpret_fraction(f): assert inspect.isfunction(f) members = dict(inspect.getmembers(f)) global_dict = members["__globals__"] source_filename = inspect.getsourcefile(f) f_ast = ast.parse(inspect.getsource(f)) _, starting_line = inspect.getsourcelines(f) # ast_demo.increment_lineno(f_ast, starting_line - 1) # print("AST:", ast_demo.dump(f_ast)) visitor = FractionInterpreter() new_ast = visitor.visit(f_ast) # print(ast_demo.dump(new_ast)) ast.fix_missing_locations(new_ast) co = compile(new_ast, '<ast_demo>', 'exec') fake_locals = {} # exec will define the new function into fake_locals scope # this is to avoid conflict with vars in the real locals() # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3 exec(co, None, fake_locals) # new_f = locals()[visitor._new_func_name(f.__name__)] return fake_locals[f.__name__]
def mess_control(f, *, debug=0): assert inspect.isfunction(f) members = dict(inspect.getmembers(f)) global_dict = members["__globals__"] source_filename = inspect.getsourcefile(f) f_ast = ast.parse(inspect.getsource(f)) _, starting_line = inspect.getsourcelines(f) # ast_demo.increment_lineno(f_ast, starting_line - 1) if debug: print("AST:", ast.dump(f_ast)) visitor = ControlMess() new_ast = visitor.visit(f_ast) if debug: print('NEW AST:', ast.dump(new_ast)) ast.fix_missing_locations(new_ast) co = compile(new_ast, '<ast_demo>', 'exec') fake_locals = {} # exec will define the new function into fake_locals scope # this is to avoid conflict with vars in the real locals() # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3 exec(co, None, fake_locals) # new_f = locals()[visitor._new_func_name(f.__name__)] return fake_locals[f.__name__]
def run_call(args, node, process, get_func, **kwargs): # Get function expression if isinstance(node, ast.FunctionDef): # function name func_expr = ast.Name(id=node.name, ctx=ast.Load()) elif isinstance(node, ast.Lambda): # lambda body expr func_expr = node else: raise TypeError("Only function definition or lambda may be called") # args is a call string or argument list/dict if isinstance(args, str): parsed = ast.parse(args).body[0].value parsed.func = func_expr ast.fix_missing_locations(parsed) return get_func(process = process, tree = parsed, **kwargs) else: # e.g. list -> {args: [...], kwargs: {}} fmt_args = fix_format(args) ast.fix_missing_locations(func_expr) return get_func(process = process, tree=func_expr, call = fmt_args, **kwargs)
def __replace_connection(self, id, node): """ Replace name of connection with user-defined id :param id: Name to replace with :param node: A call node """ newnode = ast.Name(id=id, ctx=ast.Load()) if len(node.args) != 0: # Conn given as positional arg node.args[0] = newnode else: # Conn given by keyword i.e. "connection = x" for keyword in node.keywords: if keyword.arg == 'connection'\ or keyword.arg == 'f'\ or keyword.arg == 'csvfile': keyword.value = newnode # Call to fill in line number and indentation information for the new # node and its children. ast.fix_missing_locations(node)
def eval_function_def(function_def, globals_=None, flags=None): """ Evaluates an AST of a function definition with an optional dictionary of globals. Returns a callable function (a ``types.FunctionType`` object). """ assert type(function_def) == ast.FunctionDef # Making a copy before mutating module = ast.Module(body=[copy.deepcopy(function_def)]) ast.fix_missing_locations(module) if flags is not None: kwds = dict(dont_inherit=True, flags=flags) else: kwds = {} code_object = compile(module, '<nofile>', 'exec', **kwds) locals_ = {} eval(code_object, globals_, locals_) return locals_[function_def.name]
def function_from_source(source, globals_=None): """ A helper function to construct a Function object from a source with custom __future__ imports. """ module = ast.parse(unindent(source)) ast.fix_missing_locations(module) for stmt in module.body: if type(stmt) == ast.FunctionDef: tree = stmt name = stmt.name break else: raise ValueError("No function definitions found in the provided source") code_object = compile(module, '<nofile>', 'exec', dont_inherit=True) locals_ = {} eval(code_object, globals_, locals_) function_obj = locals_[name] function_obj._peval_source = astunparse.unparse(tree) return Function.from_object(function_obj)
def desugar(t): return ast.fix_missing_locations(Desugarer().visit(t))
def convert(self, node): code = compile(node, '<string>', mode = 'eval') value = eval(code) new_node = ast.parse(str(value), mode = 'eval') if isinstance(new_node, ast.Expression): new_node = new_node.body new_node = self.generic_visit(new_node) node = ast.copy_location(new_node, node) node = ast.fix_missing_locations(node) return node
def visit_For(self, node: ast.For): node = self.generic_visit(node) if self._is_for_yield(node): yield_node = ast.YieldFrom(value = node.iter) expr_node = ast.Expr(value = yield_node) node = ast.copy_location(expr_node, node) node = ast.fix_missing_locations(node) return node
def _compile(self): c = _NameSubstitute() t = c.visit(copy(self._tree)) name = repr(self) maxarg = max((int(name[1:]) for name in c.name_cache), default=0) + 1 args = [ ast.arg(arg='_%d' % n, annotation=None) for n in range(1, maxarg) ] code = compile( ast.fix_missing_locations(ast.Module( body=[ ast.FunctionDef( name=name, args=ast.arguments( args=args, vararg=None, kwonlyargs=[], kw_defaults=[], kwarg=None, defaults=[], ), body=[ast.Return(value=t)], decorator_list=[], returns=None, lineno=1, col_offset=0, ), ], )), name, 'exec', ) ns = {} exec(code, ns) return asconstants(**self._constants)(ns[name])
def make_module(function_asts, main_ast): module_body = function_asts module_body.append(ast.Expr(value=main_ast)) module_ast = ast.Module(body=module_body) ast.fix_missing_locations(module_ast) return module_ast
def _make_fn(name, chain_fn, args, defaults): args_with_self = ['_self'] + list(args) arguments = [_ast.Name(id=arg, ctx=_ast.Load()) for arg in args_with_self] defs = [_ast.Name(id='_def{0}'.format(idx), ctx=_ast.Load()) for idx, _ in enumerate(defaults)] if _PY2: parameters = _ast.arguments(args=[_ast.Name(id=arg, ctx=_ast.Param()) for arg in args_with_self], defaults=defs) else: parameters = _ast.arguments(args=[_ast.arg(arg=arg) for arg in args_with_self], kwonlyargs=[], defaults=defs, kw_defaults=[]) module_node = _ast.Module(body=[_ast.FunctionDef(name=name, args=parameters, body=[_ast.Return(value=_ast.Call(func=_ast.Name(id='_chain', ctx=_ast.Load()), args=arguments, keywords=[]))], decorator_list=[])]) module_node = _ast.fix_missing_locations(module_node) # compile the ast code = compile(module_node, '<string>', 'exec') # and eval it in the right context globals_ = {'_chain': chain_fn} locals_ = dict(('_def{0}'.format(idx), value) for idx, value in enumerate(defaults)) eval(code, globals_, locals_) # extract our function from the newly created module return locals_[name] ######################################################################## # Produce a docstring for the class.
def transform_ast(self, node): """Apply the AST transformations from self.ast_transformers Parameters ---------- node : ast.Node The root node to be transformed. Typically called with the ast.Module produced by parsing user input. Returns ------- An ast.Node corresponding to the node it was called with. Note that it may also modify the passed object, so don't rely on references to the original AST. """ for transformer in self.ast_transformers: try: node = transformer.visit(node) except InputRejected: # User-supplied AST transformers can reject an input by raising # an InputRejected. Short-circuit in this case so that we # don't unregister the transform. raise except Exception: warn("AST transformer %r threw an error. It will be unregistered." % transformer) self.ast_transformers.remove(transformer) if self.ast_transformers: ast.fix_missing_locations(node) return node
def evaluateFalse(s): """ Replaces operators with the SymPy equivalent and sets evaluate=False. """ node = ast.parse(s) node = EvaluateFalseTransformer().visit(node) # node is a Module, we want an Expression node = ast.Expression(node.body[0].value) return ast.fix_missing_locations(node)
def test_invalid_identitifer(self): m = ast.Module([ast.Expr(ast.Name(42, ast.Load()))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") if support.check_impl_detail(): self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self): m = ast.Module([ast.Expr(ast.Str(42))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") if support.check_impl_detail(): self.assertIn("string must be of type str or uni", str(cm.exception))
def test_fix_missing_locations(self): src = ast.parse('write("spam")') src.body.append(ast.Expr(ast.Call(ast.Name('spam', ast.Load()), [ast.Str('eggs')], [], None, None))) self.assertEqual(src, ast.fix_missing_locations(src)) self.assertEqual(ast.dump(src, include_attributes=True), "Module(body=[Expr(value=Call(func=Name(id='write', ctx=Load(), " "lineno=1, col_offset=0), args=[Str(s='spam', lineno=1, " "col_offset=6)], keywords=[], starargs=None, kwargs=None, " "lineno=1, col_offset=0), lineno=1, col_offset=0), " "Expr(value=Call(func=Name(id='spam', ctx=Load(), lineno=1, " "col_offset=0), args=[Str(s='eggs', lineno=1, col_offset=0)], " "keywords=[], starargs=None, kwargs=None, lineno=1, " "col_offset=0), lineno=1, col_offset=0)])" )
def __call__(self, *args, **kwargs): if self.f is None: source = inspect.getsourcelines(self.orig_f)[0] astutil.unindent(source) source = "".join(source) self.ast = ast.parse(source) rewriter = astutil.SchedulerRewriter(concurrent.functions.keys(), self.frame_info) rewriter.visit(self.ast.body[0]) ast.fix_missing_locations(self.ast) out = compile(self.ast, "<string>", "exec") scope = dict(self.orig_f.__globals__) exec(out, scope) self.f = scope[self.orig_f.__name__] return self.f(*args, **kwargs)
def test_invalid_identitifer(self): m = ast.Module([ast.Expr(ast.Name(u"x", ast.Load()))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self): m = ast.Module([ast.Expr(ast.Str(43))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") self.assertIn("string must be of type str or uni", str(cm.exception))
def visit_Num(self, node): """ WARNING: you cannot directly write constants in list. It will throw obscure error like "TypeError: required field "lineno" missing from expr" You MUST wrap all constants in ast_demo types, like ast_demo.Num(n=42) instead of raw 42 """ n = node.n if isinstance(n, int): new_node = ast.Call(func=ast.Name(id='Fraction', ctx=ast.Load()), args=[node, ast.Num(n=1)], keywords=[]) ast.copy_location(new_node, node) # ast_demo.fix_missing_locations(new_node) return new_node return node
def visit_Assert(self, node): if isinstance(node.test, ast.Compare) and \ len(node.test.ops) == 1 and \ isinstance(node.test.ops[0], ast.Eq): call = ast.Call(func=ast.Name(id='assert_equal', ctx=ast.Load()), args=[node.test.left, node.test.comparators[0]], keywords=[]) # Wrap the call in an Expr node, because the return value isn't used. newnode = ast.Expr(value=call) ast.copy_location(newnode, node) ast.fix_missing_locations(newnode) return newnode # Return the original node if we don't want to change it. return node
def apply_ast_transform(func, ast_transformer, *, keep_original=True, globals_dict=None, debug=0): """ Apply the AST transform class to a function Args: keep_original: True to retain the old function in attribute `.f_original` globals_dict: pass any external function in your NodeTransformer into this """ if (inspect.isclass(ast_transformer) and issubclass(ast_transformer, DecoratorAST)): ast_transformer = ast_transformer() else: assert isinstance(ast_transformer, DecoratorAST) old_ast = get_func_ast(func) # _, starting_line = inspect.getsourcelines(func) if debug: print("======= OLD AST =======") ast_print(old_ast) visitor = ast_transformer new_ast = visitor.visit(old_ast) if debug: print("======= NEW AST =======") ast_print(new_ast) ast.fix_missing_locations(new_ast) co = compile(new_ast, '<ast_demo>', 'exec') fake_locals = {} # exec will define the new function into fake_locals scope # this is to avoid conflict with vars in the real locals() # https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3 exec(co, globals_dict, fake_locals) new_f = fake_locals[func.__name__] new_f.f_original = func if keep_original else None return new_f
def test_invalid_identitifer(self): m = ast.Module([ast.Expr(ast.Name(42, ast.Load()))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self): m = ast.Module([ast.Expr(ast.Str(42))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") self.assertIn("string must be of type str", str(cm.exception))
def mod(self, mod, msg=None, mode="exec", *, exc=ValueError): mod.lineno = mod.col_offset = 0 ast.fix_missing_locations(mod) with self.assertRaises(exc) as cm: compile(mod, "<test>", mode) if msg is not None: self.assertIn(msg, str(cm.exception))
def visit(self, node, **kwargs): if isinstance(node, string_types): clean = self.preparser(node) node = ast.fix_missing_locations(ast.parse(clean)) method = 'visit_' + node.__class__.__name__ visitor = getattr(self, method) return visitor(node, **kwargs)
def add_input_indices(root, input_vars, index_var): class AddInputIndicesVisitor(ast.NodeTransformer): def visit_Subscript(self, node): if get_var_name(node) in input_vars: return extend_subscript_for_input(node, index_var) return node def visit_Name(self, node): if node.id in input_vars: return ast.Subscript(node, ast.Index(index_var), node.ctx) return node vis = AddInputIndicesVisitor() root = vis.visit(root) return ast.fix_missing_locations(root)
def __call__(self, *args, **kwargs): if self.f is None: source = inspect.getsourcelines(self.orig_f)[0] unindent(source) source = "".join(source) self.ast = ast.parse(source) rewriter = SchedulerRewriter(concurrent.functions.keys()) rewriter.visit(self.ast.body[0]) ast.fix_missing_locations(self.ast) out = compile(self.ast, "<string>", "exec") scope = dict(self.orig_f.__globals__) exec(out, scope) self.f = scope[self.orig_f.__name__] return self.f(*args, **kwargs)
def load_file(filename, env): from utils import e filename = e(filename) g = GlobalsWrapper(env, filename) with open(filename, 'r') as f: tree = ast.parse(f.read(), filename) t2 = ast.fix_missing_locations(AstTransformer().visit(tree)) exec(compile(t2, filename, 'exec'), g) return g.dict
def test_invalid_identitifer(self): m = ast.Module([ast.Expr(ast.Name(u"x", ast.Load()))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") if test_support.check_impl_detail(): self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self): m = ast.Module([ast.Expr(ast.Str(43))]) ast.fix_missing_locations(m) with self.assertRaises(TypeError) as cm: compile(m, "<test>", "exec") if test_support.check_impl_detail(): self.assertIn("string must be of type str or uni", str(cm.exception))