我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用builtins.__import__()。
def test_discovery_failed_discovery(self):
    # TestLoader.discover() is expected to raise TypeError when the import
    # machinery hands it the bare stub module created below.
    real_import = __import__
    fake_package = types.ModuleType('package')
    test_loader = unittest.TestLoader()

    def stub_import(packagename, *args, **kwargs):
        # Register the stub under whatever name was requested.
        sys.modules[packagename] = fake_package
        return fake_package

    def restore_import():
        builtins.__import__ = real_import

    # Register the restore step before swapping the hook in.
    self.addCleanup(restore_import)
    builtins.__import__ = stub_import

    with self.assertRaises(TypeError) as caught:
        test_loader.discover('package')
    expected = 'don\'t know how to discover from {0!r}'.format(fake_package)
    self.assertEqual(str(caught.exception), expected)
def deep_import_hook(name, globals=None, locals=None, fromlist=None, level=-1):
    """Replacement for __import__().

    Resolves ``name`` one dotted component at a time through ``load_next``.
    Returns the first module loaded when ``fromlist`` is empty; otherwise
    runs ``ensure_fromlist`` on the last module loaded and returns that one.

    Raises:
        ValueError: if no module was resolved at all (empty module name).
    """
    parent, buf = get_parent(globals, level)
    # A negative level passes no alternative parent to load_next.
    alt_parent = parent if level >= 0 else None
    head, name, buf = load_next(parent, alt_parent, name, buf)

    tail = head
    while name:
        tail, name, buf = load_next(tail, tail, name, buf)

    # If tail is None, both get_parent and load_next found an empty module
    # name: someone called __import__("") or doctored faulty bytecode.
    if tail is None:
        raise ValueError('Empty module name')

    if fromlist:
        ensure_fromlist(tail, fromlist, buf, 0)
        return tail
    return head
def test_execute_bit_not_copied(self):
    # Issue 6070: under posix .pyc files got their execute bit set if
    # the .py file had the execute bit set, but they aren't executable.
    read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    with temp_umask(0o022):
        sys.path.insert(0, os.curdir)
        try:
            source_name = TESTFN + os.extsep + "py"
            # Create an empty source file that is readable and executable.
            with open(source_name, 'w'):
                pass
            os.chmod(source_name, read_bits | exec_bits)
            cached = imp.cache_from_source(source_name)
            unlink(cached)
            __import__(TESTFN)
            if not os.path.exists(cached):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            # Only the read bits may be set on the bytecode file.
            cached_mode = stat.S_IMODE(os.stat(cached).st_mode)
            self.assertEqual(cached_mode, read_bits)
        finally:
            del sys.path[0]
            remove_files(TESTFN)
            unload(TESTFN)
def test_failing_import_sticks(self):
    # Write a module whose body raises ZeroDivisionError on import.
    module_file = TESTFN + ".py"
    with open(module_file, "w") as out:
        print("a = 1/0", file=out)

    # New in 2.4, we shouldn't be able to import that no matter how often
    # we try.
    sys.path.insert(0, os.curdir)
    sys.modules.pop(TESTFN, None)
    try:
        for attempt in (1, 2, 3):
            with self.assertRaises(ZeroDivisionError):
                __import__(TESTFN)
            self.assertNotIn(TESTFN, sys.modules,
                             "damaged module in sys.modules on %i try" % attempt)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
def test_file_to_source(self):
    # check if __file__ points to the source file where available
    source_path = TESTFN + ".py"
    with open(source_path, "w") as out:
        out.write("test = None\n")

    sys.path.insert(0, os.curdir)
    try:
        module = __import__(TESTFN)
        self.assertTrue(module.__file__.endswith('.py'))

        # Remove the source, forget the module, then import again after
        # make_legacy_pyc() has run for the same source path.
        os.remove(source_path)
        del sys.modules[TESTFN]
        make_legacy_pyc(source_path)
        module = __import__(TESTFN)
        extension = os.path.splitext(module.__file__)[1]
        self.assertIn(extension, ('.pyc', '.pyo'))
    finally:
        del sys.path[0]
        remove_files(TESTFN)
        sys.modules.pop(TESTFN, None)
def test_timestamp_overflow(self):
    # A modification timestamp larger than 2**32 should not be a problem
    # when importing a module (issue #11235).
    huge_mtime = 2 ** 33 - 5
    sys.path.insert(0, os.curdir)
    try:
        source = TESTFN + ".py"
        compiled = imp.cache_from_source(source)
        # Create an empty source file.
        with open(source, 'w'):
            pass
        try:
            os.utime(source, (huge_mtime, huge_mtime))
        except OverflowError:
            self.skipTest("cannot set modification time to large integer")
        except OSError as exc:
            # Only an EOVERFLOW failure is a reason to skip; re-raise the rest.
            if exc.errno != getattr(errno, 'EOVERFLOW', None):
                raise
            self.skipTest("cannot set modification time to large integer ({})".format(exc))
        __import__(TESTFN)
        # The pyc file was created.
        os.stat(compiled)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
def test_package___cached__(self):
    # Like test___cached__ but for packages.
    init_source = os.path.join('pep3147', '__init__.py')
    foo_source = os.path.join('pep3147', 'foo.py')

    def cleanup():
        rmtree('pep3147')

    os.mkdir('pep3147')
    self.addCleanup(cleanup)
    # Touch the __init__.py and the submodule so both exist as empty files.
    for source in (init_source, foo_source):
        with open(source, 'w'):
            pass
    unload('pep3147.foo')
    unload('pep3147')

    # __import__ of a dotted name returns the top-level package.
    package = __import__('pep3147.foo')
    expected_init = os.path.join(os.curdir, imp.cache_from_source(init_source))
    self.assertEqual(package.__cached__, expected_init)
    expected_foo = os.path.join(os.curdir, imp.cache_from_source(foo_source))
    self.assertEqual(sys.modules['pep3147.foo'].__cached__, expected_foo)
def test_package___cached___from_pyc(self):
    # Like test___cached__ but ensuring __cached__ when imported from a
    # PEP 3147 pyc file.
    init_source = os.path.join('pep3147', '__init__.py')
    foo_source = os.path.join('pep3147', 'foo.py')

    os.mkdir('pep3147')
    self.addCleanup(rmtree, 'pep3147')
    unload('pep3147.foo')
    unload('pep3147')
    # Touch the __init__.py and the submodule so both exist as empty files.
    for source in (init_source, foo_source):
        with open(source, 'w'):
            pass

    # Import once, forget both modules, then import a second time; the
    # assertions below look at the result of the second import only.
    __import__('pep3147.foo')
    unload('pep3147.foo')
    unload('pep3147')
    package = __import__('pep3147.foo')

    expected_init = os.path.join(os.curdir, imp.cache_from_source(init_source))
    self.assertEqual(package.__cached__, expected_init)
    expected_foo = os.path.join(os.curdir, imp.cache_from_source(foo_source))
    self.assertEqual(sys.modules['pep3147.foo'].__cached__, expected_foo)
def test_failing_import_sticks(self):
    # The module body divides by zero, so every import attempt must fail.
    broken_source = TESTFN + ".py"
    with open(broken_source, "w") as handle:
        print("a = 1/0", file=handle)

    # New in 2.4, we shouldn't be able to import that no matter how often
    # we try.
    sys.path.insert(0, os.curdir)
    importlib.invalidate_caches()
    sys.modules.pop(TESTFN, None)
    try:
        for try_number in (1, 2, 3):
            with self.assertRaises(ZeroDivisionError):
                __import__(TESTFN)
            failure_note = "damaged module in sys.modules on %i try" % try_number
            self.assertNotIn(TESTFN, sys.modules, failure_note)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
def test_file_to_source(self):
    # check if __file__ points to the source file where available
    py_file = TESTFN + ".py"
    with open(py_file, "w") as handle:
        handle.write("test = None\n")

    sys.path.insert(0, os.curdir)
    try:
        imported = __import__(TESTFN)
        self.assertTrue(imported.__file__.endswith('.py'))

        # Remove the source, forget the module, call make_legacy_pyc() and
        # invalidate the finder caches before importing again.
        os.remove(py_file)
        del sys.modules[TESTFN]
        make_legacy_pyc(py_file)
        importlib.invalidate_caches()
        imported = __import__(TESTFN)
        _, suffix = os.path.splitext(imported.__file__)
        self.assertIn(suffix, ('.pyc', '.pyo'))
    finally:
        del sys.path[0]
        remove_files(TESTFN)
        sys.modules.pop(TESTFN, None)
def test_override_builtin(self):
    # Test that overriding builtins.__import__ can bypass sys.modules.
    import os

    def import_os_locally():
        import os
        return os

    def fake_import(*x):
        return 5

    # Quick sanity check.
    self.assertEqual(import_os_locally(), os)

    with swap_attr(builtins, "__import__", fake_import):
        self.assertEqual(import_os_locally(), 5)

    # Test what happens when we shadow __import__ in globals(); this
    # currently does not impact the import process, but if this changes,
    # other code will need to change, so keep this test as a tripwire.
    with swap_item(globals(), "__import__", fake_import):
        self.assertEqual(import_os_locally(), os)
def test_package___cached__(self):
    # Like test___cached__ but for packages.
    init_source = os.path.join('pep3147', '__init__.py')
    foo_source = os.path.join('pep3147', 'foo.py')

    def cleanup():
        # Remove the package directory and forget both modules.
        rmtree('pep3147')
        unload('pep3147.foo')
        unload('pep3147')

    os.mkdir('pep3147')
    self.addCleanup(cleanup)
    # Touch the __init__.py and the submodule so both exist as empty files.
    for source in (init_source, foo_source):
        with open(source, 'w'):
            pass
    importlib.invalidate_caches()

    # __import__ of a dotted name returns the top-level package.
    package = __import__('pep3147.foo')
    expected_init = os.path.join(os.curdir, imp.cache_from_source(init_source))
    self.assertEqual(package.__cached__, expected_init)
    expected_foo = os.path.join(os.curdir, imp.cache_from_source(foo_source))
    self.assertEqual(sys.modules['pep3147.foo'].__cached__, expected_foo)
def test_passlib(self):
    # force import of passlib pbkdf2 function, and compare headers/keys
    # generated from both hashlib and passlib
    self.cipher1 = sagecipher.Cipher()
    try:
        import builtins
    except ImportError:
        import __builtin__ as builtins
    realimport = builtins.__import__

    def myimport(*args, **kwargs):
        # Fail only ``from hashlib import pbkdf2_hmac``.  The module name and
        # fromlist may arrive positionally or by keyword (or fromlist may be
        # omitted), so look in both places instead of indexing args blindly.
        name = args[0] if args else kwargs.get('name')
        fromlist = args[3] if len(args) > 3 else kwargs.get('fromlist')
        if name == 'hashlib' and fromlist is not None and 'pbkdf2_hmac' in fromlist:
            raise ImportError
        return realimport(*args, **kwargs)

    builtins.__import__ = myimport
    try:
        del sagecipher.cipher.pbkdf2_hashlib
        reload(sagecipher.cipher)
        reload(sagecipher)
        self.assertIn('pbkdf2_passlib', dir(sagecipher.cipher))
        self.assertNotIn('pbkdf2_hashlib', dir(sagecipher.cipher))
        self.cipher2 = sagecipher.Cipher(self.cipher1.header())
        self.assertEqual(self.cipher1.header(), self.cipher2.header())
    finally:
        # Always put the real hook back so a failure here cannot leave the
        # process-wide import function patched for later tests.
        builtins.__import__ = realimport
def test_hooked_import(self):
    # It should work even if __import__ is hooked by someone else.
    # NOTE(review): the bare ``end`` tokens below look like block terminators
    # from a non-standard source dialect -- confirm before treating this as
    # plain Python; they are left exactly as found.
    saved_import = builtins.__import__
    # Pass-through hook: forwards every call unchanged to the saved import.
    @functools.wraps(saved_import)
    def my_import(*args, **kwargs):
        return saved_import(*args, **kwargs)
    end
    builtins.__import__ = my_import
    try:
        # Importing the case module is expected to raise SyntaxError even
        # with the pass-through hook installed.
        with self.assertRaises(SyntaxError) as cm:
            import cases.hooked_import
        end
        # The error text must mention the offending file name.
        self.assertTrue('hooked_import.py' in str(cm.exception))
    finally:
        # Restore the original hook no matter how the assertions went.
        builtins.__import__ = saved_import
    end
def test_discovery_failed_discovery(self):
    # discover() must raise TypeError when every import yields the bare stub
    # module built here.
    discoverer = unittest.TestLoader()
    stub_module = types.ModuleType('package')
    saved_import = __import__

    def _import(packagename, *args, **kwargs):
        # Record the stub under the requested name and return it.
        sys.modules[packagename] = stub_module
        return stub_module

    # Put the real hook back once the test is over.
    self.addCleanup(setattr, builtins, '__import__', saved_import)
    builtins.__import__ = _import

    with self.assertRaises(TypeError) as ctx:
        discoverer.discover('package')
    message = 'don\'t know how to discover from {!r}'.format(stub_module)
    self.assertEqual(str(ctx.exception), message)
def test_timestamp_overflow(self):
    # A modification timestamp larger than 2**32 should not be a problem
    # when importing a module (issue #11235).
    big_stamp = 2 ** 33 - 5
    sys.path.insert(0, os.curdir)
    try:
        source = TESTFN + ".py"
        compiled = importlib.util.cache_from_source(source)
        # Create an empty source file.
        with open(source, 'w'):
            pass
        try:
            os.utime(source, (big_stamp, big_stamp))
        except OverflowError:
            self.skipTest("cannot set modification time to large integer")
        except OSError as err:
            # Only an EOVERFLOW failure is a reason to skip; re-raise the rest.
            if err.errno != getattr(errno, 'EOVERFLOW', None):
                raise
            self.skipTest("cannot set modification time to large integer ({})".format(err))
        __import__(TESTFN)
        # The pyc file was created.
        os.stat(compiled)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
def test_package___cached__(self):
    # Like test___cached__ but for packages.
    package_dir = 'pep3147'
    init_source = os.path.join('pep3147', '__init__.py')
    foo_source = os.path.join('pep3147', 'foo.py')

    def cleanup():
        # Remove the package directory and forget both modules.
        rmtree('pep3147')
        unload('pep3147.foo')
        unload('pep3147')

    os.mkdir(package_dir)
    self.addCleanup(cleanup)
    # Touch the __init__.py
    with open(init_source, 'w'):
        pass
    with open(foo_source, 'w'):
        pass
    importlib.invalidate_caches()

    # __import__ of a dotted name returns the top-level package.
    top_level = __import__('pep3147.foo')
    init_cache = importlib.util.cache_from_source(init_source)
    self.assertEqual(top_level.__cached__, os.path.join(os.curdir, init_cache))
    foo_cache = importlib.util.cache_from_source(foo_source)
    self.assertEqual(sys.modules['pep3147.foo'].__cached__,
                     os.path.join(os.curdir, foo_cache))
def test_import_error(self):
    """Creating the plotly component while plotly imports fail yields no files or includes."""
    genuine_import = builtins.__import__

    def fake_import(*args, **kwargs):
        # Refuse anything whose module name starts with 'plotly'; delegate
        # every other import to the real hook.
        wants_plotly = bool(args) and args[0].startswith('plotly')
        if wants_plotly:
            raise ImportError('Fake Error')
        return genuine_import(*args, **kwargs)

    with patch('builtins.__import__') as mocked_import:
        mocked_import.side_effect = fake_import
        component = plotly_component.create(None)

    self.assertEqual(len(component.files), 0)
    self.assertEqual(len(component.includes), 0)
def importing_fromlist_aggresively(modules):
    """Generator that swaps in an eager ``__import__`` while suspended at its ``yield``.

    The replacement calls the original import and then, for a ``from``-import
    whose resulting module name is in ``modules``, also imports every
    fromlist entry that is itself a module and whose dotted name is in
    ``modules``.  A ``'*'`` entry is expanded using the module's ``__all__``
    (empty when absent).  The original hook is restored when the generator
    is resumed or closed.
    """
    original_hook = builtins.__import__

    @functools.wraps(original_hook)
    def eager_import(name, globals=None, locals=None, fromlist=(), level=0):
        module = original_hook(name, globals, locals, fromlist, level)
        # Nothing extra to do for plain imports or untracked modules.
        if not fromlist or module.__name__ not in modules:
            return module

        wanted = fromlist
        if '*' in wanted:
            # Work on a copy so the caller's fromlist is never mutated.
            wanted = list(wanted)
            wanted.remove('*')
            wanted.extend(getattr(module, '__all__', []))

        for attr in wanted:
            if not isinstance(getattr(module, attr, None), types.ModuleType):
                continue
            dotted = '{}.{}'.format(module.__name__, attr)
            if dotted in modules:
                importlib.import_module(dotted)
        return module

    builtins.__import__ = eager_import
    try:
        yield
    finally:
        builtins.__import__ = original_hook
def setUp(self):
    """Install an ``__import__`` hook that fails one targeted ``from`` import.

    While the fixture is active, importing ``self.sub_name`` from
    ``self.module_name`` raises ImportError; every other import is delegated
    to the real hook.  The real hook is restored by a registered cleanup.
    """
    super(ImportErrorFixture, self).setUp()

    def mock_import(name, *import_args, **kwargs):
        if name == self.module_name:
            # __import__(name, globals, locals, fromlist, level): fromlist is
            # the third argument after ``name`` but may also be passed by
            # keyword, omitted, or None -- none of which should crash here.
            if len(import_args) > 2:
                module_list = import_args[2]
            else:
                module_list = kwargs.get('fromlist')
            if module_list and self.sub_name in module_list:
                raise ImportError("ImportErrorFixture raising ImportError "
                                  "exception on targeted import: %s.%s" % (
                                      self.module_name, self.sub_name))
        return self.__real_import(name, *import_args, **kwargs)

    self.__real_import = builtins.__import__
    builtins.__import__ = mock_import
    self.addCleanup(setattr, builtins, "__import__", self.__real_import)
def _import(self, mod, **kwds):
    """
    Ask the remote process to import a module (or symbols from a module)
    and return the proxied result.

    This method only forwards a synchronous 'import' request through
    ``self.send``; extra keyword arguments are passed along unchanged.
    The documented contract of the request, compared with the built-in
    __import__(), is:

        _import('module')  =>  returns module
        _import('module.submodule')  =>  returns submodule
                                         (note this differs from behavior of __import__)
        _import('module', fromlist=[name1, name2, ...])  =>  returns [module.name1, module.name2, ...]
                                         (this also differs from behavior of __import__)
    """
    import_opts = {'module': mod}
    return self.send(request='import', callSync='sync', opts=import_opts, **kwds)
def test_discovery_from_dotted_namespace_packages(self):
    if not getattr(types, 'SimpleNamespace', None):
        raise unittest.SkipTest('Namespaces not supported')

    real_import = __import__
    test_loader = unittest.TestLoader()

    # Stub package with two search locations and a spec without a loader.
    namespace_pkg = types.ModuleType('package')
    namespace_pkg.__path__ = ['/a', '/b']
    namespace_pkg.__spec__ = types.SimpleNamespace(
        loader=None,
        submodule_search_locations=['/a', '/b']
    )

    def stub_import(packagename, *args, **kwargs):
        sys.modules[packagename] = namespace_pkg
        return namespace_pkg

    def restore_import():
        builtins.__import__ = real_import

    self.addCleanup(restore_import)
    builtins.__import__ = stub_import

    seen_calls = []

    def fake_find_tests(start_dir, pattern, namespace=None):
        # Record the call and pretend each directory holds one test entry.
        seen_calls.append((start_dir, pattern))
        return ['%s/tests' % start_dir]

    test_loader._find_tests = fake_find_tests
    test_loader.suiteClass = list
    discovered = test_loader.discover('package')
    self.assertEqual(discovered, ['/a/tests', '/b/tests'])
def __import__(*args, **kwargs):
    """
    __import__(name, globals=None, locals=None, fromlist=(), level=0) -> object

    Wrapper around the saved import function ``_import``.  Unless locking
    has been switched off through the module-level ``__lock_imports`` flag,
    the call is made while holding both the interpreter import lock
    (``imp.acquire_lock``) and the lock that ``__module_lock`` returns for
    the requested module name.  Per the original author, this is meant to
    protect imports against greenlet concurrency in addition to the locking
    Python already does at the C level.
    """
    if args and not issubclass(type(args[0]), _allowed_module_name_types):
        # if a builtin has been acquired as a bound instance method,
        # python knows not to pass 'self' when the method is called.
        # No such protection exists for monkey-patched builtins,
        # however, so this is necessary.
        args = args[1:]

    if not __lock_imports:
        return _import(*args, **kwargs)

    # Get a lock for the module name
    per_module_lock = __module_lock(args[0])
    imp.acquire_lock()
    try:
        per_module_lock.acquire()
        try:
            return _import(*args, **kwargs)
        finally:
            per_module_lock.release()
    finally:
        imp.release_lock()
def _unlock_imports():
    """
    Internal function: clear the module-level ``__lock_imports`` flag.

    Called when gevent needs to perform imports lazily but does not know
    the state of the system; it may be impossible to take the import lock
    because, for example, there are no other running greenlets.  Clearing
    the flag makes a monkey-patched __import__ avoid taking any locks until
    the corresponding call to lock_imports.

    This should only be done for limited amounts of time and when the set
    of imports is statically known to be "safe".
    """
    # This could easily become a list that we push/pop from or an integer
    # we increment if we need to do this recursively, but we shouldn't get
    # that complex.
    global __lock_imports
    __lock_imports = False
def replace_import_hook(new_import):
    """Generator that keeps ``new_import`` installed as ``builtin_mod.__import__``
    while suspended at its ``yield`` and reinstates the previous hook afterwards.
    """
    previous_hook = builtin_mod.__import__
    builtin_mod.__import__ = new_import
    try:
        yield
    finally:
        # Runs on normal resumption, on error, and when the generator is closed.
        builtin_mod.__import__ = previous_hook
def load_next(mod, altmod, name, buf):
    """
    mod, name, buf = load_next(mod, altmod, name, buf)

    Import the first dotted component of ``name`` via ``import_submodule``
    and return the loaded module, the remaining dotted name (None when
    nothing is left) and the updated qualified-name buffer.

    altmod is either None or same as mod
    """
    if len(name) == 0:
        # completely empty module name should only happen in
        # 'from . import' (or '__import__("")')
        return mod, None, buf

    head, separator, rest = name.partition('.')
    if separator and not head:
        # The name starts with a dot.
        raise ValueError('Empty module name')
    remaining = rest if separator else None

    if buf != '':
        buf += '.'
    buf += head

    result = import_submodule(mod, head, buf)
    if result is None and mod != altmod:
        # Retry against the alternative parent using the bare component name.
        result = import_submodule(altmod, head, head)
        if result is not None:
            buf = head

    if result is None:
        raise ImportError("No module named %.200s" % name)
    return result, remaining, buf

# Need to keep track of what we've already reloaded to prevent cyclic evil
def test_failing_reload(self):
    # A failing reload should leave the module object in sys.modules.
    wrong_values = "module has wrong attribute values"
    source = TESTFN + os.extsep + "py"
    with open(source, "w") as out:
        out.write("a = 1\nb=2\n")

    sys.path.insert(0, os.curdir)
    try:
        module = __import__(TESTFN)
        self.assertIn(TESTFN, sys.modules)
        self.assertEqual(module.a, 1, wrong_values)
        self.assertEqual(module.b, 2, wrong_values)

        # On WinXP, just replacing the .py file wasn't enough to
        # convince reload() to reparse it.  Maybe the timestamp didn't
        # move enough.  We force it to get reparsed by removing the
        # compiled file too.
        remove_files(TESTFN)

        # Now damage the module.
        with open(source, "w") as out:
            out.write("a = 10\nb=20//0\n")

        with self.assertRaises(ZeroDivisionError):
            imp.reload(module)

        # But we still expect the module to be in sys.modules.
        module = sys.modules.get(TESTFN)
        self.assertIsNot(module, None, "expected module to be in sys.modules")

        # We should have replaced a w/ 10, but the old b value should
        # stick.
        self.assertEqual(module.a, 10, wrong_values)
        self.assertEqual(module.b, 2, wrong_values)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
        unload(TESTFN)
def test_import_by_filename(self):
    # Passing an absolute file path to __import__ must raise ImportError
    # with a specific message.
    absolute_path = os.path.abspath(TESTFN)
    fs_encoding = sys.getfilesystemencoding()
    try:
        absolute_path.encode(fs_encoding)
    except UnicodeEncodeError:
        self.skipTest('path is not encodable to {}'.format(fs_encoding))

    with self.assertRaises(ImportError) as caught:
        __import__(absolute_path)
    first_arg = caught.exception.args[0]
    self.assertEqual("Import by filename is not supported.", first_arg)
def import_module(self):
    """Import ``self.module_name`` and return its entry from ``sys.modules``.

    The lookup goes through ``sys.modules`` because ``__import__`` returns
    the top-level package for a dotted name, not the named submodule.
    """
    module_name = self.module_name
    scope = globals()
    # This module's globals serve as both the globals and locals argument.
    __import__(module_name, scope, scope)
    return sys.modules[module_name]
def test_trailing_slash(self):
    # A sys.path entry that ends with '/' must still be searched.
    module_file = os.path.join(self.path, 'test_trailing_slash.py')
    with open(module_file, 'w') as out:
        out.write("testdata = 'test_trailing_slash'")
    sys.path.append(self.path + '/')
    imported = __import__("test_trailing_slash")
    self.assertEqual(imported.testdata, 'test_trailing_slash')
    unload("test_trailing_slash")

# Regression test for http://bugs.python.org/issue3677.
def _test_UNC_path(self):
    # Importing from a sys.path entry given as a Windows UNC path.
    with open(os.path.join(self.path, 'test_trailing_slash.py'), 'w') as f:
        f.write("testdata = 'test_trailing_slash'")
    # Create the UNC path, like \\myhost\c$\foo\bar.
    path = os.path.abspath(self.path)
    import errno
    import socket
    hn = socket.gethostname()
    drive = path[0]
    unc = "\\\\%s\\%s$" % (hn, drive)
    unc += path[2:]
    # The administrative share may not be reachable on this host; skip
    # rather than fail for that reason.
    try:
        os.listdir(unc)
    except OSError as e:
        if e.errno in (errno.EPERM, errno.EACCES, errno.ENOENT):
            self.skipTest("cannot access administrative share %r" % (unc,))
        raise
    # Fix: put the UNC form on sys.path.  Appending the plain local ``path``
    # (as before) left ``unc`` unused, so the UNC case was never exercised.
    sys.path.append(unc)
    mod = __import__("test_trailing_slash")
    self.assertEqual(mod.testdata, 'test_trailing_slash')
    # Make sure the module really was found through the UNC entry.
    self.assertTrue(mod.__file__.startswith(unc), mod.__file__)
    unload("test_trailing_slash")
def test_import_pyc_path(self):
    # Importing the module must create __pycache__ and a tagged bytecode
    # file inside it.
    self.assertFalse(os.path.exists('__pycache__'))
    __import__(TESTFN)
    self.assertTrue(os.path.exists('__pycache__'))
    # 'c' when __debug__ is true, 'o' otherwise.
    suffix = 'c' if __debug__ else 'o'
    cached_name = '{}.{}.py{}'.format(TESTFN, self.tag, suffix)
    self.assertTrue(os.path.exists(os.path.join('__pycache__', cached_name)))
def test_unwritable_directory(self):
    # When the umask causes the new __pycache__ directory to be
    # unwritable, the import still succeeds but no .pyc file is written.
    with temp_umask(0o222):
        __import__(TESTFN)
        self.assertTrue(os.path.exists('__pycache__'))
        pyc_name = '{}.{}.pyc'.format(TESTFN, self.tag)
        pyc_path = os.path.join('__pycache__', pyc_name)
        self.assertFalse(os.path.exists(pyc_path))
def test_missing_source(self):
    # With PEP 3147 cache layout, removing the source but leaving the pyc
    # file does not satisfy the import.
    __import__(TESTFN)
    cached = imp.cache_from_source(self.source)
    self.assertTrue(os.path.exists(cached))
    os.remove(self.source)
    forget(TESTFN)
    with self.assertRaises(ImportError):
        __import__(TESTFN)
def test_missing_source_legacy(self):
    # Like test_missing_source() except that for backward compatibility,
    # when the pyc file lives where the py file would have been (and named
    # without the tag), it is importable.  The __file__ of the imported
    # module is the pyc location.
    __import__(TESTFN)
    # pyc_file gets removed in _clean() via tearDown().
    legacy_pyc = make_legacy_pyc(self.source)
    os.remove(self.source)
    unload(TESTFN)
    module = __import__(TESTFN)
    expected_file = os.path.join(os.curdir, os.path.relpath(legacy_pyc))
    self.assertEqual(module.__file__, expected_file)
def test___cached__(self):
    # Modules now also have an __cached__ that points to the pyc file.
    module = __import__(TESTFN)
    source_name = TESTFN + '.py'
    expected = os.path.join(os.curdir, imp.cache_from_source(source_name))
    self.assertEqual(module.__cached__, expected)
def restore___import__(self, import_):
    """Install ``import_`` as ``builtins.__import__``."""
    setattr(builtins, '__import__', import_)
def _install_import_hook():
    """Install ``_pytypes___import__`` as ``builtins.__import__`` at most once.

    The module-level ``_import_hook_installed`` flag records that the hook
    is in place, so later calls return without doing anything.
    """
    global _import_hook_installed
    if _import_hook_installed:
        return
    builtins.__import__ = _pytypes___import__
    _import_hook_installed = True
def _patch_import_to_patch_pyqt_on_import(patch_qt_on_import):
    # I don't like this approach very much as we have to patch __import__, but I like even less
    # asking the user to configure something in the client side...
    # So, our approach is to patch PyQt4/5 right before the user tries to import it (at which
    # point he should've set the sip api version properly already anyways).
    dotted = patch_qt_on_import + '.'
    original_import = __import__

    try:
        import builtins
    except ImportError:
        import __builtin__ as builtins

    from _pydev_imps._pydev_sys_patch import patch_sys_module, patch_reload, cancel_patches_in_sys_module

    patch_sys_module()
    patch_reload()

    def patched_import(name, *args, **kwargs):
        targets_qt = patch_qt_on_import == name or name.startswith(dotted)
        if targets_qt:
            # One-shot: put the original hook back and undo the sys patches
            # before patching Qt itself.
            builtins.__import__ = original_import
            cancel_patches_in_sys_module()
            _internal_patch_qt()  # Patch it only when the user would import the qt module
        return original_import(name, *args, **kwargs)

    builtins.__import__ = patched_import
def test_import_by_filename(self):
    # Passing an absolute file path to __import__ must raise ImportError.
    target = os.path.abspath(TESTFN)
    codec = sys.getfilesystemencoding()
    try:
        target.encode(codec)
    except UnicodeEncodeError:
        self.skipTest('path is not encodable to {}'.format(codec))

    with self.assertRaises(ImportError) as caught:
        __import__(target)
def test_bogus_fromlist(self): try: __import__('http', fromlist=['blah']) except ImportError: self.fail("fromlist must allow bogus names")
def test_delete_builtins_import(self):
    # Run a child interpreter that deletes __import__ and then imports;
    # its stdout is expected to mention ImportError.
    snippet = "del __builtins__.__import__; import os"
    child = script_helper.spawn_python("-c", snippet)
    stdout, stderr = child.communicate()
    self.assertIn(b"ImportError", stdout)