xref: /aosp_15_r20/build/make/tools/edit_monitor/edit_monitor.py (revision 9e94795a3d4ef5c1d47486f9a02bb378756cea8a)
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15
16import getpass
17import logging
18import multiprocessing.connection
19import os
20import pathlib
21import platform
22import threading
23import time
24
25from atest.metrics import clearcut_client
26from atest.proto import clientanalytics_pb2
27from proto import edit_event_pb2
28from watchdog.events import FileSystemEvent
29from watchdog.events import PatternMatchingEventHandler
30from watchdog.observers import Observer
31
# Clearcut log source enum value, as defined in
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE: int = 2524
# Default number of seconds edit events are buffered before being sent.
DEFAULT_FLUSH_INTERVAL_SECONDS: int = 5
# Default maximum number of edit events sent individually per flush; a larger
# batch is sent as a single aggregated event instead.
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD: int = 100
37
38
class ClearcutEventHandler(PatternMatchingEventHandler):
  """Watchdog handler that buffers file edit events and logs them to Clearcut.

  Edits to hidden files (any path component below the monitored root that
  starts with ".") and to files that are not inside a git project are ignored.
  Accepted edits are buffered; a timer started when the first edit is buffered
  sends the whole batch flush_interval_sec seconds later. A batch with more
  than single_events_size_threshold edits is replaced by one aggregated event
  that only carries the number of edits.
  """

  def __init__(
      self,
      path: str,
      flush_interval_sec: int,
      single_events_size_threshold: int,
      is_dry_run: bool = False,
      cclient: clearcut_client.Clearcut | None = None,
  ):
    """Initializes the handler.

    Args:
      path: Root directory being monitored. Event paths are expected to lie
        under it.
      flush_interval_sec: Seconds to wait after the first buffered event
        before sending the batch.
      single_events_size_threshold: Maximum number of events sent individually
        per batch; larger batches are sent as one aggregated event.
      is_dry_run: If True, only log how many events would have been sent.
      cclient: Clearcut client used to send events; when None, a client for
        LOG_SOURCE is created.
    """
    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.flush_interval_sec = flush_interval_sec
    self.single_events_size_threshold = single_events_size_threshold
    self.is_dry_run = is_dry_run
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

    # (EditEvent proto, event time in seconds since the epoch) pairs that have
    # not been sent yet.
    self.pending_events = []
    # Timer that will send pending_events, or None when none is scheduled.
    self._scheduled_log_thread = None
    # Guards pending_events and _scheduled_log_thread, which are accessed by
    # the on_* callbacks, the timer thread and flushall().
    self._pending_events_lock = threading.Lock()

  def on_moved(self, event: FileSystemEvent):
    """Records a move; only the source path is logged."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

  def on_created(self, event: FileSystemEvent):
    """Records a file creation."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

  def on_deleted(self, event: FileSystemEvent):
    """Records a file deletion."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

  def on_modified(self, event: FileSystemEvent):
    """Records a file modification."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

  def flushall(self):
    """Sends all pending events now and flushes the Clearcut client.

    Any scheduled flush timer is cancelled. If its callback has already
    started, this waits for it to finish so that its events are handed to the
    client before flush_events() is called.
    """
    logging.info("flushing all pending events.")
    # Read and clear the timer under the lock; timers are started while the
    # lock is held, so the reference obtained here is always a started thread.
    with self._pending_events_lock:
      scheduled_log_thread = self._scheduled_log_thread
      self._scheduled_log_thread = None

    if scheduled_log_thread:
      logging.info("canceling log thread")
      scheduled_log_thread.cancel()
      # cancel() is a no-op once the timer has fired, so wait for an in-flight
      # _log_clearcut_events(). The lock must not be held here because that
      # callback acquires it.
      scheduled_log_thread.join()

    self._log_clearcut_events()
    self.cclient.flush_events()

  def _log_edit_event(
      self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
  ):
    """Buffers one edit event and schedules a flush if none is pending.

    Hidden files and files outside any git project are skipped. Any error is
    logged instead of being propagated to the caller.
    """
    try:
      event_time = time.time()
      file_path = pathlib.Path(event.src_path)

      if self._is_hidden_file(file_path):
        logging.debug("ignore hidden file: %s.", event.src_path)
        return

      if not self._is_under_git_project(file_path):
        logging.debug(
            "ignore file %s which does not belong to a git project",
            event.src_path,
        )
        return

      logging.info("%s: %s", event.event_type, event.src_path)

      event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      event_proto.single_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=event.src_path, edit_type=edit_type
          )
      )
      with self._pending_events_lock:
        self.pending_events.append((event_proto, event_time))
        if not self._scheduled_log_thread:
          logging.debug(
              "Scheduling thread to run in %d seconds", self.flush_interval_sec
          )
          # Started while the lock is held so flushall() never observes a
          # timer that has not been started yet.
          self._scheduled_log_thread = threading.Timer(
              self.flush_interval_sec, self._log_clearcut_events
          )
          self._scheduled_log_thread.start()

    except Exception:
      logging.exception("Failed to log edit event.")

  def _is_hidden_file(self, file_path: pathlib.Path) -> bool:
    """Returns True if any component of file_path below the root starts with '.'.

    Raises:
      ValueError: If file_path is not under the monitored root.
    """
    return any(
        part.startswith(".")
        for part in file_path.relative_to(self.root_monitoring_path).parts
    )

  def _is_under_git_project(self, file_path: pathlib.Path) -> bool:
    """Returns True if a directory from the root down to file_path has '.git'.

    The monitored root itself is included in the search.

    Raises:
      ValueError: If file_path is not under the monitored root.
    """
    # Use the root exactly as configured, like _is_hidden_file does. Calling
    # resolve() here made relative_to() fail for relative, symlinked or
    # '..'-containing roots, because event paths are not resolved.
    root_path = pathlib.Path(self.root_monitoring_path)
    return any(
        (root_path / parent / ".git").exists()
        for parent in file_path.relative_to(root_path).parents
    )

  def _log_clearcut_events(self):
    """Sends every pending event to Clearcut and empties the buffer.

    Called by the flush timer and by flushall(). If more than
    single_events_size_threshold events are pending, they are replaced by one
    aggregated event stamped with the first pending event's time. In dry-run
    mode the events are only counted and logged.
    """
    with self._pending_events_lock:
      # Only the currently scheduled timer clears the reference. A call from
      # flushall() or from a superseded timer must not drop a newer timer,
      # which flushall() would then be unable to cancel.
      if self._scheduled_log_thread is threading.current_thread():
        self._scheduled_log_thread = None
      edit_events = self.pending_events
      self.pending_events = []

    if not edit_events:
      logging.debug("no pending edit events to send.")
      return

    pending_events_size = len(edit_events)
    if pending_events_size > self.single_events_size_threshold:
      logging.info(
          "got %d events in %d seconds, sending aggregated events instead",
          pending_events_size,
          self.flush_interval_sec,
      )
      aggregated_event_time = edit_events[0][1]
      aggregated_event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      aggregated_event_proto.aggregated_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.AggregatedEditEvent(
              num_edits=pending_events_size
          )
      )
      edit_events = [(aggregated_event_proto, aggregated_event_time)]

    if self.is_dry_run:
      logging.info("Sent %d edit events in dry run.", len(edit_events))
      return

    for event_proto, event_time in edit_events:
      log_event = clientanalytics_pb2.LogEvent(
          event_time_ms=int(event_time * 1000),
          source_extension=event_proto.SerializeToString(),
      )
      self.cclient.log(log_event)

    logging.info("sent %d edit events", len(edit_events))
181
182
def start(
    path: str,
    is_dry_run: bool = False,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
  """Starts the edit monitor and blocks until the wait loop is interrupted.

  This is the entry point to start the edit monitor as a subprocess of
  the daemon manager. It watches `path` recursively and reports edits through
  a ClearcutEventHandler. The wait loop only ends via an exception, for
  example KeyboardInterrupt. When it ends, the observer is stopped first and
  then all pending events are flushed.

  Args:
    path: The root path to monitor.
    is_dry_run: If True, edit events are counted and logged but not sent.
    flush_interval_sec: Seconds to buffer edit events before sending them.
    single_events_size_threshold: Batches with more events than this are
      sent as a single aggregated event.
    cclient: The Clearcut client to send the edit logs; when None, the
      handler creates a default one.
    pipe_sender: The sending end of a pipe to the daemon manager. The message
      "Observer started." is sent once monitoring begins, and the pipe is
      closed on exit.
  """
  event_handler = ClearcutEventHandler(
      path, flush_interval_sec, single_events_size_threshold, is_dry_run, cclient
  )
  observer = Observer()

  logging.info("Starting observer on path %s.", path)
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  logging.info("Observer started.")
  if pipe_sender:
    pipe_sender.send("Observer started.")

  try:
    while True:
      time.sleep(1)
  finally:
    try:
      # Stop producing events before the final flush. Otherwise an edit that
      # arrives after flushall() would schedule a new timer and be sent after
      # the client was already flushed, or never be sent at all.
      observer.stop()
      observer.join()
      event_handler.flushall()
    finally:
      # Always release the pipe so the daemon manager is not left waiting.
      if pipe_sender:
        pipe_sender.close()
221