1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define STATSD_DEBUG false 18 #include "Log.h" 19 20 #include "StatsPullerManager.h" 21 22 #include <cutils/log.h> 23 #include <math.h> 24 #include <stdint.h> 25 26 #include <algorithm> 27 #include <iostream> 28 29 #include "../StatsService.h" 30 #include "../logd/LogEvent.h" 31 #include "../stats_log_util.h" 32 #include "../statscompanion_util.h" 33 #include "StatsCallbackPuller.h" 34 #include "TrainInfoPuller.h" 35 #include "statslog_statsd.h" 36 37 using std::shared_ptr; 38 using std::vector; 39 40 namespace android { 41 namespace os { 42 namespace statsd { 43 44 // Values smaller than this may require to update the alarm. 45 const int64_t NO_ALARM_UPDATE = INT64_MAX; 46 StatsPullerManager()47 StatsPullerManager::StatsPullerManager() 48 : kAllPullAtomInfo({ 49 // TrainInfo. 50 {{.uid = AID_STATSD, .atomTag = util::TRAIN_INFO}, new TrainInfoPuller()}, 51 }), 52 mNextPullTimeNs(NO_ALARM_UPDATE) { 53 } 54 Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)55 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, 56 vector<shared_ptr<LogEvent>>* data) { 57 ATRACE_CALL(); 58 std::lock_guard<std::mutex> _l(mLock); 59 return PullLocked(tagId, configKey, eventTimeNs, data); 60 } 61 Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)62 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, 63 vector<std::shared_ptr<LogEvent>>* data) { 64 ATRACE_CALL(); 65 std::lock_guard<std::mutex> _l(mLock); 66 return PullLocked(tagId, uids, eventTimeNs, data); 67 } 68 PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)69 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey, 70 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) { 71 vector<int32_t> uids; 72 const auto& uidProviderIt = mPullUidProviders.find(configKey); 73 if (uidProviderIt == mPullUidProviders.end()) { 74 ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId, 75 configKey.ToString().c_str()); 76 StatsdStats::getInstance().notePullUidProviderNotFound(tagId); 77 return false; 78 } 79 sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote(); 80 if (pullUidProvider == nullptr) { 81 ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId, 82 configKey.ToString().c_str()); 83 StatsdStats::getInstance().notePullUidProviderNotFound(tagId); 84 return false; 85 } 86 uids = pullUidProvider->getPullAtomUids(tagId); 87 return PullLocked(tagId, uids, eventTimeNs, data); 88 } 89 PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)90 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids, 91 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) { 92 VLOG("Initiating pulling %d", tagId); 93 for (int32_t uid : uids) { 94 PullerKey key = {.uid = uid, .atomTag = tagId}; 95 auto pullerIt = kAllPullAtomInfo.find(key); 96 if (pullerIt != kAllPullAtomInfo.end()) { 97 PullErrorCode status = pullerIt->second->Pull(eventTimeNs, data); 98 VLOG("pulled %zu items", data->size()); 99 if (status != PULL_SUCCESS) { 100 StatsdStats::getInstance().notePullFailed(tagId); 101 } 102 // If we received a dead object exception, it means the client process has died. 103 // We can remove the puller from the map. 104 if (status == PULL_DEAD_OBJECT) { 105 StatsdStats::getInstance().notePullerCallbackRegistrationChanged( 106 tagId, 107 /*registered=*/false); 108 kAllPullAtomInfo.erase(pullerIt); 109 } 110 return status == PULL_SUCCESS; 111 } 112 } 113 StatsdStats::getInstance().notePullerNotFound(tagId); 114 ALOGW("StatsPullerManager: Unknown tagId %d", tagId); 115 return false; // Return early since we don't know what to pull. 116 } 117 PullerForMatcherExists(int tagId) const118 bool StatsPullerManager::PullerForMatcherExists(int tagId) const { 119 // Pulled atoms might be registered after we parse the config, so just make sure the id is in 120 // an appropriate range. 121 return isVendorPulledAtom(tagId) || isPulledAtom(tagId); 122 } 123 updateAlarmLocked()124 void StatsPullerManager::updateAlarmLocked() { 125 if (mNextPullTimeNs == NO_ALARM_UPDATE) { 126 VLOG("No need to set alarms. Skipping"); 127 return; 128 } 129 130 // TODO(b/151045771): do not hold a lock while making a binder call 131 if (mStatsCompanionService != nullptr) { 132 mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000); 133 } else { 134 VLOG("StatsCompanionService not available. Alarm not set."); 135 } 136 return; 137 } 138 SetStatsCompanionService(const shared_ptr<IStatsCompanionService> & statsCompanionService)139 void StatsPullerManager::SetStatsCompanionService( 140 const shared_ptr<IStatsCompanionService>& statsCompanionService) { 141 std::lock_guard<std::mutex> _l(mLock); 142 shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService; 143 mStatsCompanionService = statsCompanionService; 144 for (const auto& pulledAtom : kAllPullAtomInfo) { 145 pulledAtom.second->SetStatsCompanionService(statsCompanionService); 146 } 147 if (mStatsCompanionService != nullptr) { 148 updateAlarmLocked(); 149 } 150 } 151 RegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver,int64_t nextPullTimeNs,int64_t intervalNs)152 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey, 153 const wp<PullDataReceiver>& receiver, 154 int64_t nextPullTimeNs, int64_t intervalNs) { 155 std::lock_guard<std::mutex> _l(mLock); 156 auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}]; 157 for (auto it = receivers.begin(); it != receivers.end(); it++) { 158 if (it->receiver == receiver) { 159 VLOG("Receiver already registered of %d", (int)receivers.size()); 160 return; 161 } 162 } 163 ReceiverInfo receiverInfo; 164 receiverInfo.receiver = receiver; 165 166 // Round it to the nearest minutes. This is the limit of alarm manager. 167 // In practice, we should always have larger buckets. 168 int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60; 169 // Scheduled pulling should be at least 1 min apart. 170 // This can be lower in cts tests, in which case we round it to 1 min. 171 if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) { 172 roundedIntervalNs = 60 * (int64_t)NS_PER_SEC; 173 } 174 175 receiverInfo.intervalNs = roundedIntervalNs; 176 receiverInfo.nextPullTimeNs = nextPullTimeNs; 177 receivers.push_back(receiverInfo); 178 179 // There is only one alarm for all pulled events. So only set it to the smallest denom. 180 if (nextPullTimeNs < mNextPullTimeNs) { 181 VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs); 182 mNextPullTimeNs = nextPullTimeNs; 183 updateAlarmLocked(); 184 } 185 VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size()); 186 } 187 UnRegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver)188 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey, 189 const wp<PullDataReceiver>& receiver) { 190 std::lock_guard<std::mutex> _l(mLock); 191 auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey}); 192 if (receiversIt == mReceivers.end()) { 193 VLOG("Unknown pull code or no receivers: %d", tagId); 194 return; 195 } 196 std::list<ReceiverInfo>& receivers = receiversIt->second; 197 for (auto it = receivers.begin(); it != receivers.end(); it++) { 198 if (receiver == it->receiver) { 199 receivers.erase(it); 200 VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size()); 201 return; 202 } 203 } 204 } 205 RegisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)206 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey, 207 const wp<PullUidProvider>& provider) { 208 std::lock_guard<std::mutex> _l(mLock); 209 mPullUidProviders[configKey] = provider; 210 } 211 UnregisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)212 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey, 213 const wp<PullUidProvider>& provider) { 214 std::lock_guard<std::mutex> _l(mLock); 215 const auto& it = mPullUidProviders.find(configKey); 216 if (it != mPullUidProviders.end() && it->second == provider) { 217 mPullUidProviders.erase(it); 218 } 219 } 220 OnAlarmFired(int64_t elapsedTimeNs)221 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { 222 ATRACE_CALL(); 223 std::lock_guard<std::mutex> _l(mLock); 224 int64_t wallClockNs = getWallClockNs(); 225 226 int64_t minNextPullTimeNs = NO_ALARM_UPDATE; 227 228 vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull; 229 for (auto& pair : mReceivers) { 230 vector<ReceiverInfo*> receivers; 231 if (pair.second.size() != 0) { 232 for (ReceiverInfo& receiverInfo : pair.second) { 233 // If pullNecessary and enough time has passed for the next bucket, then add 234 // receiver to the list that will pull on this alarm. 235 // If pullNecessary is false, check if next pull time needs to be updated. 236 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote(); 237 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr && 238 receiverPtr->isPullNeeded()) { 239 receivers.push_back(&receiverInfo); 240 } else { 241 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) { 242 receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs); 243 int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) / 244 receiverInfo.intervalNs; 245 receiverInfo.nextPullTimeNs += 246 (numBucketsAhead + 1) * receiverInfo.intervalNs; 247 } 248 minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs); 249 } 250 } 251 if (receivers.size() > 0) { 252 needToPull.push_back(make_pair(&pair.first, receivers)); 253 } 254 } 255 } 256 for (const auto& pullInfo : needToPull) { 257 vector<shared_ptr<LogEvent>> data; 258 PullResult pullResult = 259 PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data) 260 ? PullResult::PULL_RESULT_SUCCESS 261 : PullResult::PULL_RESULT_FAIL; 262 if (pullResult == PullResult::PULL_RESULT_FAIL) { 263 VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs); 264 } 265 266 // Convention is to mark pull atom timestamp at request time. 267 // If we pull at t0, puller starts at t1, finishes at t2, and send back 268 // at t3, we mark t0 as its timestamp, which should correspond to its 269 // triggering event, such as condition change at t0. 270 // Here the triggering event is alarm fired from AlarmManager. 271 // In ValueMetricProducer and GaugeMetricProducer we do same thing 272 // when pull on condition change, etc. 273 for (auto& event : data) { 274 event->setElapsedTimestampNs(elapsedTimeNs); 275 event->setLogdWallClockTimestampNs(wallClockNs); 276 } 277 278 for (const auto& receiverInfo : pullInfo.second) { 279 sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote(); 280 if (receiverPtr != nullptr) { 281 receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs); 282 // We may have just come out of a coma, compute next pull time. 283 int numBucketsAhead = 284 (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs; 285 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs; 286 minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs); 287 } else { 288 VLOG("receiver already gone."); 289 } 290 } 291 } 292 293 VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs, 294 (long long)minNextPullTimeNs); 295 mNextPullTimeNs = minNextPullTimeNs; 296 updateAlarmLocked(); 297 } 298 ForceClearPullerCache()299 int StatsPullerManager::ForceClearPullerCache() { 300 ATRACE_CALL(); 301 std::lock_guard<std::mutex> _l(mLock); 302 int totalCleared = 0; 303 for (const auto& pulledAtom : kAllPullAtomInfo) { 304 totalCleared += pulledAtom.second->ForceClearCache(); 305 } 306 return totalCleared; 307 } 308 ClearPullerCacheIfNecessary(int64_t timestampNs)309 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) { 310 ATRACE_CALL(); 311 std::lock_guard<std::mutex> _l(mLock); 312 int totalCleared = 0; 313 for (const auto& pulledAtom : kAllPullAtomInfo) { 314 totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs); 315 } 316 return totalCleared; 317 } 318 RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)319 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag, 320 const int64_t coolDownNs, const int64_t timeoutNs, 321 const vector<int32_t>& additiveFields, 322 const shared_ptr<IPullAtomCallback>& callback) { 323 ATRACE_CALL(); 324 std::lock_guard<std::mutex> _l(mLock); 325 VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag); 326 327 if (callback == nullptr) { 328 ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag); 329 return; 330 } 331 332 int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs; 333 int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs; 334 335 sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs, 336 actualTimeoutNs, additiveFields); 337 PullerKey key = {.uid = uid, .atomTag = atomTag}; 338 auto it = kAllPullAtomInfo.find(key); 339 if (it != kAllPullAtomInfo.end()) { 340 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, 341 /*registered=*/false); 342 } 343 kAllPullAtomInfo[key] = puller; 344 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true); 345 } 346 UnregisterPullAtomCallback(const int uid,const int32_t atomTag)347 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) { 348 ATRACE_CALL(); 349 std::lock_guard<std::mutex> _l(mLock); 350 PullerKey key = {.uid = uid, .atomTag = atomTag}; 351 if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) { 352 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, 353 /*registered=*/false); 354 kAllPullAtomInfo.erase(key); 355 } 356 } 357 358 } // namespace statsd 359 } // namespace os 360 } // namespace android 361