xref: /aosp_15_r20/external/toolchain-utils/llvm_tools/atomic_write_file.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1*760c253cSXin Li# Copyright 2023 The ChromiumOS Authors
2*760c253cSXin Li# Use of this source code is governed by a BSD-style license that can be
3*760c253cSXin Li# found in the LICENSE file.
4*760c253cSXin Li
5*760c253cSXin Li"""Atomic file writing utilities.
6*760c253cSXin Li
7*760c253cSXin LiProvides atomic_write(...), which allows atomically replacing the contents
8*760c253cSXin Liof a file.
9*760c253cSXin Li"""
10*760c253cSXin Li
11*760c253cSXin Liimport contextlib
12*760c253cSXin Liimport logging
13*760c253cSXin Liimport os
14*760c253cSXin Lifrom pathlib import Path
15*760c253cSXin Liimport tempfile
16*760c253cSXin Lifrom typing import Iterator, Literal, Optional, Union
17*760c253cSXin Li
18*760c253cSXin Li
19*760c253cSXin Li@contextlib.contextmanager
20*760c253cSXin Lidef atomic_write(
21*760c253cSXin Li    fp: Union[Path, str],
22*760c253cSXin Li    mode: Literal["w", "wb"] = "w",
23*760c253cSXin Li    encoding: Optional[str] = None,
24*760c253cSXin Li) -> Iterator:
25*760c253cSXin Li    """Write to a filepath atomically.
26*760c253cSXin Li
27*760c253cSXin Li    This works by a temp file swap, created with a .tmp suffix in
28*760c253cSXin Li    the same directory briefly until being renamed to the desired
29*760c253cSXin Li    filepath.
30*760c253cSXin Li
31*760c253cSXin Li    In the event an exception is raised during the write, the
32*760c253cSXin Li    temporary file is deleted and the original filepath is untouched.
33*760c253cSXin Li
34*760c253cSXin Li    Examples:
35*760c253cSXin Li        >>> with atomic_write("my_file.txt", encoding="utf-8") as f:
36*760c253cSXin Li        >>>     f.write("Hello world!")
37*760c253cSXin Li        >>>     # my_file.txt is still unmodified
38*760c253cSXin Li        >>> # "f" is closed here, and my_file.txt is written to.
39*760c253cSXin Li
40*760c253cSXin Li    Args:
41*760c253cSXin Li        fp: Filepath to open.
42*760c253cSXin Li        mode: File mode; can be 'w', 'wb'. Default 'w'.
43*760c253cSXin Li        encoding: the encoding to use (defaults to None).
44*760c253cSXin Li
45*760c253cSXin Li    Raises:
46*760c253cSXin Li        ValueError when the mode is invalid.
47*760c253cSXin Li    """
48*760c253cSXin Li    if isinstance(fp, str):
49*760c253cSXin Li        fp = Path(fp)
50*760c253cSXin Li    if mode not in ("w", "wb"):
51*760c253cSXin Li        raise ValueError(f"mode {mode} not accepted")
52*760c253cSXin Li
53*760c253cSXin Li    # We use mkstemp here because we want to handle the closing and
54*760c253cSXin Li    # replacement ourselves.
55*760c253cSXin Li    result = tempfile.mkstemp(
56*760c253cSXin Li        prefix=fp.name,
57*760c253cSXin Li        suffix=".tmp",
58*760c253cSXin Li        dir=fp.parent,
59*760c253cSXin Li    )
60*760c253cSXin Li    fd, tmp_path = (result[0], Path(result[1]))
61*760c253cSXin Li
62*760c253cSXin Li    try:
63*760c253cSXin Li        with os.fdopen(fd, mode=mode, encoding=encoding) as f:
64*760c253cSXin Li            yield f
65*760c253cSXin Li    except:
66*760c253cSXin Li        try:
67*760c253cSXin Li            tmp_path.unlink()
68*760c253cSXin Li        except Exception as e:
69*760c253cSXin Li            logging.exception("unexpected error removing temporary file %s", e)
70*760c253cSXin Li        raise
71*760c253cSXin Li    tmp_path.replace(fp)
72