/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.test.tracing.coroutines

import android.platform.test.annotations.EnableFlags
import com.android.app.tracing.coroutines.createCoroutineTracingContext
import com.android.app.tracing.coroutines.flow.collectTraced
import com.android.app.tracing.coroutines.flow.filterTraced
import com.android.app.tracing.coroutines.flow.flowName
import com.android.app.tracing.coroutines.flow.mapTraced
import com.android.app.tracing.coroutines.launchTraced
import com.android.systemui.Flags.FLAG_COROUTINE_TRACING
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.newSingleThreadContext
import org.junit.Assert.assertEquals
import org.junit.Test

/**
 * Tests for trace-section naming while collecting flows under a coroutine tracing context.
 *
 * Every test relies on `runTest`, `expect` and `finish`, which are inherited from [TestBase] and
 * are not visible in this file. Judging by the call sites, `expect`/`finish` take an optional
 * ordinal followed by the trace-section names that should be open at that point — confirm against
 * [TestBase] before relying on this description.
 *
 * Dispatchers created with [newSingleThreadContext] own a dedicated thread, so each test closes
 * the ones it creates. NOTE(review): this assumes `runTest` does not return until the test body
 * and its child coroutines have finished — confirm against [TestBase].
 */
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
@EnableFlags(FLAG_COROUTINE_TRACING)
class FlowTracingTest : TestBase() {

    /** Tracing context named "main", with parent names included and strict mode enabled. */
    override val extraCoroutineContext: CoroutineContext
        get() = createCoroutineTracingContext("main", includeParentNames = true, strictMode = true)

    /** Collects an unnamed cold flow; producer and collector both expect the `main:1^` section. */
    @Test
    fun collectFlow1() {
        val coldFlow = flow {
            expect(1, "main:1^")
            delay(1)
            expect(2, "main:1^")
            emit(42)
            expect(4, "main:1^")
            delay(1)
            expect(5, "main:1^")
        }
        runTest {
            coldFlow.collect {
                assertEquals(42, it)
                expect(3, "main:1^")
            }
            delay(1)
            finish(6, "main:1^")
        }
    }

    /**
     * Collects a flow that is given a name via [flowName] but is not moved to another context.
     * Steps 2 and 6 pass no section names to `expect`/`finish`.
     */
    @Test
    fun collectFlow2() {
        val coldFlow =
            flow {
                    expect(1, "main:1^")
                    delay(1)
                    expect(2)
                    emit(1)
                    expect(5, "main:1^")
                    delay(1)
                    finish(6)
                }
                .flowName("new-name")
        runTest {
            coldFlow.collect {
                expect(3, "main:1^")
                delay(1)
                expect(4, "main:1^")
            }
        }
    }

    /**
     * Applies [flowName] before [flowOn]: the producer expects `main:1^:1^new-name`, while the
     * collector expects `main:1^`.
     */
    @Test
    fun collectFlow3() {
        // `use` closes the single-thread dispatcher once the test body is done.
        newSingleThreadContext("thread-#1").use { thread1 ->
            val coldFlow =
                flow {
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                        emit(42)
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                    }
                    .flowName("new-name")
                    .flowOn(thread1)
            runTest {
                coldFlow.collect {
                    assertEquals(42, it)
                    expect("main:1^")
                    delay(1)
                    expect("main:1^")
                }
            }
        }
    }

    /**
     * Same as [collectFlow3] but with [flowName] applied after [flowOn]; the expected section
     * names are identical.
     */
    @Test
    fun collectFlow4() {
        // `use` closes the single-thread dispatcher once the test body is done.
        newSingleThreadContext("thread-#1").use { thread1 ->
            val coldFlow =
                flow {
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                        emit(42)
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                    }
                    .flowOn(thread1)
                    .flowName("new-name")
            runTest {
                coldFlow.collect {
                    assertEquals(42, it)
                    expect("main:1^")
                }
            }
        }
    }

    /**
     * Names the flow twice, once on each side of [flowOn]. The producer still expects
     * `main:1^:1^new-name`; "UNUSED_NAME" does not appear in any expectation.
     */
    @Test
    fun collectFlow5() {
        // `use` closes the single-thread dispatcher once the test body is done.
        newSingleThreadContext("thread-#1").use { thread1 ->
            val coldFlow =
                flow {
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                        emit(42)
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                    }
                    .flowName("new-name")
                    .flowOn(thread1)
                    .flowName("UNUSED_NAME")

            runTest {
                coldFlow.collect {
                    assertEquals(42, it)
                    expect("main:1^")
                }
                delay(1)
                expect("main:1^")
            }
        }
    }

    /**
     * Chains emit -> map -> filter, each stage named and moved to its own thread with [flowOn].
     * The expected section name grows by one segment per stage, and two barriers order the
     * producer's tail (steps 9-10) after the collector's step 8 and before `finish`.
     */
    @Test
    fun collectFlow6() {
        val barrier1 = CompletableDeferred<Unit>()
        val barrier2 = CompletableDeferred<Unit>()
        val thread1 = newSingleThreadContext("thread-#1")
        val thread2 = newSingleThreadContext("thread-#2")
        val thread3 = newSingleThreadContext("thread-#3")
        try {
            val coldFlow =
                flow {
                        expect(2, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        delay(1)
                        expect(3, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        emit(42)
                        barrier1.await()
                        expect(9, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        delay(1)
                        expect(10, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        barrier2.complete(Unit)
                    }
                    .flowName("name-for-emit")
                    .flowOn(thread3)
                    .map {
                        expect(4, "main:1^:1^name-for-filter:1^name-for-map")
                        delay(1)
                        expect(5, "main:1^:1^name-for-filter:1^name-for-map")
                        it
                    }
                    .flowName("name-for-map")
                    .flowOn(thread2)
                    .filter {
                        expect(6, "main:1^:1^name-for-filter")
                        delay(1)
                        expect(7, "main:1^:1^name-for-filter")
                        true
                    }
                    .flowName("name-for-filter")
                    .flowOn(thread1)

            runTest {
                expect(1, "main:1^")
                coldFlow.collect {
                    assertEquals(42, it)
                    expect(8, "main:1^")
                    barrier1.complete(Unit)
                }
                barrier2.await()
                finish(11, "main:1^")
            }
        } finally {
            // Release the dedicated threads even when an expectation fails.
            thread1.close()
            thread2.close()
            thread3.close()
        }
    }

    /**
     * Uses the `*Traced` operator variants without changing context and checks the nested
     * section names they are expected to open (`collect:…`, `map:…`, `filter:…`).
     */
    @Test
    fun collectFlow7_withIntermediateOperatorNames() = runTest {
        expect(1, "main:1^")
        flow {
                expect(2, "main:1^", "collect:do-the-assert")
                emit(21) // 42 / 2 = 21
                expect(6, "main:1^", "collect:do-the-assert")
            }
            .flowName("UNUSED_NAME") // unused because scope is unchanged and operators are fused
            // The label matches the arithmetic below; expectations embed the same label.
            .mapTraced("multiply-by-2") {
                expect(3, "main:1^", "collect:do-the-assert", "map:multiply-by-2:transform")
                it * 2
            }
            .filterTraced("mod-2") {
                expect(
                    4,
                    "main:1^",
                    "collect:do-the-assert",
                    "map:multiply-by-2:emit",
                    "filter:mod-2:predicate",
                )
                it % 2 == 0
            }
            .collectTraced("do-the-assert") {
                assertEquals(42, it)
                expect(
                    5,
                    "main:1^",
                    "collect:do-the-assert",
                    "map:multiply-by-2:emit",
                    "filter:mod-2:emit",
                    "collect:do-the-assert:emit",
                )
            }
        finish(7, "main:1^")
    }

    /**
     * Shares a flow with [stateIn] and collects the resulting state from a separately launched,
     * named coroutine. The enclosing scope is cancelled at the end, after `finish`.
     */
    @Test
    fun collectFlow8_separateJobs() = runTest {
        // `use` is inline, so the suspending calls below remain legal; the dispatcher is closed
        // after the scope has been cancelled or if an expectation throws.
        newSingleThreadContext("flow-thread").use { flowThread ->
            expect(1, "main:1^")
            val state =
                flowOf(1, 2, 3, 4)
                    .transform {
                        expect("main:1^:1^:1^FLOW_NAME")
                        emit(it)
                    }
                    .flowName("unused-name")
                    .transform {
                        expect("main:1^:1^:1^FLOW_NAME")
                        emit(it)
                    }
                    .flowName("FLOW_NAME")
                    .flowOn(flowThread)
                    .transform {
                        expect("main:1^:1^")
                        emit(it)
                    }
                    .stateIn(this)

            launchTraced("LAUNCH_CALL") {
                state.collectTraced("state-flow") {
                    expect(
                        2,
                        "main:1^:2^LAUNCH_CALL",
                        "collect:state-flow",
                        "collect:state-flow:emit",
                    )
                }
            }

            delay(50)
            finish(3, "main:1^")
            cancel()
        }
    }
}