# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Atomic file writing utilities.

Provides atomic_write(...), which allows atomically replacing the contents
of a file.
"""

import contextlib
import logging
import os
from pathlib import Path
import tempfile
from typing import Iterator, Literal, Optional, Union


@contextlib.contextmanager
def atomic_write(
    fp: Union[Path, str],
    mode: Literal["w", "wb"] = "w",
    encoding: Optional[str] = None,
) -> Iterator:
    """Write to a filepath atomically.

    This works by a temp file swap, created with a .tmp suffix in
    the same directory briefly until being renamed to the desired
    filepath.

    In the event an exception is raised during the write, or while
    renaming the temporary file over the destination, the temporary
    file is deleted and the original filepath is untouched.

    Note that the temporary file is created by tempfile.mkstemp, so the
    file that ends up at `fp` carries the permissions mkstemp gave it
    rather than those of any file it replaced.

    Examples:
        >>> with atomic_write("my_file.txt", encoding="utf-8") as f:
        >>>     f.write("Hello world!")
        >>>     # my_file.txt is still unmodified
        >>> # "f" is closed here, and my_file.txt is written to.

    Args:
        fp: Filepath to open.
        mode: File mode; can be 'w', 'wb'. Default 'w'.
        encoding: the encoding to use (defaults to None). Must be None
            when mode is 'wb'.

    Yields:
        The open temporary file object to write the new contents to.

    Raises:
        ValueError when the mode is invalid, or when an encoding is
        given together with the binary mode.
    """
    if isinstance(fp, str):
        fp = Path(fp)
    if mode not in ("w", "wb"):
        raise ValueError(f"mode {mode} not accepted")
    # Reject this combination before creating anything on disk: os.fdopen
    # would raise the same ValueError, but only after mkstemp had already
    # handed us a descriptor that nothing would then close.
    if "b" in mode and encoding is not None:
        raise ValueError("binary mode doesn't take an encoding argument")

    # We use mkstemp here because we want to handle the closing and
    # replacement ourselves.
    fd, tmp_name = tempfile.mkstemp(
        prefix=fp.name,
        suffix=".tmp",
        dir=fp.parent,
    )
    tmp_path = Path(tmp_name)

    try:
        with os.fdopen(fd, mode=mode, encoding=encoding) as f:
            yield f
        # The file is closed (and flushed) at this point. Keep the rename
        # inside the guarded region so a failed replace also cleans up the
        # temporary file instead of leaving it behind.
        tmp_path.replace(fp)
    except BaseException:
        # Deliberately catches everything (including KeyboardInterrupt and
        # GeneratorExit): the temporary file must never outlive a failed
        # write. The original exception is always re-raised.
        try:
            tmp_path.unlink()
        except Exception:
            logging.exception(
                "unexpected error removing temporary file %s", tmp_path
            )
        raise