import os
import pathlib
import tempfile
import functools
import contextlib
import types
import importlib

from typing import Union, Optional
from .abc import ResourceReader, Traversable

from ._adapters import wrap_spec

Package = Union[types.ModuleType, str]


def files(package):
    # type: (Package) -> Traversable
    """
    Get a Traversable resource from a package.

    ``package`` may be a module object or a module name; it is resolved
    and validated by ``get_package`` and the result is passed to
    ``from_package``.
    """
    return from_package(get_package(package))


def get_resource_reader(package):
    # type: (types.ModuleType) -> Optional[ResourceReader]
    """
    Return the resource reader provided by the package's loader.

    Returns None when the loader has no ``get_resource_reader``
    attribute; otherwise returns whatever that method returns for the
    package's spec name.
    """
    # We can't use an issubclass() check here because apparently abc's
    # __subclasscheck__() hook wants to create a weak reference to the
    # object, but zipimport.zipimporter does not support weak references,
    # resulting in a TypeError.  That seems terrible.
    spec = package.__spec__
    reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
    if reader is None:
        return None
    return reader(spec.name)  # type: ignore


def resolve(cand):
    # type: (Package) -> types.ModuleType
    """
    Return ``cand`` itself if it is a module, else import it by name.
    """
    if isinstance(cand, types.ModuleType):
        return cand
    return importlib.import_module(cand)


def get_package(package):
    # type: (Package) -> types.ModuleType
    """Take a package name or module object and return the module.

    Raise TypeError if the resolved module is not a package, i.e. its
    (wrapped) spec has no ``submodule_search_locations``.
    """
    resolved = resolve(package)
    if wrap_spec(resolved).submodule_search_locations is None:
        raise TypeError(f'{package!r} is not a package')
    return resolved


def from_package(package):
    """
    Return a Traversable object for the given package.

    The package's spec is wrapped with ``wrap_spec``; the reader obtained
    from the wrapped spec's loader supplies the result via ``files()``.
    """
    spec = wrap_spec(package)
    reader = spec.loader.get_resource_reader(spec.name)
    return reader.files()


def _write_all(fd, data):
    """
    Write every byte of the bytes-like object ``data`` to descriptor ``fd``.

    os.write() returns the number of bytes it actually wrote, which may be
    fewer than requested, so keep writing the unwritten tail until nothing
    remains.  Living in its own function means the view over ``data`` is
    dropped as soon as the write completes.
    """
    pending = memoryview(data).cast('B')
    while pending.nbytes:
        written = os.write(fd, pending)
        pending = pending[written:]


@contextlib.contextmanager
def _tempfile(reader, suffix='',
              # gh-93353: Keep a reference to call os.remove() in late Python
              # finalization.
              *, _os_remove=os.remove):
    """
    Context manager yielding a temporary file holding ``reader()``'s bytes.

    ``reader`` is a zero-argument callable returning the content to write;
    ``suffix`` is forwarded to ``tempfile.mkstemp``.  The file's path is
    yielded as a ``pathlib.Path`` and the file is removed on exit; a file
    that has already disappeared is not treated as an error.
    """
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on Windows
    # properly.
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    try:
        try:
            # A bare os.write() could stop short and truncate the resource.
            _write_all(fd, reader())
        finally:
            os.close(fd)
        # Drop the reference to the reader before handing control to the
        # caller; it is no longer needed once the content is on disk.
        del reader
        yield pathlib.Path(raw_path)
    finally:
        try:
            _os_remove(raw_path)
        except FileNotFoundError:
            pass


@functools.singledispatch
def as_file(path):
    """
    Given a Traversable object, return that object as a
    path on the local file system in a context manager.

    This default implementation copies ``path.read_bytes()`` into a
    temporary file whose suffix is ``path.name``.
    """
    return _tempfile(path.read_bytes, suffix=path.name)


@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    Degenerate behavior for pathlib.Path objects: yield the path as is.
    """
    yield path