import contextlib
import json
import os
import time

import numpy as np
import torch

from . import tensor_engine


class Benchmark:
    """Base class for tensor-expression benchmarks.

    Subclasses implement :meth:`forward` plus the metadata hooks
    (:meth:`config`, :meth:`module`, :meth:`memory_workload`, ...) and are
    registered with :func:`register_benchmark_class`.

    Args:
        mode: ``"fwd"`` (forward only) or ``"both"`` (forward + backward).
        device: device string (e.g. ``"cpu"`` or ``"cuda"``).
        dtype: torch dtype used for the benchmark tensors.

    Raises:
        ValueError: if ``mode`` is not ``"fwd"`` or ``"both"``.
    """

    def __init__(self, mode, device, dtype):
        self.mode = mode
        self.deterministic = False
        self.device = device
        self.dtype = dtype
        self.output_type = "stdout"
        self.print_ir = False
        self.print_kernel = False
        if mode == "both":
            self.requires_grad = True
        elif mode == "fwd":
            self.requires_grad = False
        else:
            raise ValueError(f"invalid mode: {mode}")
        self.result_grad = None
        self.grad_variables = []
        self.engine = tensor_engine.get_engine()
        self.engine.reset(device)

        # Forward all public member functions of self.engine to self so that
        # subclasses can call engine ops (rand_like, sync_cuda, ...) directly.
        for method in dir(self.engine):
            if not callable(getattr(self.engine, method)):
                continue
            # don't forward if this function is overridden here
            if hasattr(self, method):
                continue
            # don't forward if it is an internal function
            if method.startswith("_"):
                continue
            method_engine = getattr(self.engine, method)
            setattr(self, method, method_engine)

    def forward(self):
        """do one step worth of computation"""
        raise ValueError("this method should be reimplemented by subclass")

    def check(self):
        """Compare the computed result against the reference implementation.

        Only meaningful when ``self.deterministic`` is set; otherwise a no-op.
        """
        if not self.deterministic:
            return
        np.testing.assert_allclose(
            self.reference(), self.numpy(self.compute()), atol=1e-2
        )

    def config(self):
        """returns an array for the current benchmark configs"""
        raise ValueError("this method should be reimplemented by subclass")

    def desc(self):
        """return the description of the current benchmark"""
        config = self.config()
        config_str = "_".join([str(x) for x in config])
        device = self.device
        # NNC_NUM_THREADS is appended so multi-threaded CPU runs are
        # distinguishable in the output.
        if "NNC_NUM_THREADS" in os.environ:
            num_threads_str = os.environ["NNC_NUM_THREADS"]
            device += num_threads_str
        return f"{self.engine.mode}: {self.module()}_{self.mode}_{device}_{config_str}"

    @staticmethod
    def module():
        raise ValueError("this method should be reimplemented by subclass")

    def memory_workload(self):
        raise ValueError("this method should be reimplemented by subclass")

    def compute_workload(self):
        """return the number of scalar operations it takes to finish the tensor op"""
        return None

    @staticmethod
    def input_iterable():
        """A benchmark child class should return true if it utilizes the input iter arg"""
        return False

    def dtype_to_bytes(self):
        """Return the element size (in bytes) of ``self.dtype``."""
        return torch.tensor(0, dtype=self.dtype).element_size()

    @staticmethod
    def default_configs():
        """return a list of default configs for this benchmark"""
        raise ValueError("this method should be reimplemented by subclass")

    def is_supported(self):
        return True

    def rand(self, shape, device=None, dtype=None, requires_grad=False):
        """Create a random tensor via the engine, tracking it for backward."""
        v = self.engine.rand(
            shape, device=device, dtype=dtype, requires_grad=requires_grad
        )
        if requires_grad:
            self.grad_variables.append(v)
        return v

    def nchw_rand(self, shape, device=None, requires_grad=False):
        """Create a random NCHW tensor via the engine, tracking it for backward."""
        v = self.engine.nchw_rand(shape, device=device, requires_grad=requires_grad)
        if requires_grad:
            self.grad_variables.append(v)
        return v

    def compute(self):
        """Run one forward step, through the traced module when available."""
        if self.bm_jit:
            return self.bm_jit(*self.inputs)
        else:
            return self.forward(*self.inputs)

    def run(self, args):
        """Configure the requested fuser from CLI ``args`` and run the benchmark."""
        self.print_ir = args.print_ir
        if args.cuda_fuser == "old":
            torch._C._jit_override_can_fuse_on_gpu(True)
            if args.print_kernel:
                os.environ["PYTORCH_FUSION_DEBUG"] = "1"
            return self.run_impl(True)
        elif args.cuda_fuser == "te":
            torch._C._jit_set_texpr_fuser_enabled(True)
            with cuda_pointwise_context(
                args.cuda_pointwise_loop_levels,
                args.cuda_pointwise_block_count,
                args.cuda_pointwise_block_size,
            ):
                return self.run_impl(True)
        elif args.cuda_fuser == "nvf":
            torch._C._jit_set_nvfuser_enabled(True)
            torch._C._jit_set_profiling_executor(True)
            torch._C._jit_set_profiling_mode(True)
            torch._C._jit_override_can_fuse_on_cpu(False)
            torch._C._jit_override_can_fuse_on_gpu(False)
            torch._C._jit_set_bailout_depth(20)
            if args.print_kernel:
                os.environ["PYTORCH_CUDA_FUSER_DEBUG"] = "1"
            return self.run_impl(True)
        else:
            return self.run_impl(False)

    def run_impl(self, use_fuser):
        """Time ``iters`` iterations (after warmup) and report throughput.

        NOTE(review): relies on ``self.jit_mode`` and ``self.inputs`` being
        set by the subclass initializer — not defined on this base class.
        """
        warmups = 10
        if self.device == "cuda":
            iters = 1000
        else:
            iters = 10
        engine = tensor_engine.get_engine()

        self.bm_jit = None
        for i in range(warmups + iters):
            if i == warmups:
                # Start timing only after warmup; sync first on CUDA so
                # queued warmup work is excluded from the measurement.
                if self.device == "cuda":
                    engine.sync_cuda()
                time_start = time.time()

            if i == 0:
                if self.jit_mode == "trace" and use_fuser:
                    self.bm_jit = torch.jit.trace(
                        self.forward, example_inputs=self.inputs, check_trace=False
                    )
                if callable(getattr(self, "reference", None)):
                    self.check()
                else:
                    print("Warning: no reference result for ", self.module())
            elif i == 1:
                # The fusion graph is visible after the first iter is executed
                if self.jit_mode == "trace" and use_fuser and self.print_ir:
                    print(self.bm_jit.graph_for(*self.inputs))
            z = self.compute()
            if self.mode == "both":
                if self.result_grad is None:
                    self.result_grad = engine.rand_like(z)
                engine.backward([z], [self.result_grad], self.grad_variables)

        if self.device == "cuda":
            engine.sync_cuda()

        duration = time.time() - time_start
        iter_time = duration / iters
        memory_workload = self.memory_workload()
        compute_workload = self.compute_workload()

        result_dict = {
            "desc": self.desc(),
            "us": iter_time * 1e6,
            # "sol" / "algorithmic" are memory bandwidths in GB/s.
            "sol": memory_workload["sol"] * self.dtype_to_bytes() / iter_time / 1e9,
            "algorithmic": memory_workload["algorithmic"]
            * self.dtype_to_bytes()
            / iter_time
            / 1e9,
        }
        if compute_workload:
            result_dict["compute_workload"] = compute_workload / iter_time / 1e9
        self.dump_result(result_dict)

    def dump_result(self, result_dict):
        """Print ``result_dict`` as JSON or human-readable text per ``output_type``."""
        if self.output_type == "json":
            print(json.dumps(result_dict))
        elif self.output_type == "stdout":
            msg = "{}: {:.2f} us, SOL {:.2f} GB/s, algorithmic {:.2f} GB/s".format(
                result_dict["desc"],
                result_dict["us"],
                result_dict["sol"],
                result_dict["algorithmic"],
            )
            if "compute_workload" in result_dict:
                msg += f", compute {result_dict['compute_workload']:.2f} Gops/s"
            print(msg)
        else:
            raise Exception("Unknown output_type " + self.output_type)  # noqa: TRY002


@contextlib.contextmanager
def cuda_pointwise_context(loop_levels, block_count, block_size):
    """Temporarily override the TE CUDA pointwise knobs; restore on exit.

    Falsy arguments (``None``/``0``) leave the corresponding knob untouched.
    """
    if loop_levels:
        old_loop_levels = torch._C._jit_get_te_cuda_pointwise_loop_levels()
        torch._C._jit_set_te_cuda_pointwise_loop_levels(loop_levels)
    if block_count:
        old_block_count = torch._C._jit_get_te_cuda_pointwise_block_count()
        torch._C._jit_set_te_cuda_pointwise_block_count(block_count)
    if block_size:
        old_block_size = torch._C._jit_get_te_cuda_pointwise_block_size()
        torch._C._jit_set_te_cuda_pointwise_block_size(block_size)
    try:
        yield
    finally:
        if loop_levels:
            torch._C._jit_set_te_cuda_pointwise_loop_levels(old_loop_levels)
        if block_count:
            torch._C._jit_set_te_cuda_pointwise_block_count(old_block_count)
        if block_size:
            torch._C._jit_set_te_cuda_pointwise_block_size(old_block_size)


# Auxiliary class to facilitate dynamic input shape
class DynamicShape:
    r"""
    An Auxiliary class for dynamic shape benchmarks

    Pre-computes input with random shapes and also
    modifies the compute method so in each call the fuser sees a different
    input tensor shape
    """

    # Number of random inputs in an instance
    SAMPLE_SIZE = 100

    def __init__(self, dynamic_range=1.2):
        self._input_samples = []
        self._input_sample_index = 0
        # Normalize so the stored ratio is always <= 1.0 and can be used as
        # the lower bound of the uniform shape-scaling range.
        self._dynamic_range = (
            1.0 / dynamic_range if dynamic_range > 1.0 else dynamic_range
        )
        self._enable_dynamic_shapes = True

    # Returns the input test case that current index points to
    @property
    def inputs(self):
        return self._input_samples[self._input_sample_index]

    # An inputs assignment actually adds a test case in the class buffer
    @inputs.setter
    def inputs(self, val):
        self._input_samples.append(val)

    # Runs normal compute while increment test case index
    def compute(self):
        # BUGFIX: propagate the computed result. run_impl() consumes the
        # return value (rand_like / backward), so discarding it would hand
        # None to the backward pass.
        ret = super().compute()
        self._input_sample_index = (self._input_sample_index + 1) % self.SAMPLE_SIZE
        return ret

    # Defined by benchmark, the benchmark needs to specify the input
    # tensor construction in this method, essentially the same way
    # a benchmark creates the inputs list in the initializer
    def instantiate_input(self):
        raise NotImplementedError

    # Instantiate random shaped inputs and start the benchmark run
    def run(self, args):
        # force disable dynamic shape from command line
        if args.no_dynamic_shape:
            self._enable_dynamic_shapes = False
        self.load_inputs()
        super().run(args)

    # pre-compute inputs so the creations of random tensors
    # do not add to the compute time
    def load_inputs(self):
        # SAMPLE_SIZE - 1 iterations: presumably the subclass initializer
        # already appended the first sample via the `inputs` setter — verify
        # against concrete benchmarks.
        for i in range(self.SAMPLE_SIZE - 1):
            self.instantiate_input()

    # returns a randomized shape
    def rand_shape(self, shape):
        if not self._enable_dynamic_shapes:
            return shape
        ratios = np.random.uniform(self._dynamic_range, 1.0, len(shape))
        dyn_shape = list(np.multiply(shape, ratios).astype(int))
        return dyn_shape


# Global registry of benchmark classes, populated at import time by each
# benchmark module.
benchmark_classes = []


def register_benchmark_class(benchmark_cls):
    benchmark_classes.append(benchmark_cls)