xref: /aosp_15_r20/external/angle/build/android/pylib/utils/instrumentation_tracing.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1# Copyright 2017 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Functions to instrument all Python function calls.
6
7This generates a JSON file readable by Chrome's about:tracing. To use it,
8either call start_instrumenting and stop_instrumenting at the appropriate times,
9or use the Instrument context manager.
10
11A function is only traced if it is from a Python module that matches at least
12one regular expression object in to_include, and does not match any in
13to_exclude. In between the start and stop events, every function call of a
14function from such a module will be added to the trace.
15"""
16
17import contextlib
18import functools
19import inspect
20import os
21import re
22import sys
23import threading
24
25from py_trace_event import trace_event
26
27
# Modules to exclude by default (to avoid problems like infinite loops).
# These regex strings are always compiled and added to the caller-supplied
# to_exclude patterns when a trace function is generated.
DEFAULT_EXCLUDE = [r'py_trace_event\..*']
30
31
class _TraceArguments:
  """Wraps a dictionary to ensure safe evaluation of repr().

  Keys and values are converted to strings when they are added, so building
  the repr() of this object later never has to call repr() on arbitrary
  user objects.
  """

  def __init__(self):
    # Maps stringified argument names to stringified argument values.
    self._arguments = {}

  @staticmethod
  def _safeStringify(item):
    """Returns repr(item), else str(item), else "<ERROR>" if both raise."""
    for stringify in (repr, str):
      try:
        return stringify(item)
      except Exception: # pylint: disable=broad-except
        continue
    return "<ERROR>"

  def add(self, key, val):
    """Records one argument, storing both key and val as safe strings."""
    stringify = _TraceArguments._safeStringify
    # Stringify the key before the value, in that order.
    safe_key = stringify(key)
    safe_val = stringify(val)

    self._arguments[safe_key] = safe_val

  def __repr__(self):
    return f"{self._arguments!r}"
56
57
# Idents of threads whose names have already been passed to
# trace_event.trace_set_thread_name, so each thread's name is only set once.
saved_thread_ids = set()
59
def _shouldTrace(frame, to_include, to_exclude, included, excluded):
  """
  Decides whether or not the function called in frame should be traced.

  A call is traced only if the frame's module is included (it matches a
  pattern in to_include, or to_include is empty) and no frame anywhere on the
  call stack belongs to an excluded module.

  Args:
    frame: The Python frame object of this function call.
    to_include: Set of regex objects for modules which should be traced.
    to_exclude: Set of regex objects for modules which should not be traced.
    included: Set of module names we've determined should be traced. Updated
        in place as a cache.
    excluded: Set of module names we've determined should not be traced.
        Updated in place as a cache.

  Returns:
    True if the call should be traced, False otherwise (including when the
    frame's module cannot be determined).
  """
  module = inspect.getmodule(frame)
  if module is None:
    return False

  module_name = module.__name__

  if module_name not in included:
    # An empty to_include means "include everything".
    if to_include and not any(
        pattern.match(module_name) for pattern in to_include):
      return False
    included.add(module_name)

  if not to_exclude:
    return True

  # Return False for anything with an excluded module's function anywhere in
  # the stack trace (even if the function itself is in an included module).
  # Walk f_back directly rather than using inspect.getouterframes(), which
  # also gathers source context for every frame and is needlessly expensive
  # to do on every traced call.
  caller = frame
  while caller is not None:
    calling_module = inspect.getmodule(caller)
    caller = caller.f_back
    if calling_module is None:
      # Frames without a module (e.g. code run through exec()) cannot match
      # an exclusion pattern, so skip them instead of failing.
      continue
    calling_module_name = calling_module.__name__
    if calling_module_name in excluded:
      return False
    if any(pattern.match(calling_module_name) for pattern in to_exclude):
      excluded.add(calling_module_name)
      return False

  return True
104
def _generate_trace_function(to_include, to_exclude):
  """Builds the trace function that start_instrumenting installs.

  Args:
    to_include: Iterable of regex strings; modules matching at least one are
        traced (an empty iterable includes every module).
    to_exclude: Iterable of regex strings for modules that must not be traced.
        The patterns in DEFAULT_EXCLUDE are always added to these.

  Returns:
    A function with the (frame, event, arg) signature expected by
    sys.settrace and threading.settrace.
  """
  to_include = {re.compile(item) for item in to_include}
  to_exclude = {re.compile(item) for item in to_exclude}
  to_exclude.update({re.compile(item) for item in DEFAULT_EXCLUDE})

  # Caches of module names already classified; filled in by _shouldTrace.
  included = set()
  excluded = set()

  tracing_pid = os.getpid()

  def traceFunction(frame, event, arg):
    del arg  # Unused.

    # Don't try to trace in subprocesses.
    if os.getpid() != tracing_pid:
      sys.settrace(None)
      return None

    if event not in ("call", "return"):
      return None

    if not _shouldTrace(frame, to_include, to_exclude, included, excluded):
      return None

    code = frame.f_code
    function_name = code.co_name

    if event == "return":
      trace_event.trace_end(function_name)
      return None

    # This function is beginning; we save the thread name (if that hasn't
    # been done), record the Begin event, and return this function to be
    # used as the local trace function.
    current_thread = threading.current_thread()
    thread_id = current_thread.ident

    if thread_id not in saved_thread_ids:
      trace_event.trace_set_thread_name(current_thread.name)
      saved_thread_ids.add(thread_id)

    arguments = _TraceArguments()
    frame_locals = frame.f_locals
    # The function's argument names are the first |co_argcount| elements of
    # |co_varnames|. (Following that are local variables.)
    for arg_name in code.co_varnames[:code.co_argcount]:
      # A "call" event is also generated each time a generator is resumed;
      # by then an argument may have been deleted from the frame (e.g. via
      # `del`). Skip unbound arguments instead of raising KeyError, because
      # an exception in a trace function propagates into the traced program.
      if arg_name in frame_locals:
        arguments.add(arg_name, frame_locals[arg_name])

    trace_event.trace_begin(function_name, arguments=arguments,
                            module=inspect.getmodule(frame).__name__,
                            filename=code.co_filename,
                            line_number=frame.f_lineno)

    # Return this function, so it gets used as the "local trace function"
    # within this function's frame (and in particular, gets called for this
    # function's "return" event).
    return traceFunction

  return traceFunction
168
169
def no_tracing(f):
  """Decorator that runs f with tracing switched off.

  The current thread's trace function and the threading trace hook are both
  cleared for the duration of the call and put back afterwards, even if f
  raises.

  Args:
    f: The function to wrap.

  Returns:
    A wrapper with the same signature and return value as f.
  """
  @functools.wraps(f)
  def wrapper(*args, **kwargs):
    trace_func = sys.gettrace()
    # threading.gettrace() only exists on Python 3.10+. When it is available,
    # remember the real threading hook so that a hook different from the
    # current thread's trace function is not clobbered on restore. Otherwise
    # fall back to restoring the current thread's trace function.
    get_thread_trace = getattr(threading, 'gettrace', None)
    thread_trace_func = get_thread_trace() if get_thread_trace else trace_func
    try:
      sys.settrace(None)
      threading.settrace(None)
      return f(*args, **kwargs)
    finally:
      sys.settrace(trace_func)
      threading.settrace(thread_trace_func)
  return wrapper
182
183
def start_instrumenting(output_file, to_include=(), to_exclude=()):
  """Enable tracing of all function calls (from specified modules).

  Args:
    output_file: Passed to trace_event.trace_enable as the trace output.
    to_include: Regex strings for modules to trace; empty means all modules.
    to_exclude: Regex strings for modules that must not be traced.
  """
  trace_event.trace_enable(output_file)

  tracer = _generate_trace_function(to_include, to_exclude)
  # Install the hook for the current thread first, then register it with the
  # threading module.
  for install_hook in (sys.settrace, threading.settrace):
    install_hook(tracer)
191
192
def stop_instrumenting():
  """Disable trace_event and remove the trace hooks set up for tracing."""
  trace_event.trace_disable()

  # Clear the current thread's trace function, then the threading hook.
  for uninstall_hook in (sys.settrace, threading.settrace):
    uninstall_hook(None)
198
199
@contextlib.contextmanager
def Instrument(output_file, to_include=(), to_exclude=()):
  """Context manager that instruments function calls made inside its body.

  Calls start_instrumenting on entry and always calls stop_instrumenting on
  exit, whether or not the body (or the start call itself) raised.

  Args:
    output_file: Forwarded to start_instrumenting.
    to_include: Forwarded to start_instrumenting.
    to_exclude: Forwarded to start_instrumenting.
  """
  try:
    start_instrumenting(
        output_file, to_include=to_include, to_exclude=to_exclude)
    yield
  finally:
    stop_instrumenting()
207