1from contextlib import suppress 2from io import TextIOWrapper 3 4from . import abc 5 6 7class SpecLoaderAdapter: 8 """ 9 Adapt a package spec to adapt the underlying loader. 10 """ 11 12 def __init__(self, spec, adapter=lambda spec: spec.loader): 13 self.spec = spec 14 self.loader = adapter(spec) 15 16 def __getattr__(self, name): 17 return getattr(self.spec, name) 18 19 20class TraversableResourcesLoader: 21 """ 22 Adapt a loader to provide TraversableResources. 23 """ 24 25 def __init__(self, spec): 26 self.spec = spec 27 28 def get_resource_reader(self, name): 29 return CompatibilityFiles(self.spec)._native() 30 31 32def _io_wrapper(file, mode='r', *args, **kwargs): 33 if mode == 'r': 34 return TextIOWrapper(file, *args, **kwargs) 35 elif mode == 'rb': 36 return file 37 raise ValueError( 38 "Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode) 39 ) 40 41 42class CompatibilityFiles: 43 """ 44 Adapter for an existing or non-existent resource reader 45 to provide a compatibility .files(). 46 """ 47 48 class SpecPath(abc.Traversable): 49 """ 50 Path tied to a module spec. 51 Can be read and exposes the resource reader children. 52 """ 53 54 def __init__(self, spec, reader): 55 self._spec = spec 56 self._reader = reader 57 58 def iterdir(self): 59 if not self._reader: 60 return iter(()) 61 return iter( 62 CompatibilityFiles.ChildPath(self._reader, path) 63 for path in self._reader.contents() 64 ) 65 66 def is_file(self): 67 return False 68 69 is_dir = is_file 70 71 def joinpath(self, other): 72 if not self._reader: 73 return CompatibilityFiles.OrphanPath(other) 74 return CompatibilityFiles.ChildPath(self._reader, other) 75 76 @property 77 def name(self): 78 return self._spec.name 79 80 def open(self, mode='r', *args, **kwargs): 81 return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs) 82 83 class ChildPath(abc.Traversable): 84 """ 85 Path tied to a resource reader child. 86 Can be read but doesn't expose any meaningful children. 87 """ 88 89 def __init__(self, reader, name): 90 self._reader = reader 91 self._name = name 92 93 def iterdir(self): 94 return iter(()) 95 96 def is_file(self): 97 return self._reader.is_resource(self.name) 98 99 def is_dir(self): 100 return not self.is_file() 101 102 def joinpath(self, other): 103 return CompatibilityFiles.OrphanPath(self.name, other) 104 105 @property 106 def name(self): 107 return self._name 108 109 def open(self, mode='r', *args, **kwargs): 110 return _io_wrapper( 111 self._reader.open_resource(self.name), mode, *args, **kwargs 112 ) 113 114 class OrphanPath(abc.Traversable): 115 """ 116 Orphan path, not tied to a module spec or resource reader. 117 Can't be read and doesn't expose any meaningful children. 118 """ 119 120 def __init__(self, *path_parts): 121 if len(path_parts) < 1: 122 raise ValueError('Need at least one path part to construct a path') 123 self._path = path_parts 124 125 def iterdir(self): 126 return iter(()) 127 128 def is_file(self): 129 return False 130 131 is_dir = is_file 132 133 def joinpath(self, other): 134 return CompatibilityFiles.OrphanPath(*self._path, other) 135 136 @property 137 def name(self): 138 return self._path[-1] 139 140 def open(self, mode='r', *args, **kwargs): 141 raise FileNotFoundError("Can't open orphan path") 142 143 def __init__(self, spec): 144 self.spec = spec 145 146 @property 147 def _reader(self): 148 with suppress(AttributeError): 149 return self.spec.loader.get_resource_reader(self.spec.name) 150 151 def _native(self): 152 """ 153 Return the native reader if it supports files(). 154 """ 155 reader = self._reader 156 return reader if hasattr(reader, 'files') else self 157 158 def __getattr__(self, attr): 159 return getattr(self._reader, attr) 160 161 def files(self): 162 return CompatibilityFiles.SpecPath(self.spec, self._reader) 163 164 165def wrap_spec(package): 166 """ 167 Construct a package spec with traversable compatibility 168 on the spec/loader/reader. 169 """ 170 return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader) 171