1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.test.tracing.coroutines
18 
19 import android.platform.test.annotations.EnableFlags
20 import com.android.app.tracing.coroutines.createCoroutineTracingContext
21 import com.android.app.tracing.coroutines.flow.collectTraced
22 import com.android.app.tracing.coroutines.flow.filterTraced
23 import com.android.app.tracing.coroutines.flow.flowName
24 import com.android.app.tracing.coroutines.flow.mapTraced
25 import com.android.app.tracing.coroutines.launchTraced
26 import com.android.systemui.Flags.FLAG_COROUTINE_TRACING
27 import kotlin.coroutines.CoroutineContext
28 import kotlinx.coroutines.CompletableDeferred
29 import kotlinx.coroutines.DelicateCoroutinesApi
30 import kotlinx.coroutines.ExperimentalCoroutinesApi
31 import kotlinx.coroutines.cancel
32 import kotlinx.coroutines.delay
33 import kotlinx.coroutines.flow.filter
34 import kotlinx.coroutines.flow.flow
35 import kotlinx.coroutines.flow.flowOf
36 import kotlinx.coroutines.flow.flowOn
37 import kotlinx.coroutines.flow.map
38 import kotlinx.coroutines.flow.stateIn
39 import kotlinx.coroutines.flow.transform
40 import kotlinx.coroutines.newSingleThreadContext
41 import org.junit.Assert.assertEquals
42 import org.junit.Test
43 
/**
 * Verifies which trace names are observed while cold flows are produced, transformed and
 * collected under a coroutine tracing context rooted at "main".
 *
 * All assertions go through `expect(...)`, `finish(...)` and `runTest { ... }`, which are inherited
 * from [TestBase] and are not shown in this file. NOTE(review): the leading integer passed to
 * `expect`/`finish` looks like an expected event ordinal and the strings look like the expected
 * open trace sections - confirm against [TestBase].
 *
 * Every dispatcher created with [newSingleThreadContext] is closed before the owning test returns
 * so that test runs do not leak threads.
 */
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
@EnableFlags(FLAG_COROUTINE_TRACING)
class FlowTracingTest : TestBase() {

    /** Tracing context named "main", with parent names included and strict mode enabled. */
    override val extraCoroutineContext: CoroutineContext
        get() = createCoroutineTracingContext("main", includeParentNames = true, strictMode = true)

    /**
     * Collects an unnamed cold flow in the test coroutine; the producer, the collector and the
     * code after collection are all expected to see "main:1^".
     */
    @Test
    fun collectFlow1() {
        val coldFlow = flow {
            expect(1, "main:1^")
            delay(1)
            expect(2, "main:1^")
            emit(42)
            expect(4, "main:1^")
            delay(1)
            expect(5, "main:1^")
        }
        runTest {
            coldFlow.collect {
                assertEquals(42, it)
                expect(3, "main:1^")
            }
            delay(1)
            finish(6, "main:1^")
        }
    }

    /**
     * Applies [flowName] without any `flowOn`; the expectations inside the producer still name
     * only "main:1^", i.e. "new-name" is not expected to show up.
     */
    @Test
    fun collectFlow2() {
        val coldFlow =
            flow {
                    expect(1, "main:1^")
                    delay(1)
                    expect(2)
                    emit(1)
                    expect(5, "main:1^")
                    delay(1)
                    finish(6)
                }
                .flowName("new-name")
        runTest {
            coldFlow.collect {
                expect(3, "main:1^")
                delay(1)
                expect(4, "main:1^")
            }
        }
    }

    /**
     * Names the flow and then moves it onto another thread with `flowOn`; the producer is expected
     * to see "main:1^:1^new-name" while the collector stays at "main:1^".
     */
    @Test
    fun collectFlow3() {
        // `use` closes the dispatcher (and releases its thread) once the test body is done.
        newSingleThreadContext("thread-#1").use { thread1 ->
            val coldFlow =
                flow {
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                        emit(42)
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                    }
                    .flowName("new-name")
                    .flowOn(thread1)
            runTest {
                coldFlow.collect {
                    assertEquals(42, it)
                    expect("main:1^")
                    delay(1)
                    expect("main:1^")
                }
            }
        }
    }

    /**
     * Same as [collectFlow3] but with `flowOn` applied before [flowName]; the expected trace names
     * are identical.
     */
    @Test
    fun collectFlow4() {
        // `use` closes the dispatcher (and releases its thread) once the test body is done.
        newSingleThreadContext("thread-#1").use { thread1 ->
            val coldFlow =
                flow {
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                        emit(42)
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                    }
                    .flowOn(thread1)
                    .flowName("new-name")
            runTest {
                coldFlow.collect {
                    assertEquals(42, it)
                    expect("main:1^")
                    delay(1)
                    expect("main:1^")
                }
            }
        }
    }

    /**
     * Adds a second [flowName] after `flowOn`; the producer is still expected to see "new-name"
     * and "UNUSED_NAME" is not expected anywhere.
     */
    @Test
    fun collectFlow5() {
        // `use` closes the dispatcher (and releases its thread) once the test body is done.
        newSingleThreadContext("thread-#1").use { thread1 ->
            val coldFlow =
                flow {
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                        emit(42)
                        expect("main:1^:1^new-name")
                        delay(1)
                        expect("main:1^:1^new-name")
                    }
                    .flowName("new-name")
                    .flowOn(thread1)
                    .flowName("UNUSED_NAME")

            runTest {
                coldFlow.collect {
                    assertEquals(42, it)
                    expect("main:1^")
                }
                delay(1)
                expect("main:1^")
            }
        }
    }

    /**
     * Chains three named stages (emit, map, filter), each followed by its own `flowOn`, and
     * expects the trace name to grow by one segment per stage going upstream.
     *
     * `barrier1` keeps the producer from continuing past `emit` until the collector has handled
     * the value; `barrier2` keeps the test from finishing until the producer has run to its end.
     */
    @Test
    fun collectFlow6() {
        val barrier1 = CompletableDeferred<Unit>()
        val barrier2 = CompletableDeferred<Unit>()
        val thread1 = newSingleThreadContext("thread-#1")
        val thread2 = newSingleThreadContext("thread-#2")
        val thread3 = newSingleThreadContext("thread-#3")
        try {
            val coldFlow =
                flow {
                        expect(2, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        delay(1)
                        expect(3, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        emit(42)
                        barrier1.await()
                        expect(9, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        delay(1)
                        expect(10, "main:1^:1^name-for-filter:1^name-for-map:1^name-for-emit")
                        barrier2.complete(Unit)
                    }
                    .flowName("name-for-emit")
                    .flowOn(thread3)
                    .map {
                        expect(4, "main:1^:1^name-for-filter:1^name-for-map")
                        delay(1)
                        expect(5, "main:1^:1^name-for-filter:1^name-for-map")
                        it
                    }
                    .flowName("name-for-map")
                    .flowOn(thread2)
                    .filter {
                        expect(6, "main:1^:1^name-for-filter")
                        delay(1)
                        expect(7, "main:1^:1^name-for-filter")
                        true
                    }
                    .flowName("name-for-filter")
                    .flowOn(thread1)

            runTest {
                expect(1, "main:1^")
                coldFlow.collect {
                    assertEquals(42, it)
                    expect(8, "main:1^")
                    barrier1.complete(Unit)
                }
                barrier2.await()
                finish(11, "main:1^")
            }
        } finally {
            // Release all three dispatcher threads even if the test body throws.
            listOf(thread1, thread2, thread3).forEach { it.close() }
        }
    }

    /**
     * Uses the traced operator variants ([mapTraced], [filterTraced], [collectTraced]) without
     * changing context, and expects each operator to contribute its own named trace sections
     * ("map:<name>:transform", "map:<name>:emit", "filter:<name>:predicate", ...) on top of
     * "main:1^".
     */
    @Test
    fun collectFlow7_withIntermediateOperatorNames() = runTest {
        expect(1, "main:1^")
        flow {
                expect(2, "main:1^", "collect:do-the-assert")
                emit(21) // 42 / 2 = 21
                expect(6, "main:1^", "collect:do-the-assert")
            }
            .flowName("UNUSED_NAME") // unused because scope is unchanged and operators are fused
            // The operator doubles the value, so it is labelled "multiply-by-2".
            .mapTraced("multiply-by-2") {
                expect(3, "main:1^", "collect:do-the-assert", "map:multiply-by-2:transform")
                it * 2
            }
            .filterTraced("mod-2") {
                expect(
                    4,
                    "main:1^",
                    "collect:do-the-assert",
                    "map:multiply-by-2:emit",
                    "filter:mod-2:predicate",
                )
                it % 2 == 0
            }
            .collectTraced("do-the-assert") {
                assertEquals(42, it)
                expect(
                    5,
                    "main:1^",
                    "collect:do-the-assert",
                    "map:multiply-by-2:emit",
                    "filter:mod-2:emit",
                    "collect:do-the-assert:emit",
                )
            }
        finish(7, "main:1^")
    }

    /**
     * Shares a flow with `stateIn(this)` and collects the resulting state from a separately
     * launched, named coroutine.
     *
     * Stages upstream of `flowOn` are expected to see "main:1^:1^:1^FLOW_NAME" (the earlier
     * "unused-name" is not expected), the stage downstream of `flowOn` is expected to see
     * "main:1^:1^", and the launched collector is expected to see "main:1^:2^LAUNCH_CALL" plus
     * its collect sections.
     */
    @Test
    fun collectFlow8_separateJobs() = runTest {
        // `use` closes the dispatcher (and releases its thread) once the test body is done.
        newSingleThreadContext("flow-thread").use { flowThread ->
            expect(1, "main:1^")
            val state =
                flowOf(1, 2, 3, 4)
                    .transform {
                        expect("main:1^:1^:1^FLOW_NAME")
                        emit(it)
                    }
                    .flowName("unused-name")
                    .transform {
                        expect("main:1^:1^:1^FLOW_NAME")
                        emit(it)
                    }
                    .flowName("FLOW_NAME")
                    .flowOn(flowThread)
                    .transform {
                        expect("main:1^:1^")
                        emit(it)
                    }
                    .stateIn(this)

            launchTraced("LAUNCH_CALL") {
                state.collectTraced("state-flow") {
                    expect(
                        2,
                        "main:1^:2^LAUNCH_CALL",
                        "collect:state-flow",
                        "collect:state-flow:emit",
                    )
                }
            }

            delay(50)
            finish(3, "main:1^")
            // Cancel the scope so the launched collector does not keep the test running;
            // presumably required because collecting a state flow never completes on its own.
            cancel()
        }
    }
}
292