1  /*
2   * Copyright (C) 2017 The Android Open Source Project
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  #define STATSD_DEBUG false
18  #include "Log.h"
19  
20  #include "StatsPullerManager.h"
21  
22  #include <cutils/log.h>
23  #include <math.h>
24  #include <stdint.h>
25  
26  #include <algorithm>
27  #include <iostream>
28  
29  #include "../StatsService.h"
30  #include "../logd/LogEvent.h"
31  #include "../stats_log_util.h"
32  #include "../statscompanion_util.h"
33  #include "StatsCallbackPuller.h"
34  #include "TrainInfoPuller.h"
35  #include "statslog_statsd.h"
36  
37  using std::shared_ptr;
38  using std::vector;
39  
40  namespace android {
41  namespace os {
42  namespace statsd {
43  
// Sentinel "next pull time" meaning that no alarm needs to be set.
// Any pull time smaller than this value may require the alarm to be updated.
const int64_t NO_ALARM_UPDATE = INT64_MAX;
46  
// Seeds the puller table with the TrainInfo puller, keyed by (AID_STATSD, util::TRAIN_INFO),
// and initializes mNextPullTimeNs to NO_ALARM_UPDATE.
StatsPullerManager::StatsPullerManager()
    : kAllPullAtomInfo({
              // TrainInfo.
              {{.uid = AID_STATSD, .atomTag = util::TRAIN_INFO}, new TrainInfoPuller()},
      }),
      mNextPullTimeNs(NO_ALARM_UPDATE) {
}
54  
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)55  bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
56                                vector<shared_ptr<LogEvent>>* data) {
57      ATRACE_CALL();
58      std::lock_guard<std::mutex> _l(mLock);
59      return PullLocked(tagId, configKey, eventTimeNs, data);
60  }
61  
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)62  bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
63                                vector<std::shared_ptr<LogEvent>>* data) {
64      ATRACE_CALL();
65      std::lock_guard<std::mutex> _l(mLock);
66      return PullLocked(tagId, uids, eventTimeNs, data);
67  }
68  
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)69  bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
70                                      const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
71      vector<int32_t> uids;
72      const auto& uidProviderIt = mPullUidProviders.find(configKey);
73      if (uidProviderIt == mPullUidProviders.end()) {
74          ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
75                configKey.ToString().c_str());
76          StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
77          return false;
78      }
79      sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
80      if (pullUidProvider == nullptr) {
81          ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
82                configKey.ToString().c_str());
83          StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
84          return false;
85      }
86      uids = pullUidProvider->getPullAtomUids(tagId);
87      return PullLocked(tagId, uids, eventTimeNs, data);
88  }
89  
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)90  bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
91                                      const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
92      VLOG("Initiating pulling %d", tagId);
93      for (int32_t uid : uids) {
94          PullerKey key = {.uid = uid, .atomTag = tagId};
95          auto pullerIt = kAllPullAtomInfo.find(key);
96          if (pullerIt != kAllPullAtomInfo.end()) {
97              PullErrorCode status = pullerIt->second->Pull(eventTimeNs, data);
98              VLOG("pulled %zu items", data->size());
99              if (status != PULL_SUCCESS) {
100                  StatsdStats::getInstance().notePullFailed(tagId);
101              }
102              // If we received a dead object exception, it means the client process has died.
103              // We can remove the puller from the map.
104              if (status == PULL_DEAD_OBJECT) {
105                  StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
106                          tagId,
107                          /*registered=*/false);
108                  kAllPullAtomInfo.erase(pullerIt);
109              }
110              return status == PULL_SUCCESS;
111          }
112      }
113      StatsdStats::getInstance().notePullerNotFound(tagId);
114      ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
115      return false;  // Return early since we don't know what to pull.
116  }
117  
PullerForMatcherExists(int tagId) const118  bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
119      // Pulled atoms might be registered after we parse the config, so just make sure the id is in
120      // an appropriate range.
121      return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
122  }
123  
updateAlarmLocked()124  void StatsPullerManager::updateAlarmLocked() {
125      if (mNextPullTimeNs == NO_ALARM_UPDATE) {
126          VLOG("No need to set alarms. Skipping");
127          return;
128      }
129  
130      // TODO(b/151045771): do not hold a lock while making a binder call
131      if (mStatsCompanionService != nullptr) {
132          mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
133      } else {
134          VLOG("StatsCompanionService not available. Alarm not set.");
135      }
136      return;
137  }
138  
SetStatsCompanionService(const shared_ptr<IStatsCompanionService> & statsCompanionService)139  void StatsPullerManager::SetStatsCompanionService(
140          const shared_ptr<IStatsCompanionService>& statsCompanionService) {
141      std::lock_guard<std::mutex> _l(mLock);
142      shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
143      mStatsCompanionService = statsCompanionService;
144      for (const auto& pulledAtom : kAllPullAtomInfo) {
145          pulledAtom.second->SetStatsCompanionService(statsCompanionService);
146      }
147      if (mStatsCompanionService != nullptr) {
148          updateAlarmLocked();
149      }
150  }
151  
RegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver,int64_t nextPullTimeNs,int64_t intervalNs)152  void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
153                                            const wp<PullDataReceiver>& receiver,
154                                            int64_t nextPullTimeNs, int64_t intervalNs) {
155      std::lock_guard<std::mutex> _l(mLock);
156      auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
157      for (auto it = receivers.begin(); it != receivers.end(); it++) {
158          if (it->receiver == receiver) {
159              VLOG("Receiver already registered of %d", (int)receivers.size());
160              return;
161          }
162      }
163      ReceiverInfo receiverInfo;
164      receiverInfo.receiver = receiver;
165  
166      // Round it to the nearest minutes. This is the limit of alarm manager.
167      // In practice, we should always have larger buckets.
168      int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
169      // Scheduled pulling should be at least 1 min apart.
170      // This can be lower in cts tests, in which case we round it to 1 min.
171      if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
172          roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
173      }
174  
175      receiverInfo.intervalNs = roundedIntervalNs;
176      receiverInfo.nextPullTimeNs = nextPullTimeNs;
177      receivers.push_back(receiverInfo);
178  
179      // There is only one alarm for all pulled events. So only set it to the smallest denom.
180      if (nextPullTimeNs < mNextPullTimeNs) {
181          VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
182          mNextPullTimeNs = nextPullTimeNs;
183          updateAlarmLocked();
184      }
185      VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
186  }
187  
UnRegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver)188  void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
189                                              const wp<PullDataReceiver>& receiver) {
190      std::lock_guard<std::mutex> _l(mLock);
191      auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
192      if (receiversIt == mReceivers.end()) {
193          VLOG("Unknown pull code or no receivers: %d", tagId);
194          return;
195      }
196      std::list<ReceiverInfo>& receivers = receiversIt->second;
197      for (auto it = receivers.begin(); it != receivers.end(); it++) {
198          if (receiver == it->receiver) {
199              receivers.erase(it);
200              VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
201              return;
202          }
203      }
204  }
205  
RegisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)206  void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
207                                                   const wp<PullUidProvider>& provider) {
208      std::lock_guard<std::mutex> _l(mLock);
209      mPullUidProviders[configKey] = provider;
210  }
211  
UnregisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)212  void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
213                                                     const wp<PullUidProvider>& provider) {
214      std::lock_guard<std::mutex> _l(mLock);
215      const auto& it = mPullUidProviders.find(configKey);
216      if (it != mPullUidProviders.end() && it->second == provider) {
217          mPullUidProviders.erase(it);
218      }
219  }
220  
OnAlarmFired(int64_t elapsedTimeNs)221  void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
222      ATRACE_CALL();
223      std::lock_guard<std::mutex> _l(mLock);
224      int64_t wallClockNs = getWallClockNs();
225  
226      int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
227  
228      vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
229      for (auto& pair : mReceivers) {
230          vector<ReceiverInfo*> receivers;
231          if (pair.second.size() != 0) {
232              for (ReceiverInfo& receiverInfo : pair.second) {
233                  // If pullNecessary and enough time has passed for the next bucket, then add
234                  // receiver to the list that will pull on this alarm.
235                  // If pullNecessary is false, check if next pull time needs to be updated.
236                  sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
237                  if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
238                      receiverPtr->isPullNeeded()) {
239                      receivers.push_back(&receiverInfo);
240                  } else {
241                      if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
242                          receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
243                          int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
244                                                receiverInfo.intervalNs;
245                          receiverInfo.nextPullTimeNs +=
246                                  (numBucketsAhead + 1) * receiverInfo.intervalNs;
247                      }
248                      minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
249                  }
250              }
251              if (receivers.size() > 0) {
252                  needToPull.push_back(make_pair(&pair.first, receivers));
253              }
254          }
255      }
256      for (const auto& pullInfo : needToPull) {
257          vector<shared_ptr<LogEvent>> data;
258          PullResult pullResult =
259                  PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
260                          ? PullResult::PULL_RESULT_SUCCESS
261                          : PullResult::PULL_RESULT_FAIL;
262          if (pullResult == PullResult::PULL_RESULT_FAIL) {
263              VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
264          }
265  
266          // Convention is to mark pull atom timestamp at request time.
267          // If we pull at t0, puller starts at t1, finishes at t2, and send back
268          // at t3, we mark t0 as its timestamp, which should correspond to its
269          // triggering event, such as condition change at t0.
270          // Here the triggering event is alarm fired from AlarmManager.
271          // In ValueMetricProducer and GaugeMetricProducer we do same thing
272          // when pull on condition change, etc.
273          for (auto& event : data) {
274              event->setElapsedTimestampNs(elapsedTimeNs);
275              event->setLogdWallClockTimestampNs(wallClockNs);
276          }
277  
278          for (const auto& receiverInfo : pullInfo.second) {
279              sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
280              if (receiverPtr != nullptr) {
281                  receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
282                  // We may have just come out of a coma, compute next pull time.
283                  int numBucketsAhead =
284                          (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
285                  receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
286                  minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
287              } else {
288                  VLOG("receiver already gone.");
289              }
290          }
291      }
292  
293      VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
294           (long long)minNextPullTimeNs);
295      mNextPullTimeNs = minNextPullTimeNs;
296      updateAlarmLocked();
297  }
298  
ForceClearPullerCache()299  int StatsPullerManager::ForceClearPullerCache() {
300      ATRACE_CALL();
301      std::lock_guard<std::mutex> _l(mLock);
302      int totalCleared = 0;
303      for (const auto& pulledAtom : kAllPullAtomInfo) {
304          totalCleared += pulledAtom.second->ForceClearCache();
305      }
306      return totalCleared;
307  }
308  
ClearPullerCacheIfNecessary(int64_t timestampNs)309  int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
310      ATRACE_CALL();
311      std::lock_guard<std::mutex> _l(mLock);
312      int totalCleared = 0;
313      for (const auto& pulledAtom : kAllPullAtomInfo) {
314          totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
315      }
316      return totalCleared;
317  }
318  
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)319  void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
320                                                    const int64_t coolDownNs, const int64_t timeoutNs,
321                                                    const vector<int32_t>& additiveFields,
322                                                    const shared_ptr<IPullAtomCallback>& callback) {
323      ATRACE_CALL();
324      std::lock_guard<std::mutex> _l(mLock);
325      VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
326  
327      if (callback == nullptr) {
328          ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
329          return;
330      }
331  
332      int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
333      int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
334  
335      sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
336                                                               actualTimeoutNs, additiveFields);
337      PullerKey key = {.uid = uid, .atomTag = atomTag};
338      auto it = kAllPullAtomInfo.find(key);
339      if (it != kAllPullAtomInfo.end()) {
340          StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
341                                                                           /*registered=*/false);
342      }
343      kAllPullAtomInfo[key] = puller;
344      StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
345  }
346  
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)347  void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
348      ATRACE_CALL();
349      std::lock_guard<std::mutex> _l(mLock);
350      PullerKey key = {.uid = uid, .atomTag = atomTag};
351      if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
352          StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
353                                                                           /*registered=*/false);
354          kAllPullAtomInfo.erase(key);
355      }
356  }
357  
358  }  // namespace statsd
359  }  // namespace os
360  }  // namespace android
361