xref: /aosp_15_r20/external/grpc-grpc/src/cpp/server/orca/orca_service.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <stddef.h>
18 
19 #include <map>
20 #include <memory>
21 #include <utility>
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/strings/string_view.h"
25 #include "absl/time/time.h"
26 #include "absl/types/optional.h"
27 #include "google/protobuf/duration.upb.h"
28 #include "upb/base/string_view.h"
29 #include "upb/mem/arena.hpp"
30 #include "xds/data/orca/v3/orca_load_report.upb.h"
31 #include "xds/service/orca/v3/orca.upb.h"
32 
33 #include <grpc/event_engine/event_engine.h>
34 #include <grpc/support/log.h>
35 #include <grpcpp/ext/orca_service.h>
36 #include <grpcpp/ext/server_metric_recorder.h>
37 #include <grpcpp/impl/rpc_method.h>
38 #include <grpcpp/impl/rpc_service_method.h>
39 #include <grpcpp/impl/server_callback_handlers.h>
40 #include <grpcpp/impl/sync.h>
41 #include <grpcpp/server_context.h>
42 #include <grpcpp/support/byte_buffer.h>
43 #include <grpcpp/support/server_callback.h>
44 #include <grpcpp/support/slice.h>
45 #include <grpcpp/support/status.h>
46 
47 #include "src/core/lib/event_engine/default_event_engine.h"
48 #include "src/core/lib/gprpp/debug_location.h"
49 #include "src/core/lib/gprpp/ref_counted.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/load_balancing/backend_metric_data.h"
54 #include "src/cpp/server/backend_metric_recorder.h"
55 
56 namespace grpc {
57 namespace experimental {
58 
59 using ::grpc_event_engine::experimental::EventEngine;
60 
61 //
62 // OrcaService::Reactor
63 //
64 
65 class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
66                              public grpc_core::RefCounted<Reactor> {
67  public:
Reactor(OrcaService * service,const ByteBuffer * request_buffer)68   explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
69       : RefCounted("OrcaService::Reactor"),
70         service_(service),
71         engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
72     // Get slice from request.
73     Slice slice;
74     GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
75     // Parse request proto.
76     upb::Arena arena;
77     xds_service_orca_v3_OrcaLoadReportRequest* request =
78         xds_service_orca_v3_OrcaLoadReportRequest_parse(
79             reinterpret_cast<const char*>(slice.begin()), slice.size(),
80             arena.ptr());
81     if (request == nullptr) {
82       Finish(Status(StatusCode::INTERNAL, "could not parse request proto"));
83       return;
84     }
85     const auto* duration_proto =
86         xds_service_orca_v3_OrcaLoadReportRequest_report_interval(request);
87     if (duration_proto != nullptr) {
88       report_interval_ = grpc_core::Duration::FromSecondsAndNanoseconds(
89           google_protobuf_Duration_seconds(duration_proto),
90           google_protobuf_Duration_nanos(duration_proto));
91     }
92     auto min_interval = grpc_core::Duration::Milliseconds(
93         service_->min_report_duration_ / absl::Milliseconds(1));
94     if (report_interval_ < min_interval) report_interval_ = min_interval;
95     // Send initial response.
96     SendResponse();
97   }
98 
OnWriteDone(bool ok)99   void OnWriteDone(bool ok) override {
100     if (!ok) {
101       Finish(Status(StatusCode::UNKNOWN, "write failed"));
102       return;
103     }
104     response_.Clear();
105     if (!MaybeScheduleTimer()) {
106       Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
107     }
108   }
109 
OnCancel()110   void OnCancel() override {
111     if (MaybeCancelTimer()) {
112       Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
113     }
114   }
115 
OnDone()116   void OnDone() override {
117     // Free the initial ref from instantiation.
118     Unref(DEBUG_LOCATION, "OnDone");
119   }
120 
121  private:
SendResponse()122   void SendResponse() {
123     Slice response_slice = service_->GetOrCreateSerializedResponse();
124     ByteBuffer response_buffer(&response_slice, 1);
125     response_.Swap(&response_buffer);
126     StartWrite(&response_);
127   }
128 
MaybeScheduleTimer()129   bool MaybeScheduleTimer() {
130     grpc::internal::MutexLock lock(&timer_mu_);
131     if (cancelled_) return false;
132     timer_handle_ = engine_->RunAfter(
133         report_interval_,
134         [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
135     return true;
136   }
137 
MaybeCancelTimer()138   bool MaybeCancelTimer() {
139     grpc::internal::MutexLock lock(&timer_mu_);
140     cancelled_ = true;
141     if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) {
142       timer_handle_.reset();
143       return true;
144     }
145     return false;
146   }
147 
OnTimer()148   void OnTimer() {
149     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
150     grpc_core::ExecCtx exec_ctx;
151     grpc::internal::MutexLock lock(&timer_mu_);
152     timer_handle_.reset();
153     SendResponse();
154   }
155 
156   OrcaService* service_;
157 
158   grpc::internal::Mutex timer_mu_;
159   absl::optional<EventEngine::TaskHandle> timer_handle_
160       ABSL_GUARDED_BY(&timer_mu_);
161   bool cancelled_ ABSL_GUARDED_BY(&timer_mu_) = false;
162 
163   grpc_core::Duration report_interval_;
164   ByteBuffer response_;
165   std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
166 };
167 
168 //
169 // OrcaService
170 //
171 
OrcaService(ServerMetricRecorder * const server_metric_recorder,Options options)172 OrcaService::OrcaService(ServerMetricRecorder* const server_metric_recorder,
173                          Options options)
174     : server_metric_recorder_(server_metric_recorder),
175       min_report_duration_(options.min_report_duration) {
176   GPR_ASSERT(server_metric_recorder_ != nullptr);
177   AddMethod(new internal::RpcServiceMethod(
178       "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
179       internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr));
180   MarkMethodCallback(
181       0, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
182              [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
183                return new Reactor(this, request);
184              }));
185 }
186 
GetOrCreateSerializedResponse()187 Slice OrcaService::GetOrCreateSerializedResponse() {
188   grpc::internal::MutexLock lock(&mu_);
189   std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState> result =
190       server_metric_recorder_->GetMetricsIfChanged();
191   if (!response_slice_seq_.has_value() ||
192       *response_slice_seq_ != result->sequence_number) {
193     const auto& data = result->data;
194     upb::Arena arena;
195     xds_data_orca_v3_OrcaLoadReport* response =
196         xds_data_orca_v3_OrcaLoadReport_new(arena.ptr());
197     if (data.cpu_utilization != -1) {
198       xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response,
199                                                           data.cpu_utilization);
200     }
201     if (data.mem_utilization != -1) {
202       xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response,
203                                                           data.mem_utilization);
204     }
205     if (data.application_utilization != -1) {
206       xds_data_orca_v3_OrcaLoadReport_set_application_utilization(
207           response, data.application_utilization);
208     }
209     if (data.qps != -1) {
210       xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps);
211     }
212     if (data.eps != -1) {
213       xds_data_orca_v3_OrcaLoadReport_set_eps(response, data.eps);
214     }
215     for (const auto& u : data.utilization) {
216       xds_data_orca_v3_OrcaLoadReport_utilization_set(
217           response,
218           upb_StringView_FromDataAndSize(u.first.data(), u.first.size()),
219           u.second, arena.ptr());
220     }
221     size_t buf_length;
222     char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(),
223                                                           &buf_length);
224     response_slice_.emplace(buf, buf_length);
225     response_slice_seq_ = result->sequence_number;
226   }
227   return Slice(*response_slice_);
228 }
229 
230 }  // namespace experimental
231 }  // namespace grpc
232