// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/extensible.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/port_platform.h>

// TODO(vigneshbabu): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {
namespace experimental {

////////////////////////////////////////////////////////////////////////////////
/// The EventEngine Interface
///
/// Overview
/// --------
///
/// The EventEngine encapsulates all platform-specific behaviors related to low
/// level network I/O, timers, asynchronous execution, and DNS resolution.
///
/// This interface allows developers to provide their own event management and
/// network stacks. Motivating use cases for supporting custom EventEngines
/// include the ability to hook into external event loops, and the use of
/// different EventEngine instances for each channel to better insulate network
/// I/O and callback processing from other channels.
///
/// A default cross-platform EventEngine instance is provided by gRPC.
///
/// Lifespan and Ownership
/// ----------------------
///
/// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure
/// that the engines remain available until they are no longer needed. Depending
/// on the use case, engines may live until gRPC is shut down.
///
/// EXAMPLE USAGE (Not yet implemented)
///
/// Custom EventEngines can be specified per channel, and allow configuration
/// for both clients and servers. To set a custom EventEngine for a client
/// channel, you can do something like the following:
///
///    ChannelArguments args;
///    std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
///    args.SetEventEngine(engine);
///    MyAppClient client(grpc::CreateCustomChannel(
///        "localhost:50051", grpc::InsecureChannelCredentials(), args));
///
/// A gRPC server can use a custom EventEngine by calling the
/// ServerBuilder::SetEventEngine method:
///
///    ServerBuilder builder;
///    std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
///    builder.SetEventEngine(engine);
///    std::unique_ptr<Server> server(builder.BuildAndStart());
///    server->Wait();
///
///
/// Blocking EventEngine Callbacks
/// ------------------------------
///
/// Doing blocking work in EventEngine callbacks is generally not advisable.
/// While gRPC's default EventEngine implementations have some capacity to scale
/// their thread pools to avoid starvation, this is not an instantaneous
/// process. Further, user-provided EventEngines may not be optimized to handle
/// excessive blocking work at all.
///
/// *Best Practice*: Occasional blocking work may be fine, but we do not
/// recommend running a mostly blocking workload in EventEngine threads.
///
///
/// Thread-safety guarantees
/// ------------------------
///
/// All EventEngine methods are guaranteed to be thread-safe; no external
/// synchronization is required to call any EventEngine method. Please note that
/// this does not apply to application callbacks, which may be run concurrently;
/// application state synchronization must be managed by the application.
///
////////////////////////////////////////////////////////////////////////////////
class EventEngine : public std::enable_shared_from_this<EventEngine>,
                    public Extensible {
 public:
  /// A duration between two events.
  ///
  /// Throughout the EventEngine API, durations are used to express how long
  /// until an action should be performed.
  using Duration = std::chrono::duration<int64_t, std::nano>;
  /// A custom closure type for EventEngine task execution.
  ///
  /// Throughout the EventEngine API, \a Closure ownership is retained by the
  /// caller - the EventEngine will never delete a Closure, and upon
  /// cancellation, the EventEngine will simply forget the Closure exists. The
  /// caller is responsible for all necessary cleanup.
  class Closure {
   public:
    Closure() = default;
    // Closures are an interface, and thus non-copyable.
    Closure(const Closure&) = delete;
    Closure& operator=(const Closure&) = delete;
    // Polymorphic type => virtual destructor
    virtual ~Closure() = default;
    // Run the contained code.
    virtual void Run() = 0;
  };
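  // As an illustrative sketch (MyClosure, DoWork, and `engine` are
  // hypothetical names, not part of this API), a caller-owned Closure might
  // look like the following; the caller must keep the object alive until it
  // has run or the EventEngine has forgotten it:
  //
  //    class MyClosure : public EventEngine::Closure {
  //     public:
  //      void Run() override { DoWork(); }
  //    };
  //
  //    MyClosure closure;
  //    engine->Run(&closure);  // `closure` must outlive its execution.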
  /// Represents a scheduled task.
  ///
  /// \a TaskHandles are returned by \a Run* methods, and can be given to the
  /// \a Cancel method.
  struct TaskHandle {
    intptr_t keys[2];
    static const GRPC_DLL TaskHandle kInvalid;
    friend bool operator==(const TaskHandle& lhs, const TaskHandle& rhs);
    friend bool operator!=(const TaskHandle& lhs, const TaskHandle& rhs);
  };
  /// A handle to a cancellable connection attempt.
  ///
  /// Returned by \a Connect, and can be passed to \a CancelConnect.
  struct ConnectionHandle {
    intptr_t keys[2];
    static const GRPC_DLL ConnectionHandle kInvalid;
    friend bool operator==(const ConnectionHandle& lhs,
                           const ConnectionHandle& rhs);
    friend bool operator!=(const ConnectionHandle& lhs,
                           const ConnectionHandle& rhs);
  };
  /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct
  /// exists on all platforms that gRPC supports.
  ///
  /// Platforms are expected to provide definitions for:
  /// * sockaddr
  /// * sockaddr_in
  /// * sockaddr_in6
  class ResolvedAddress {
   public:
    static constexpr socklen_t MAX_SIZE_BYTES = 128;

    ResolvedAddress(const sockaddr* address, socklen_t size);
    ResolvedAddress() = default;
    ResolvedAddress(const ResolvedAddress&) = default;
    const struct sockaddr* address() const;
    socklen_t size() const;

   private:
    char address_[MAX_SIZE_BYTES] = {};
    socklen_t size_ = 0;
  };
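  // A minimal sketch of building a ResolvedAddress from an ordinary IPv4
  // sockaddr_in (the loopback address and port 50051 are arbitrary example
  // values):
  //
  //    sockaddr_in addr;
  //    memset(&addr, 0, sizeof(addr));
  //    addr.sin_family = AF_INET;
  //    addr.sin_port = htons(50051);
  //    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  //    EventEngine::ResolvedAddress resolved(
  //        reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));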

  /// One end of a connection between a gRPC client and server. Endpoints are
  /// created when connections are established, and Endpoint operations are
  /// gRPC's primary means of communication.
  ///
  /// Endpoints must use the provided MemoryAllocator for all data buffer memory
  /// allocations. gRPC allows applications to set memory constraints per
  /// Channel or Server, and the implementation depends on all dynamic memory
  /// allocation being handled by the quota system.
  class Endpoint : public Extensible {
   public:
    /// Shuts down all connections and invokes all pending read or write
    /// callbacks with an error status.
    virtual ~Endpoint() = default;
    /// A struct representing optional arguments that may be provided to an
    /// EventEngine Endpoint Read API call.
    ///
    /// Passed as an argument to an Endpoint \a Read.
    struct ReadArgs {
      // A suggestion to the endpoint implementation to read at least the
      // specified number of bytes over the network connection before marking
      // the endpoint read operation as complete. gRPC may use this argument
      // to minimize the number of endpoint read API calls over the lifetime
      // of a connection.
      int64_t read_hint_bytes;
    };
    /// Reads data from the Endpoint.
    ///
    /// When data is available on the connection, that data is moved into the
    /// \a buffer. If the read succeeds immediately, it returns true and the \a
    /// on_read callback is not executed. Otherwise it returns false and the \a
    /// on_read callback executes asynchronously when the read completes. The
    /// caller must ensure that the callback has access to the buffer when it
    /// executes. Ownership of the buffer is not transferred. Valid slices *may*
    /// be placed into the buffer even if the callback is invoked with a non-OK
    /// Status.
    ///
    /// There can be at most one outstanding read per Endpoint at any given
    /// time. An outstanding read is one in which the \a on_read callback has
    /// not yet been executed for some previous call to \a Read.  If an attempt
    /// is made to call \a Read while a previous read is still outstanding, the
    /// \a EventEngine must abort.
    ///
    /// For failed read operations, implementations should pass the appropriate
    /// statuses to \a on_read. For example, callbacks might expect to receive
    /// CANCELLED on endpoint shutdown.
    virtual bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
                      SliceBuffer* buffer, const ReadArgs* args) = 0;
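    // A minimal sketch of issuing a read (`endpoint` and `buffer` denote a
    // hypothetical caller-owned Endpoint and SliceBuffer; both must remain
    // valid until the operation completes):
    //
    //    EventEngine::Endpoint::ReadArgs args = {/*read_hint_bytes=*/1};
    //    bool done = endpoint->Read(
    //        [](absl::Status status) {
    //          // Asynchronous completion: inspect `status` and the buffer.
    //        },
    //        &buffer, &args);
    //    if (done) {
    //      // The read completed synchronously; the callback will not run.
    //    }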
    /// A struct representing optional arguments that may be provided to an
    /// EventEngine Endpoint Write API call.
    ///
    /// Passed as an argument to an Endpoint \a Write.
    struct WriteArgs {
      // Represents private information that may be passed by gRPC for
      // select endpoints expected to be used only within Google.
      void* google_specific = nullptr;
      // A suggestion to the endpoint implementation to group data to be written
      // into frames of the specified max_frame_size. gRPC may use this
      // argument to dynamically control the max sizes of frames sent to a
      // receiver in response to high receiver memory pressure.
      int64_t max_frame_size;
    };
    /// Writes data out on the connection.
    ///
    /// If the write succeeds immediately, it returns true and the
    /// \a on_writable callback is not executed. Otherwise it returns false and
    /// the \a on_writable callback is called asynchronously when the connection
    /// is ready for more data. The Slices within the \a data buffer may be
    /// mutated at will by the Endpoint until \a on_writable is called. The \a
    /// data SliceBuffer will remain valid after calling \a Write, but its state
    /// is otherwise undefined.  All bytes in \a data must have been written
    /// before calling \a on_writable unless an error has occurred.
    ///
    /// There can be at most one outstanding write per Endpoint at any given
    /// time. An outstanding write is one in which the \a on_writable callback
    /// has not yet been executed for some previous call to \a Write.  If an
    /// attempt is made to call \a Write while a previous write is still
    /// outstanding, the \a EventEngine must abort.
    ///
    /// For failed write operations, implementations should pass the appropriate
    /// statuses to \a on_writable. For example, callbacks might expect to
    /// receive CANCELLED on endpoint shutdown.
    virtual bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
                       SliceBuffer* data, const WriteArgs* args) = 0;
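    // A corresponding sketch for writes (`endpoint` and `data` are again
    // hypothetical caller-owned objects; the frame size is an arbitrary
    // example value):
    //
    //    EventEngine::Endpoint::WriteArgs args;
    //    args.max_frame_size = 16384;
    //    bool done = endpoint->Write(
    //        [](absl::Status status) {
    //          // Asynchronous completion: `data` may be reused once this runs.
    //        },
    //        &data, &args);
    //    if (done) {
    //      // The write completed synchronously; the callback will not run.
    //    }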
    /// Returns an address in the format described in DNSResolver. The returned
    /// values are expected to remain valid for the life of the Endpoint.
    virtual const ResolvedAddress& GetPeerAddress() const = 0;
    virtual const ResolvedAddress& GetLocalAddress() const = 0;
  };

  /// Called when a new connection is established.
  ///
  /// If the connection attempt was not successful, implementations should pass
  /// the appropriate statuses to this callback. For example, callbacks might
  /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or
  /// CANCELLED statuses on EventEngine shutdown.
  using OnConnectCallback =
      absl::AnyInvocable<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;

  /// Listens for incoming connection requests from gRPC clients and initiates
  /// request processing once connections are established.
  class Listener : public Extensible {
   public:
    /// Called when the listener has accepted a new client connection.
    using AcceptCallback = absl::AnyInvocable<void(
        std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
    virtual ~Listener() = default;
    /// Bind an address/port to this Listener.
    ///
    /// It is expected that multiple addresses/ports can be bound to this
    /// Listener before Listener::Start has been called. Returns either the
    /// bound port or an appropriate error status.
    virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0;
    virtual absl::Status Start() = 0;
  };

  /// Factory method to create a network listener / server.
  ///
  /// Once a \a Listener is created and started, the \a on_accept callback will
  /// be called once asynchronously for each established connection. This method
  /// may return a non-OK status immediately if an error was encountered in any
  /// synchronous steps required to create the Listener. In this case,
  /// \a on_shutdown will never be called.
  ///
  /// If this method returns a Listener, then \a on_shutdown will be invoked
  /// exactly once when the Listener is shut down, and only after all
  /// \a on_accept callbacks have finished executing. The status passed to it
  /// will indicate if there was a problem during shutdown.
  ///
  /// The provided \a MemoryAllocatorFactory is used to create \a
  /// MemoryAllocators for Endpoint construction.
  virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
      Listener::AcceptCallback on_accept,
      absl::AnyInvocable<void(absl::Status)> on_shutdown,
      const EndpointConfig& config,
      std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
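  // A sketch of bringing up a listener (`engine`, `accept_cb`, `config`,
  // `factory`, and `addr` are hypothetical objects obtained elsewhere, and
  // error handling is elided):
  //
  //    auto listener = engine->CreateListener(
  //        std::move(accept_cb),
  //        [](absl::Status status) { /* listener has shut down */ },
  //        config, std::move(factory));
  //    if (listener.ok()) {
  //      auto port = (*listener)->Bind(addr);  // may be called multiple times
  //      absl::Status started = (*listener)->Start();
  //    }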
  /// Creates a client network connection to a remote network listener.
  ///
  /// Even in the event of an error, it is expected that the \a on_connect
  /// callback will be asynchronously executed exactly once by the EventEngine.
  /// A connection attempt can be cancelled using the \a CancelConnect method.
  ///
  /// Implementation Note: it is important that the \a memory_allocator be used
  /// for all read/write buffer allocations in the EventEngine implementation.
  /// This allows gRPC's \a ResourceQuota system to monitor and control memory
  /// usage with graceful degradation mechanisms. Please see the \a
  /// MemoryAllocator API for more information.
  virtual ConnectionHandle Connect(OnConnectCallback on_connect,
                                   const ResolvedAddress& addr,
                                   const EndpointConfig& args,
                                   MemoryAllocator memory_allocator,
                                   Duration timeout) = 0;
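  // A sketch of a connection attempt (`engine`, `addr`, `config`, and
  // `allocator` are hypothetical objects obtained elsewhere; the 30 second
  // timeout is an arbitrary example value):
  //
  //    EventEngine::ConnectionHandle handle = engine->Connect(
  //        [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep) {
  //          if (!ep.ok()) return;  // connection failed
  //          // Use *ep to read from and write to the peer.
  //        },
  //        addr, config, std::move(allocator), std::chrono::seconds(30));
  //    // The attempt may later be abandoned via engine->CancelConnect(handle).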

  /// Request cancellation of a connection attempt.
  ///
  /// If the associated connection has already been completed, it will not be
  /// cancelled, and this method will return false.
  ///
  /// If the associated connection has not been completed, it will be cancelled,
  /// and this method will return true. The \a OnConnectCallback will not be
  /// called, and \a on_connect will be destroyed before this method returns.
  virtual bool CancelConnect(ConnectionHandle handle) = 0;
  /// Provides asynchronous DNS resolution.
  ///
  /// This object has a destruction-is-cancellation semantic.
  /// Implementations should ensure that all pending requests are cancelled
  /// when the object is destroyed, and that all pending callbacks are invoked
  /// shortly thereafter. If cancellation races with request completion,
  /// implementations may choose to either cancel or satisfy the request.
  class DNSResolver {
   public:
    /// Optional configuration for DNSResolvers.
    struct ResolverOptions {
      /// If empty, default DNS servers will be used.
      /// Must be in the "IP:port" format as described in naming.md.
      std::string dns_server;
    };
    /// DNS SRV record type.
    struct SRVRecord {
      std::string host;
      int port = 0;
      int priority = 0;
      int weight = 0;
    };
    /// Called with the collection of sockaddrs that were resolved from a given
    /// target address.
    using LookupHostnameCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
    /// Called with a collection of SRV records.
    using LookupSRVCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<SRVRecord>>)>;
    /// Called with the result of a TXT record lookup.
    using LookupTXTCallback =
        absl::AnyInvocable<void(absl::StatusOr<std::vector<std::string>>)>;

    virtual ~DNSResolver() = default;

    /// Asynchronously resolve an address.
    ///
    /// \a default_port may be a non-numeric named service port, and will only
    /// be used if \a name does not already contain a port component.
    ///
    /// When the lookup is complete or cancelled, the \a on_resolve callback
    /// will be invoked with a status indicating the success or failure of the
    /// lookup. Implementations should pass the appropriate statuses to the
    /// callback. For example, callbacks might expect to receive CANCELLED or
    /// NOT_FOUND.
    virtual void LookupHostname(LookupHostnameCallback on_resolve,
                                absl::string_view name,
                                absl::string_view default_port) = 0;
    /// Asynchronously perform an SRV record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as \a
    /// LookupHostname's \a on_resolve callback.
    virtual void LookupSRV(LookupSRVCallback on_resolve,
                           absl::string_view name) = 0;
    /// Asynchronously perform a TXT record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as \a
    /// LookupHostname's \a on_resolve callback.
    virtual void LookupTXT(LookupTXTCallback on_resolve,
                           absl::string_view name) = 0;
  };

  /// At time of destruction, the EventEngine must have no active
  /// responsibilities. EventEngine users (applications) are responsible for
  /// cancelling all tasks and DNS lookups, and for shutting down listeners and
  /// endpoints, prior to EventEngine destruction. If there are any outstanding
  /// tasks, any running listeners, etc. at time of EventEngine destruction,
  /// that is an invalid use of the API, and it will result in undefined
  /// behavior.
  virtual ~EventEngine() = default;

  // TODO(nnoble): consider whether we can remove this method before we
  // de-experimentalize this API.
  virtual bool IsWorkerThread() = 0;

  /// Creates and returns an instance of a DNSResolver, optionally configured by
  /// the \a options struct. This method may return a non-OK status if an error
  /// occurred when creating the DNSResolver. If the caller requests a custom
  /// DNS server, and the EventEngine implementation does not support it, this
  /// must return an error.
  virtual absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
      const DNSResolver::ResolverOptions& options) = 0;
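  // A sketch of resolving a hostname (error handling elided; "grpc.io" and
  // "443" are arbitrary example values):
  //
  //    auto resolver = engine->GetDNSResolver(
  //        EventEngine::DNSResolver::ResolverOptions());
  //    if (resolver.ok()) {
  //      (*resolver)->LookupHostname(
  //          [](absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> r) {
  //            // Inspect the resolved addresses or the error status here.
  //          },
  //          "grpc.io", "443");
  //    }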

  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures passed to \a Run cannot be cancelled. The \a closure will not
  /// be deleted after it has been run; ownership remains with the caller.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a Run returns. For example, if the caller must release a lock before the
  /// closure can proceed, running the closure immediately would cause a
  /// deadlock.
  virtual void Run(Closure* closure) = 0;
  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures passed to \a Run cannot be cancelled. Unlike the overloaded \a
  /// Closure alternative, the absl::AnyInvocable version's \a closure will be
  /// deleted by the EventEngine after the closure has been run.
  ///
  /// This version of \a Run may be less performant than the \a Closure version
  /// in some scenarios. This overload is useful in situations where performance
  /// is not a critical concern.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a Run returns.
  virtual void Run(absl::AnyInvocable<void()> closure) = 0;
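  // A sketch of scheduling work with the AnyInvocable overload (`engine`
  // denotes a valid EventEngine; the lambda is deleted after it runs):
  //
  //    engine->Run([] {
  //      // Do some asynchronous work here.
  //    });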
  /// Synonymous with scheduling an alarm to run after duration \a when.
  ///
  /// The \a closure will execute once the \a when duration has elapsed, unless
  /// it has been cancelled via the \a Cancel method. If cancelled, the closure
  /// will not be run, nor will it be deleted. Ownership remains with the
  /// caller.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a RunAfter returns.
  ///
  /// Implementations may return a \a kInvalid handle if the callback can be
  /// immediately executed, and is therefore not cancellable.
  virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0;
  /// Synonymous with scheduling an alarm to run after duration \a when.
  ///
  /// The \a closure will execute once the \a when duration has elapsed, unless
  /// it has been cancelled via the \a Cancel method. If cancelled, the closure
  /// will not be run. Unlike the overloaded \a Closure alternative, the
  /// absl::AnyInvocable version's \a closure will be deleted by the EventEngine
  /// after the closure has been run, or upon cancellation.
  ///
  /// This version of \a RunAfter may be less performant than the \a Closure
  /// version in some scenarios. This overload is useful in situations where
  /// performance is not a critical concern.
  ///
  /// Implementations must not execute the closure in the calling thread before
  /// \a RunAfter returns.
  virtual TaskHandle RunAfter(Duration when,
                              absl::AnyInvocable<void()> closure) = 0;
  /// Request cancellation of a task.
  ///
  /// If the associated closure cannot be cancelled for any reason, this
  /// function will return false.
  ///
  /// If the associated closure can be cancelled, the associated callback will
  /// never be run, and this method will return true. If the callback type was
  /// an absl::AnyInvocable, it will be destroyed before the method returns.
  virtual bool Cancel(TaskHandle handle) = 0;
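  // A sketch of a cancellable timer (`engine` denotes a valid EventEngine;
  // the 250 millisecond delay is an arbitrary example value):
  //
  //    EventEngine::TaskHandle handle = engine->RunAfter(
  //        std::chrono::milliseconds(250), [] { /* timer fired */ });
  //    // Later, if the timer is no longer needed:
  //    if (engine->Cancel(handle)) {
  //      // The callback was cancelled and will never run.
  //    }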
};

/// Replace gRPC's default EventEngine factory.
///
/// Applications may call \a SetEventEngineFactory at any time to replace the
/// default factory used within gRPC. EventEngines will be created when
/// necessary, if they are not otherwise provided by the application.
///
/// To be certain that none of the gRPC-provided built-in EventEngines are
/// created, applications must set a custom EventEngine factory method *before*
/// gRPC is initialized.
void SetEventEngineFactory(
    absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);
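// A sketch of installing a custom factory before gRPC initialization
// (MyEventEngine is a hypothetical application-provided implementation):
//
//    grpc_event_engine::experimental::SetEventEngineFactory(
//        []() { return std::make_unique<MyEventEngine>(); });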

/// Reset gRPC's EventEngine factory to the built-in default.
///
/// Applications that have called \a SetEventEngineFactory can remove their
/// custom factory using this method. The built-in EventEngine factories will be
/// used going forward. This has no effect on any EventEngines that were created
/// using the previous factories.
void EventEngineFactoryReset();
/// Create an EventEngine using the default factory.
std::unique_ptr<EventEngine> CreateEventEngine();

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_EVENT_ENGINE_EVENT_ENGINE_H