1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""A fake open() function replacement. See ``fake_filesystem`` for usage. 16""" 17import errno 18import os 19import sys 20from collections import namedtuple 21from stat import ( 22 S_ISDIR, 23) 24from typing import ( 25 Optional, 26 Union, 27 Any, 28 Tuple, 29 cast, 30 AnyStr, 31 TYPE_CHECKING, 32) 33 34from pyfakefs import helpers 35from pyfakefs.fake_file import ( 36 FakePipeWrapper, 37 FakeFileWrapper, 38 FakeFile, 39 AnyFileWrapper, 40) 41from pyfakefs.helpers import ( 42 AnyString, 43 is_root, 44 PERM_READ, 45 PERM_WRITE, 46) 47 48if TYPE_CHECKING: 49 from pyfakefs.fake_filesystem import FakeFilesystem 50 51 52_OpenModes = namedtuple( 53 "_OpenModes", 54 "must_exist can_read can_write truncate append must_not_exist", 55) 56 57_OPEN_MODE_MAP = { 58 # mode name:(file must exist, can read, can write, 59 # truncate, append, must not exist) 60 "r": (True, True, False, False, False, False), 61 "w": (False, False, True, True, False, False), 62 "a": (False, False, True, False, True, False), 63 "r+": (True, True, True, False, False, False), 64 "w+": (False, True, True, True, False, False), 65 "a+": (False, True, True, False, True, False), 66 "x": (False, False, True, False, False, True), 67 "x+": (False, True, True, False, False, True), 68} 69 70 71class FakeFileOpen: 72 """Faked `file()` and `open()` function replacements. 73 74 Returns FakeFile objects in a FakeFilesystem in place of the `file()` 75 or `open()` function. 76 """ 77 78 __name__ = "FakeFileOpen" 79 80 def __init__( 81 self, 82 filesystem: "FakeFilesystem", 83 delete_on_close: bool = False, 84 raw_io: bool = False, 85 ): 86 """ 87 Args: 88 filesystem: FakeFilesystem used to provide file system information 89 delete_on_close: optional boolean, deletes file on close() 90 """ 91 self.filesystem = filesystem 92 self._delete_on_close = delete_on_close 93 self.raw_io = raw_io 94 95 def __call__(self, *args: Any, **kwargs: Any) -> AnyFileWrapper: 96 """Redirects calls to file() or open() to appropriate method.""" 97 return self.call(*args, **kwargs) 98 99 def call( 100 self, 101 file_: Union[AnyStr, int], 102 mode: str = "r", 103 buffering: int = -1, 104 encoding: Optional[str] = None, 105 errors: Optional[str] = None, 106 newline: Optional[str] = None, 107 closefd: bool = True, 108 opener: Any = None, 109 open_modes: Optional[_OpenModes] = None, 110 ) -> AnyFileWrapper: 111 """Return a file-like object with the contents of the target 112 file object. 113 114 Args: 115 file_: Path to target file or a file descriptor. 116 mode: Additional file modes (all modes in `open()` are supported). 117 buffering: the buffer size used for writing. Data will only be 118 flushed if buffer size is exceeded. The default (-1) uses a 119 system specific default buffer size. Text line mode (e.g. 120 buffering=1 in text mode) is not supported. 121 encoding: The encoding used to encode unicode strings / decode 122 bytes. 123 errors: (str) Defines how encoding errors are handled. 124 newline: Controls universal newlines, passed to stream object. 125 closefd: If a file descriptor rather than file name is passed, 126 and this is set to `False`, then the file descriptor is kept 127 open when file is closed. 128 opener: an optional function object that will be called with 129 `file_` and the open flags (derived from `mode`) and returns 130 a file descriptor. 131 open_modes: Modes for opening files if called from low-level API. 132 133 Returns: 134 A file-like object containing the contents of the target file. 135 136 Raises: 137 OSError depending on Python version / call mode: 138 - if the target object is a directory 139 - on an invalid path 140 - if the file does not exist when it should 141 - if the file exists but should not 142 - if permission is denied 143 ValueError: for an invalid mode or mode combination 144 """ 145 binary = "b" in mode 146 147 if binary and encoding: 148 raise ValueError("binary mode doesn't take an encoding argument") 149 150 newline, open_modes = self._handle_file_mode(mode, newline, open_modes) 151 152 # the pathlib opener is defined in a Path instance that may not be 153 # patched under some circumstances; as it just calls standard open(), 154 # we may ignore it, as it would not change the behavior 155 if opener is not None and opener.__module__ != "pathlib": 156 # opener shall return a file descriptor, which will be handled 157 # here as if directly passed 158 file_ = opener(file_, self._open_flags_from_open_modes(open_modes)) 159 160 file_object, file_path, filedes, real_path = self._handle_file_arg(file_) 161 if file_object is None and file_path is None: 162 # file must be a fake pipe wrapper, find it... 163 if ( 164 filedes is None 165 or len(self.filesystem.open_files) <= filedes 166 or not self.filesystem.open_files[filedes] 167 ): 168 raise OSError(errno.EBADF, "invalid pipe file descriptor") 169 wrappers = self.filesystem.open_files[filedes] 170 assert wrappers is not None 171 existing_wrapper = wrappers[0] 172 assert isinstance(existing_wrapper, FakePipeWrapper) 173 wrapper = FakePipeWrapper( 174 self.filesystem, 175 existing_wrapper.fd, 176 existing_wrapper.can_write, 177 mode, 178 ) 179 file_des = self.filesystem._add_open_file(wrapper) 180 wrapper.filedes = file_des 181 return wrapper 182 183 assert file_path is not None 184 if not filedes: 185 closefd = True 186 187 if ( 188 not opener 189 and open_modes.must_not_exist 190 and ( 191 file_object 192 or self.filesystem.islink(file_path) 193 and not self.filesystem.is_windows_fs 194 ) 195 ): 196 self.filesystem.raise_os_error(errno.EEXIST, file_path) 197 198 assert real_path is not None 199 file_object = self._init_file_object( 200 file_object, file_path, open_modes, real_path 201 ) 202 203 if S_ISDIR(file_object.st_mode): 204 if self.filesystem.is_windows_fs: 205 self.filesystem.raise_os_error(errno.EACCES, file_path) 206 else: 207 self.filesystem.raise_os_error(errno.EISDIR, file_path) 208 209 # If you print obj.name, the argument to open() must be printed. 210 # Not the abspath, not the filename, but the actual argument. 211 file_object.opened_as = file_path 212 if open_modes.truncate: 213 current_time = helpers.now() 214 file_object.st_mtime = current_time 215 if not self.filesystem.is_windows_fs: 216 file_object.st_ctime = current_time 217 218 fakefile = FakeFileWrapper( 219 file_object, 220 file_path, 221 update=open_modes.can_write, 222 read=open_modes.can_read, 223 append=open_modes.append, 224 delete_on_close=self._delete_on_close, 225 filesystem=self.filesystem, 226 newline=newline, 227 binary=binary, 228 closefd=closefd, 229 encoding=encoding, 230 errors=errors, 231 buffering=buffering, 232 raw_io=self.raw_io, 233 ) 234 if filedes is not None: 235 fakefile.filedes = filedes 236 # replace the file wrapper 237 open_files_list = self.filesystem.open_files[filedes] 238 assert open_files_list is not None 239 open_files_list.append(fakefile) 240 else: 241 fakefile.filedes = self.filesystem._add_open_file(fakefile) 242 return fakefile 243 244 @staticmethod 245 def _open_flags_from_open_modes(open_modes: _OpenModes) -> int: 246 flags = 0 247 if open_modes.can_read and open_modes.can_write: 248 flags |= os.O_RDWR 249 elif open_modes.can_read: 250 flags |= os.O_RDONLY 251 elif open_modes.can_write: 252 flags |= os.O_WRONLY 253 254 if open_modes.append: 255 flags |= os.O_APPEND 256 if open_modes.truncate: 257 flags |= os.O_TRUNC 258 if not open_modes.must_exist and open_modes.can_write: 259 flags |= os.O_CREAT 260 if open_modes.must_not_exist and open_modes.can_write: 261 flags |= os.O_EXCL 262 return flags 263 264 def _init_file_object( 265 self, 266 file_object: Optional[FakeFile], 267 file_path: AnyStr, 268 open_modes: _OpenModes, 269 real_path: AnyString, 270 ) -> FakeFile: 271 if file_object: 272 if not is_root() and ( 273 (open_modes.can_read and not file_object.st_mode & PERM_READ) 274 or (open_modes.can_write and not file_object.st_mode & PERM_WRITE) 275 ): 276 self.filesystem.raise_os_error(errno.EACCES, file_path) 277 if open_modes.can_write: 278 if open_modes.truncate: 279 file_object.set_contents("") 280 else: 281 if open_modes.must_exist: 282 self.filesystem.raise_os_error(errno.ENOENT, file_path) 283 if self.filesystem.islink(file_path): 284 link_object = self.filesystem.resolve(file_path, follow_symlinks=False) 285 assert link_object.contents is not None 286 target_path = cast( 287 AnyStr, link_object.contents 288 ) # pytype: disable=invalid-annotation 289 else: 290 target_path = file_path 291 if self.filesystem.ends_with_path_separator(target_path): 292 error = ( 293 errno.EINVAL 294 if self.filesystem.is_windows_fs 295 else errno.ENOENT 296 if self.filesystem.is_macos 297 else errno.EISDIR 298 ) 299 self.filesystem.raise_os_error(error, file_path) 300 file_object = self.filesystem.create_file_internally( 301 real_path, create_missing_dirs=False, apply_umask=True 302 ) 303 return file_object 304 305 def _handle_file_arg( 306 self, file_: Union[AnyStr, int] 307 ) -> Tuple[Optional[FakeFile], Optional[AnyStr], Optional[int], Optional[AnyStr]]: 308 file_object = None 309 if isinstance(file_, int): 310 # opening a file descriptor 311 filedes: int = file_ 312 wrapper = self.filesystem.get_open_file(filedes) 313 if isinstance(wrapper, FakePipeWrapper): 314 return None, None, filedes, None 315 if isinstance(wrapper, FakeFileWrapper): 316 self._delete_on_close = wrapper.delete_on_close 317 file_object = cast( 318 FakeFile, self.filesystem.get_open_file(filedes).get_object() 319 ) 320 assert file_object is not None 321 path = file_object.name 322 return ( 323 file_object, 324 cast(AnyStr, path), # pytype: disable=invalid-annotation 325 filedes, 326 cast(AnyStr, path), # pytype: disable=invalid-annotation 327 ) 328 329 # open a file file by path 330 file_path = cast(AnyStr, file_) # pytype: disable=invalid-annotation 331 if file_path == self.filesystem.dev_null.name: 332 file_object = self.filesystem.dev_null 333 real_path = file_path 334 else: 335 real_path = self.filesystem.resolve_path(file_path) 336 if self.filesystem.exists(file_path): 337 file_object = self.filesystem.get_object_from_normpath( 338 real_path, check_read_perm=False 339 ) 340 return file_object, file_path, None, real_path 341 342 def _handle_file_mode( 343 self, 344 mode: str, 345 newline: Optional[str], 346 open_modes: Optional[_OpenModes], 347 ) -> Tuple[Optional[str], _OpenModes]: 348 orig_modes = mode # Save original modes for error messages. 349 # Normalize modes. Handle 't' and 'U'. 350 if ("b" in mode and "t" in mode) or ( 351 sys.version_info > (3, 10) and "U" in mode 352 ): 353 raise ValueError("Invalid mode: " + mode) 354 mode = mode.replace("t", "").replace("b", "") 355 mode = mode.replace("rU", "r").replace("U", "r") 356 if not self.raw_io: 357 if mode not in _OPEN_MODE_MAP: 358 raise ValueError("Invalid mode: %r" % orig_modes) 359 open_modes = _OpenModes(*_OPEN_MODE_MAP[mode]) 360 assert open_modes is not None 361 return newline, open_modes 362