// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/extensible.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/port_platform.h>

// TODO(vigneshbabu): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {
namespace experimental {

////////////////////////////////////////////////////////////////////////////////
/// The EventEngine Interface
///
/// Overview
/// --------
///
/// The EventEngine encapsulates all platform-specific behaviors related to
/// low-level network I/O, timers, asynchronous execution, and DNS resolution.
///
/// This interface allows developers to provide their own event management and
/// network stacks. Motivating use cases for supporting custom EventEngines
/// include the ability to hook into external event loops, and using different
/// EventEngine instances for each channel to better insulate network I/O and
/// callback processing from other channels.
///
/// A default cross-platform EventEngine instance is provided by gRPC.
///
/// Lifespan and Ownership
/// ----------------------
///
/// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure
/// that the engines remain available until they are no longer needed.
/// Depending on the use case, engines may live until gRPC is shut down.
///
/// EXAMPLE USAGE (Not yet implemented)
///
/// Custom EventEngines can be specified per channel, and allow configuration
/// for both clients and servers. To set a custom EventEngine for a client
/// channel, you can do something like the following:
///
///    ChannelArguments args;
///    std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
///    args.SetEventEngine(engine);
///    MyAppClient client(grpc::CreateCustomChannel(
///        "localhost:50051", grpc::InsecureChannelCredentials(), args));
///
/// A gRPC server can use a custom EventEngine by calling the
/// ServerBuilder::SetEventEngine method:
///
///    ServerBuilder builder;
///    std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
///    builder.SetEventEngine(engine);
///    std::unique_ptr<Server> server(builder.BuildAndStart());
///    server->Wait();
///
///
/// Blocking EventEngine Callbacks
/// ------------------------------
///
/// Doing blocking work in EventEngine callbacks is generally not advisable.
/// While gRPC's default EventEngine implementations have some capacity to
/// scale their thread pools to avoid starvation, this is not an instantaneous
/// process. Further, user-provided EventEngines may not be optimized to
/// handle excessive blocking work at all.
///
/// *Best Practice* : Occasional blocking work may be fine, but we do not
/// recommend running a mostly blocking workload in EventEngine threads.
///
///
/// Thread-safety guarantees
/// ------------------------
///
/// All EventEngine methods are guaranteed to be thread-safe; no external
/// synchronization is required to call any EventEngine method. Please note
/// that this does not apply to application callbacks, which may be run
/// concurrently; application state synchronization must be managed by the
/// application.
///
////////////////////////////////////////////////////////////////////////////////
class EventEngine : public std::enable_shared_from_this<EventEngine>,
                    public Extensible {
 public:
  /// A duration between two events.
  ///
  /// Throughout the EventEngine API durations are used to express how long
  /// until an action should be performed.
  using Duration = std::chrono::duration<int64_t, std::nano>;
  /// A custom closure type for EventEngine task execution.
  ///
  /// Throughout the EventEngine API, \a Closure ownership is retained by the
  /// caller - the EventEngine will never delete a Closure, and upon
  /// cancellation, the EventEngine will simply forget the Closure exists. The
  /// caller is responsible for all necessary cleanup.
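  ///
  /// For example, a caller-managed Closure wrapping a user-supplied invocable
  /// might look like the following sketch (illustrative only; neither
  /// \a FunctionClosure nor its self-deletion policy is part of this API):
  ///
  ///    class FunctionClosure : public EventEngine::Closure {
  ///     public:
  ///      explicit FunctionClosure(absl::AnyInvocable<void()> fn)
  ///          : fn_(std::move(fn)) {}
  ///      // The EventEngine never deletes a Closure, so this closure deletes
  ///      // itself after running. This is only safe for closures that are
  ///      // guaranteed to run, e.g. those given to \a Run, which cannot be
  ///      // cancelled.
  ///      void Run() override {
  ///        fn_();
  ///        delete this;
  ///      }
  ///
  ///     private:
  ///      absl::AnyInvocable<void()> fn_;
  ///    };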
  class Closure {
   public:
    Closure() = default;
    // Closures are an interface, and thus non-copyable.
    Closure(const Closure&) = delete;
    Closure& operator=(const Closure&) = delete;
    // Polymorphic type => virtual destructor
    virtual ~Closure() = default;
    // Run the contained code.
    virtual void Run() = 0;
  };
  /// Represents a scheduled task.
  ///
  /// \a TaskHandles are returned by \a Run* methods, and can be given to the
  /// \a Cancel method.
  struct TaskHandle {
    intptr_t keys[2];
    static const GRPC_DLL TaskHandle kInvalid;
    friend bool operator==(const TaskHandle& lhs, const TaskHandle& rhs);
    friend bool operator!=(const TaskHandle& lhs, const TaskHandle& rhs);
  };
  /// A handle to a cancellable connection attempt.
  ///
  /// Returned by \a Connect, and can be passed to \a CancelConnect.
  struct ConnectionHandle {
    intptr_t keys[2];
    static const GRPC_DLL ConnectionHandle kInvalid;
    friend bool operator==(const ConnectionHandle& lhs,
                           const ConnectionHandle& rhs);
    friend bool operator!=(const ConnectionHandle& lhs,
                           const ConnectionHandle& rhs);
  };
  /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct
  /// exists on all platforms that gRPC supports.
  ///
  /// Platforms are expected to provide definitions for:
  /// * sockaddr
  /// * sockaddr_in
  /// * sockaddr_in6
  class ResolvedAddress {
   public:
    static constexpr socklen_t MAX_SIZE_BYTES = 128;

    ResolvedAddress(const sockaddr* address, socklen_t size);
    ResolvedAddress() = default;
    ResolvedAddress(const ResolvedAddress&) = default;
    const struct sockaddr* address() const;
    socklen_t size() const;

   private:
    char address_[MAX_SIZE_BYTES] = {};
    socklen_t size_ = 0;
  };
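
  // Example: a ResolvedAddress can be constructed from any sockaddr-based
  // address. A hedged sketch for an IPv4 loopback address on POSIX platforms
  // (assumes <netinet/in.h> and <arpa/inet.h> are available):
  //
  //   sockaddr_in addr = {};
  //   addr.sin_family = AF_INET;
  //   addr.sin_port = htons(50051);
  //   inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
  //   EventEngine::ResolvedAddress resolved(
  //       reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));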

  /// One end of a connection between a gRPC client and server. Endpoints are
  /// created when connections are established, and Endpoint operations are
  /// gRPC's primary means of communication.
  ///
  /// Endpoints must use the provided MemoryAllocator for all data buffer
  /// memory allocations. gRPC allows applications to set memory constraints
  /// per Channel or Server, and the implementation depends on all dynamic
  /// memory allocation being handled by the quota system.
  class Endpoint : public Extensible {
   public:
    /// Shuts down all connections and invokes all pending read or write
    /// callbacks with an error status.
    virtual ~Endpoint() = default;
    /// A struct representing optional arguments that may be provided to an
    /// EventEngine Endpoint Read API call.
    ///
    /// Passed as argument to an Endpoint \a Read
    struct ReadArgs {
      // A suggestion to the endpoint implementation to read at least the
      // specified number of bytes over the network connection before marking
      // the endpoint read operation as complete. gRPC may use this argument
      // to minimize the number of endpoint read API calls over the lifetime
      // of a connection.
      int64_t read_hint_bytes;
    };
    /// Reads data from the Endpoint.
    ///
    /// When data is available on the connection, that data is moved into the
    /// \a buffer. If the read succeeds immediately, it returns true and the
    /// \a on_read callback is not executed. Otherwise it returns false and
    /// the \a on_read callback executes asynchronously when the read
    /// completes. The caller must ensure that the callback has access to the
    /// buffer when it executes. Ownership of the buffer is not transferred.
    /// Valid slices *may* be placed into the buffer even if the callback is
    /// invoked with a non-OK Status.
    ///
    /// There can be at most one outstanding read per Endpoint at any given
    /// time. An outstanding read is one in which the \a on_read callback has
    /// not yet been executed for some previous call to \a Read. If an attempt
    /// is made to call \a Read while a previous read is still outstanding,
    /// the \a EventEngine must abort.
    ///
    /// For failed read operations, implementations should pass the
    /// appropriate statuses to \a on_read. For example, callbacks might
    /// expect to receive CANCELLED on endpoint shutdown.
    virtual bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
                      SliceBuffer* buffer, const ReadArgs* args) = 0;
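
    // Example: a hedged sketch of a read loop built on \a Read. The helper
    // names ReadLoop and OnReadDone are illustrative, not part of this API.
    // Because \a Read returns true on immediate success (without invoking
    // \a on_read), the caller must handle both completion paths:
    //
    //   void ReadLoop(EventEngine::Endpoint* endpoint, SliceBuffer* buffer) {
    //     EventEngine::Endpoint::ReadArgs args = {/*read_hint_bytes=*/1};
    //     bool done_synchronously = endpoint->Read(
    //         [endpoint, buffer](absl::Status status) {
    //           if (!status.ok()) return;  // e.g. CANCELLED on shutdown
    //           OnReadDone(buffer);        // process, then keep reading
    //           ReadLoop(endpoint, buffer);
    //         },
    //         buffer, &args);
    //     if (done_synchronously) {
    //       OnReadDone(buffer);
    //       ReadLoop(endpoint, buffer);
    //     }
    //   }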

    /// A struct representing optional arguments that may be provided to an
    /// EventEngine Endpoint Write API call.
    ///
    /// Passed as argument to an Endpoint \a Write
    struct WriteArgs {
      // Represents private information that may be passed by gRPC for
      // select endpoints expected to be used only within google.
      void* google_specific = nullptr;
      // A suggestion to the endpoint implementation to group data to be
      // written into frames of the specified max_frame_size. gRPC may use
      // this argument to dynamically control the max sizes of frames sent to
      // a receiver in response to high receiver memory pressure.
      int64_t max_frame_size;
    };
    /// Writes data out on the connection.
    ///
    /// If the write succeeds immediately, it returns true and the
    /// \a on_writable callback is not executed. Otherwise it returns false
    /// and the \a on_writable callback is called asynchronously when the
    /// connection is ready for more data. The Slices within the \a data
    /// buffer may be mutated at will by the Endpoint until \a on_writable is
    /// called. The \a data SliceBuffer will remain valid after calling
    /// \a Write, but its state is otherwise undefined. All bytes in \a data
    /// must have been written before calling \a on_writable unless an error
    /// has occurred.
    ///
    /// There can be at most one outstanding write per Endpoint at any given
    /// time. An outstanding write is one in which the \a on_writable callback
    /// has not yet been executed for some previous call to \a Write. If an
    /// attempt is made to call \a Write while a previous write is still
    /// outstanding, the \a EventEngine must abort.
    ///
    /// For failed write operations, implementations should pass the
    /// appropriate statuses to \a on_writable. For example, callbacks might
    /// expect to receive CANCELLED on endpoint shutdown.
    virtual bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
                       SliceBuffer* data, const WriteArgs* args) = 0;
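
    // Example: a hedged sketch of writing a SliceBuffer's contents, handling
    // both completion paths. OnWriteDone is an illustrative helper, and
    // \a data is a SliceBuffer assumed to have been populated elsewhere:
    //
    //   EventEngine::Endpoint::WriteArgs args;
    //   args.max_frame_size = 16384;
    //   bool done_synchronously = endpoint->Write(
    //       [](absl::Status status) { OnWriteDone(status); }, &data, &args);
    //   if (done_synchronously) OnWriteDone(absl::OkStatus());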

    /// Returns an address in the format described in DNSResolver. The
    /// returned values are expected to remain valid for the life of the
    /// Endpoint.
    virtual const ResolvedAddress& GetPeerAddress() const = 0;
    virtual const ResolvedAddress& GetLocalAddress() const = 0;
  };

  /// Called when a new connection is established.
  ///
  /// If the connection attempt was not successful, implementations should
  /// pass the appropriate statuses to this callback. For example, callbacks
  /// might expect to receive DEADLINE_EXCEEDED statuses when appropriate, or
  /// CANCELLED statuses on EventEngine shutdown.
  using OnConnectCallback =
      absl::AnyInvocable<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;

  /// Listens for incoming connection requests from gRPC clients and initiates
  /// request processing once connections are established.
  class Listener : public Extensible {
   public:
    /// Called when the listener has accepted a new client connection.
    using AcceptCallback = absl::AnyInvocable<void(
        std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
    virtual ~Listener() = default;
    /// Bind an address/port to this Listener.
    ///
    /// It is expected that multiple addresses/ports can be bound to this
    /// Listener before Listener::Start has been called. Returns either the
    /// bound port or an appropriate error status.
    virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0;
    virtual absl::Status Start() = 0;
  };

  /// Factory method to create a network listener / server.
  ///
  /// Once a \a Listener is created and started, the \a on_accept callback
  /// will be called once asynchronously for each established connection. This
  /// method may return a non-OK status immediately if an error was
  /// encountered in any synchronous steps required to create the Listener. In
  /// this case, \a on_shutdown will never be called.
  ///
  /// If this method returns a Listener, then \a on_shutdown will be invoked
  /// exactly once when the Listener is shut down, and only after all
  /// \a on_accept callbacks have finished executing. The status passed to it
  /// will indicate if there was a problem during shutdown.
  ///
  /// The provided \a MemoryAllocatorFactory is used to create
  /// \a MemoryAllocators for Endpoint construction.
  virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
      Listener::AcceptCallback on_accept,
      absl::AnyInvocable<void(absl::Status)> on_shutdown,
      const EndpointConfig& config,
      std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
  /// Creates a client network connection to a remote network listener.
  ///
  /// Even in the event of an error, it is expected that the \a on_connect
  /// callback will be asynchronously executed exactly once by the
  /// EventEngine. A connection attempt can be cancelled using the
  /// \a CancelConnect method.
  ///
  /// Implementation Note: it is important that the \a memory_allocator be
  /// used for all read/write buffer allocations in the EventEngine
  /// implementation. This allows gRPC's \a ResourceQuota system to monitor
  /// and control memory usage with graceful degradation mechanisms. Please
  /// see the \a MemoryAllocator API for more information.
  virtual ConnectionHandle Connect(OnConnectCallback on_connect,
                                   const ResolvedAddress& addr,
                                   const EndpointConfig& args,
                                   MemoryAllocator memory_allocator,
                                   Duration timeout) = 0;

  /// Request cancellation of a connection attempt.
  ///
  /// If the associated connection has already been completed, it will not be
  /// cancelled, and this method will return false.
  ///
  /// If the associated connection has not been completed, it will be
  /// cancelled, and this method will return true. The \a OnConnectCallback
  /// will not be called, and \a on_connect will be destroyed before this
  /// method returns.
  virtual bool CancelConnect(ConnectionHandle handle) = 0;
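
  // Example: a hedged sketch of initiating, and possibly cancelling, a client
  // connection. The config, allocator, and resolved values are assumed to
  // have been created elsewhere, and StartIO is an illustrative helper:
  //
  //   EventEngine::ConnectionHandle handle = engine->Connect(
  //       [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep) {
  //         if (!ep.ok()) return;       // e.g. DEADLINE_EXCEEDED
  //         StartIO(std::move(*ep));    // begin Read/Write operations
  //       },
  //       resolved, config, std::move(allocator), std::chrono::seconds(30));
  //
  //   // Later, if the attempt is no longer needed:
  //   if (engine->CancelConnect(handle)) {
  //     // The on_connect callback will never be invoked.
  //   }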

  /// Provides asynchronous resolution.
  ///
  /// This object has a destruction-is-cancellation semantic.
  /// Implementations should make sure that all pending requests are cancelled
  /// when the object is destroyed, and that all pending callbacks will be
  /// called shortly. If cancellation races with request completion,
  /// implementations may choose to either cancel or satisfy the request.
  class DNSResolver {
   public:
    /// Optional configuration for DNSResolvers.
    struct ResolverOptions {
      /// If empty, default DNS servers will be used.
      /// Must be in the "IP:port" format as described in naming.md.
      std::string dns_server;
    };
    /// DNS SRV record type.
    struct SRVRecord {
      std::string host;
      int port = 0;
      int priority = 0;
      int weight = 0;
    };
    /// Called with the collection of sockaddrs that were resolved from a
    /// given target address.
    using LookupHostnameCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
    /// Called with a collection of SRV records.
    using LookupSRVCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<SRVRecord>>)>;
    /// Called with the result of a TXT record lookup.
    using LookupTXTCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<std::string>>)>;

    virtual ~DNSResolver() = default;

    /// Asynchronously resolve an address.
    ///
    /// \a default_port may be a non-numeric named service port, and will only
    /// be used if \a address does not already contain a port component.
    ///
    /// When the lookup is complete or cancelled, the \a on_resolve callback
    /// will be invoked with a status indicating the success or failure of the
    /// lookup. Implementations should pass the appropriate statuses to the
    /// callback. For example, callbacks might expect to receive CANCELLED or
    /// NOT_FOUND.
    virtual void LookupHostname(LookupHostnameCallback on_resolve,
                                absl::string_view name,
                                absl::string_view default_port) = 0;
    /// Asynchronously perform an SRV record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as
    /// \a LookupHostname's \a on_resolve callback.
    virtual void LookupSRV(LookupSRVCallback on_resolve,
                           absl::string_view name) = 0;
    /// Asynchronously perform a TXT record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as
    /// \a LookupHostname's \a on_resolve callback.
    virtual void LookupTXT(LookupTXTCallback on_resolve,
                           absl::string_view name) = 0;
  };

  /// At time of destruction, the EventEngine must have no active
  /// responsibilities. EventEngine users (applications) are responsible for
  /// cancelling all tasks and DNS lookups, and for shutting down listeners
  /// and endpoints, prior to EventEngine destruction. If there are any
  /// outstanding tasks, any running listeners, etc. at time of EventEngine
  /// destruction, that is an invalid use of the API, and it will result in
  /// undefined behavior.
  virtual ~EventEngine() = default;

  // TODO(nnoble): consider whether we can remove this method before we
  // de-experimentalize this API.
  virtual bool IsWorkerThread() = 0;

  /// Creates and returns an instance of a DNSResolver, optionally configured
  /// by the \a options struct. This method may return a non-OK status if an
  /// error occurred when creating the DNSResolver. If the caller requests a
  /// custom DNS server, and the EventEngine implementation does not support
  /// it, this must return an error.
  virtual absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
      const DNSResolver::ResolverOptions& options) = 0;
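
  // Example: a hedged sketch of an asynchronous hostname lookup using a
  // resolver obtained from \a GetDNSResolver:
  //
  //   auto resolver = engine->GetDNSResolver(
  //       EventEngine::DNSResolver::ResolverOptions());
  //   if (resolver.ok()) {
  //     (*resolver)->LookupHostname(
  //         [](absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
  //                addresses) {
  //           if (!addresses.ok()) return;  // e.g. NOT_FOUND or CANCELLED
  //           // Connect to one of the resolved addresses...
  //         },
  //         "grpc.io", /*default_port=*/"443");
  //     // Note: destroying the resolver cancels any pending lookups.
  //   }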

  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures passed to \a Run cannot be cancelled. The \a closure will
  /// not be deleted after it has been run; ownership remains with the caller.
  ///
  /// Implementations must not execute the closure in the calling thread
  /// before \a Run returns. For example, if the caller must release a lock
  /// before the closure can proceed, running the closure immediately would
  /// cause a deadlock.
  virtual void Run(Closure* closure) = 0;
  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures passed to \a Run cannot be cancelled. Unlike the overloaded
  /// \a Closure alternative, the absl::AnyInvocable version's \a closure will
  /// be deleted by the EventEngine after the closure has been run.
  ///
  /// This version of \a Run may be less performant than the \a Closure
  /// version in some scenarios. This overload is useful in situations where
  /// performance is not a critical concern.
  ///
  /// Implementations must not execute the closure in the calling thread
  /// before \a Run returns.
  virtual void Run(absl::AnyInvocable<void()> closure) = 0;
  /// Synonymous with scheduling an alarm to run after duration \a when.
  ///
  /// The \a closure will execute when duration \a when has elapsed unless it
  /// has been cancelled via the \a Cancel method. If cancelled, the closure
  /// will not be run, nor will it be deleted. Ownership remains with the
  /// caller.
  ///
  /// Implementations must not execute the closure in the calling thread
  /// before \a RunAfter returns.
  ///
  /// Implementations may return a \a kInvalid handle if the callback can be
  /// immediately executed, and is therefore not cancellable.
  virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0;
  /// Synonymous with scheduling an alarm to run after duration \a when.
  ///
  /// The \a closure will execute when duration \a when has elapsed unless it
  /// has been cancelled via the \a Cancel method. If cancelled, the closure
  /// will not be run. Unlike the overloaded \a Closure alternative, the
  /// absl::AnyInvocable version's \a closure will be deleted by the
  /// EventEngine after the closure has been run, or upon cancellation.
  ///
  /// This version of \a RunAfter may be less performant than the \a Closure
  /// version in some scenarios. This overload is useful in situations where
  /// performance is not a critical concern.
  ///
  /// Implementations must not execute the closure in the calling thread
  /// before \a RunAfter returns.
  virtual TaskHandle RunAfter(Duration when,
                              absl::AnyInvocable<void()> closure) = 0;
  /// Request cancellation of a task.
  ///
  /// If the associated closure cannot be cancelled for any reason, this
  /// function will return false.
  ///
  /// If the associated closure can be cancelled, the associated callback will
  /// never be run, and this method will return true. If the callback type was
  /// an absl::AnyInvocable, it will be destroyed before the method returns.
  virtual bool Cancel(TaskHandle handle) = 0;
};
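
// Example: a hedged sketch of scheduling and cancelling work on an
// EventEngine using the absl::AnyInvocable overloads. \a engine is assumed
// to be a valid std::shared_ptr<EventEngine>, and the Do* helpers are
// illustrative:
//
//   // Run as soon as possible; the invocable is deleted after it runs.
//   engine->Run([] { DoSomething(); });
//
//   // Run after 5 seconds, unless cancelled first.
//   EventEngine::TaskHandle handle =
//       engine->RunAfter(std::chrono::seconds(5), [] { DoSomethingLater(); });
//   if (engine->Cancel(handle)) {
//     // The callback will never run, and the invocable has been destroyed.
//   }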

/// Replace gRPC's default EventEngine factory.
///
/// Applications may call \a SetEventEngineFactory at any time to replace the
/// default factory used within gRPC. EventEngines will be created when
/// necessary, when they are otherwise not provided by the application.
///
/// To be certain that none of the gRPC-provided built-in EventEngines are
/// created, applications must set a custom EventEngine factory method
/// *before* grpc is initialized.
void SetEventEngineFactory(
    absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);

/// Reset gRPC's EventEngine factory to the built-in default.
///
/// Applications that have called \a SetEventEngineFactory can remove their
/// custom factory using this method. The built-in EventEngine factories will
/// be used going forward. This has no effect on any EventEngines that were
/// created using the previous factories.
void EventEngineFactoryReset();
/// Create an EventEngine using the default factory.
std::unique_ptr<EventEngine> CreateEventEngine();

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_EVENT_ENGINE_EVENT_ENGINE_H