# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import getpass
import logging
import multiprocessing.connection
import os
import pathlib
import platform
import threading
import time

from atest.metrics import clearcut_client
from atest.proto import clientanalytics_pb2
from proto import edit_event_pb2
from watchdog.events import FileSystemEvent
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer

# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100


class ClearcutEventHandler(PatternMatchingEventHandler):
  """Batches file edit events under a root path and reports them to Clearcut.

  Each move/create/delete/modify callback queues an EditEvent proto. The
  first queued event schedules a timer. When it fires, all queued events are
  sent together, or as one aggregated event when there are too many.
  """

  def __init__(
      self,
      path: str,
      flush_interval_sec: int,
      single_events_size_threshold: int,
      is_dry_run: bool = False,
      cclient: clearcut_client.Clearcut | None = None,
  ):
    """Initializes the handler.

    Args:
      path: Root directory being monitored; event paths are interpreted
        relative to it.
      flush_interval_sec: Delay in seconds between the first queued event and
        the scheduled upload of the batch.
      single_events_size_threshold: If a batch holds more events than this,
        one aggregated event is sent instead of the individual ones.
      is_dry_run: If True, events are only logged locally and never sent.
      cclient: Clearcut client used to send events; a new client for
        LOG_SOURCE is created when omitted.
    """
    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.flush_interval_sec = flush_interval_sec
    self.single_events_size_threshold = single_events_size_threshold
    self.is_dry_run = is_dry_run
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

    # (EditEvent proto, time.time() timestamp) pairs awaiting upload.
    self.pending_events = []
    # Timer that will upload pending_events; None when none is scheduled.
    self._scheduled_log_thread = None
    # Guards pending_events and _scheduled_log_thread, which are accessed from
    # the event callbacks, the Timer thread and flushall().
    self._pending_events_lock = threading.Lock()

  def on_moved(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

  def on_created(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

  def on_deleted(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

  def on_modified(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

  def flushall(self):
    """Sends all pending events now and flushes the Clearcut client.

    Any scheduled upload is cancelled, since this call sends the pending
    events itself.
    """
    logging.info("flushing all pending events.")
    # Take and clear the timer under the same lock every other path uses.
    with self._pending_events_lock:
      scheduled_log_thread = self._scheduled_log_thread
      self._scheduled_log_thread = None

    if scheduled_log_thread:
      logging.info("canceling log thread")
      scheduled_log_thread.cancel()
      # A cancelled Timer exits promptly. If it had already fired, wait for
      # its upload so those events are logged before flush_events() below.
      scheduled_log_thread.join()

    self._log_clearcut_events()
    self.cclient.flush_events()

  def _log_edit_event(
      self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
  ):
    """Queues an edit event and schedules an upload if none is pending.

    Events for hidden paths, or for files not inside any git project under
    the root, are ignored. Errors are logged rather than raised, so that they
    do not propagate out of the watchdog callback.
    """
    try:
      event_time = time.time()
      src_path = pathlib.Path(event.src_path)

      if self._is_hidden_file(src_path):
        logging.debug("ignore hidden file: %s.", event.src_path)
        return

      if not self._is_under_git_project(src_path):
        logging.debug(
            "ignore file %s which does not belong to a git project",
            event.src_path,
        )
        return

      logging.info("%s: %s", event.event_type, event.src_path)

      event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      event_proto.single_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=event.src_path, edit_type=edit_type
          )
      )
      with self._pending_events_lock:
        self.pending_events.append((event_proto, event_time))
        # At most one upload timer exists at a time; events arriving before
        # it fires are sent in the same batch.
        if not self._scheduled_log_thread:
          logging.debug(
              "Scheduling thread to run in %d seconds", self.flush_interval_sec
          )
          self._scheduled_log_thread = threading.Timer(
              self.flush_interval_sec, self._log_clearcut_events
          )
          self._scheduled_log_thread.start()

    except Exception:
      logging.exception("Failed to log edit event.")

  def _relative_to_root(self, file_path: pathlib.Path) -> pathlib.Path:
    """Returns file_path relative to the monitored root.

    The root is tried as given first and then resolved. This lets paths
    reported against either spelling of the root (e.g. with symlinks
    expanded, or absolute when the root was given relatively) be handled
    consistently.

    Raises:
      ValueError: if file_path is under neither form of the root.
    """
    root_path = pathlib.Path(self.root_monitoring_path)
    try:
      return file_path.relative_to(root_path)
    except ValueError:
      return file_path.relative_to(root_path.resolve())

  def _is_hidden_file(self, file_path: pathlib.Path) -> bool:
    """Returns True if any path component below the root starts with '.'."""
    return any(
        part.startswith(".") for part in self._relative_to_root(file_path).parts
    )

  def _is_under_git_project(self, file_path: pathlib.Path) -> bool:
    """Returns True if a '.git' entry exists in the root or in any directory
    between the root and file_path."""
    root_path = pathlib.Path(self.root_monitoring_path)
    # parents of a relative path end with '.', so the root itself is checked.
    return any(
        root_path.joinpath(parent, ".git").exists()
        for parent in self._relative_to_root(file_path).parents
    )

  def _log_clearcut_events(self):
    """Sends all currently pending events to Clearcut.

    Runs on the Timer thread scheduled by _log_edit_event, or from flushall().
    If more than single_events_size_threshold events are pending, they are
    replaced by one aggregated event stamped with the first event's time.
    """
    with self._pending_events_lock:
      self._scheduled_log_thread = None
      edit_events = self.pending_events
      self.pending_events = []

    try:
      pending_events_size = len(edit_events)
      if pending_events_size > self.single_events_size_threshold:
        logging.info(
            "got %d events in %d seconds, sending aggregated events instead",
            pending_events_size,
            self.flush_interval_sec,
        )
        aggregated_event_time = edit_events[0][1]
        aggregated_event_proto = edit_event_pb2.EditEvent(
            user_name=self.user_name,
            host_name=self.host_name,
            source_root=self.source_root,
        )
        aggregated_event_proto.aggregated_edit_event.CopyFrom(
            edit_event_pb2.EditEvent.AggregatedEditEvent(
                num_edits=pending_events_size
            )
        )
        edit_events = [(aggregated_event_proto, aggregated_event_time)]

      if self.is_dry_run:
        logging.info("Sent %d edit events in dry run.", len(edit_events))
        return

      for event_proto, event_time in edit_events:
        log_event = clientanalytics_pb2.LogEvent(
            event_time_ms=int(event_time * 1000),
            source_extension=event_proto.SerializeToString(),
        )
        self.cclient.log(log_event)

      logging.info("sent %d edit events", len(edit_events))
    except Exception:
      # On the Timer thread an uncaught exception would only reach
      # threading.excepthook, bypassing logging; record it here instead.
      logging.exception("Failed to send edit events.")


def start(
    path: str,
    is_dry_run: bool = False,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
  """Method to start the edit monitor.

  This is the entry point to start the edit monitor as a subprocess of
  the daemon manager. It loops forever and only exits via an exception
  raised during the loop (for example KeyboardInterrupt). On exit it stops
  the observer, sends any pending events and closes the pipe.

  Args:
    path: The root path to monitor.
    is_dry_run: If True, edit events are only logged locally, never sent.
    flush_interval_sec: Seconds to batch events before uploading them.
    single_events_size_threshold: Batches with more events than this are
      sent as a single aggregated event.
    cclient: The clearcut client to send the edit logs.
    pipe_sender: The sender of the pipe to communicate with the daemon
      manager. "Observer started." is sent once observing begins, and the
      pipe is closed on exit.
  """
  event_handler = ClearcutEventHandler(
      path, flush_interval_sec, single_events_size_threshold, is_dry_run, cclient
  )
  observer = Observer()

  logging.info("Starting observer on path %s.", path)
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  logging.info("Observer started.")
  if pipe_sender:
    pipe_sender.send("Observer started.")

  try:
    while True:
      time.sleep(1)
  finally:
    # Stop the observer before the final flush so no new events (or new
    # upload timers) can appear after it.
    observer.stop()
    observer.join()
    try:
      event_handler.flushall()
    finally:
      if pipe_sender:
        pipe_sender.close()