xref: /aosp_15_r20/external/kotlinx.coroutines/docs/topics/shared-mutable-state-and-concurrency.md (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1<!--- TEST_NAME SharedStateGuideTest -->
2
3[//]: # (title: Shared mutable state and concurrency)
4
5Coroutines can be executed parallelly using a multi-threaded dispatcher like the [Dispatchers.Default]. It presents
6all the usual parallelism problems. The main problem being synchronization of access to **shared mutable state**.
7Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
8but others are unique.
9
10## The problem
11
12Let us launch a hundred coroutines all doing the same action a thousand times.
13We'll also measure their completion time for further comparisons:
14
15```kotlin
16suspend fun massiveRun(action: suspend () -> Unit) {
17    val n = 100  // number of coroutines to launch
18    val k = 1000 // times an action is repeated by each coroutine
19    val time = measureTimeMillis {
20        coroutineScope { // scope for coroutines
21            repeat(n) {
22                launch {
23                    repeat(k) { action() }
24                }
25            }
26        }
27    }
28    println("Completed ${n * k} actions in $time ms")
29}
30```
31
32We start with a very simple action that increments a shared mutable variable using
33multi-threaded [Dispatchers.Default].
34
35<!--- CLEAR -->
36
37```kotlin
38import kotlinx.coroutines.*
39import kotlin.system.*
40
41suspend fun massiveRun(action: suspend () -> Unit) {
42    val n = 100  // number of coroutines to launch
43    val k = 1000 // times an action is repeated by each coroutine
44    val time = measureTimeMillis {
45        coroutineScope { // scope for coroutines
46            repeat(n) {
47                launch {
48                    repeat(k) { action() }
49                }
50            }
51        }
52    }
53    println("Completed ${n * k} actions in $time ms")
54}
55
56//sampleStart
57var counter = 0
58
59fun main() = runBlocking {
60    withContext(Dispatchers.Default) {
61        massiveRun {
62            counter++
63        }
64    }
65    println("Counter = $counter")
66}
67//sampleEnd
68```
69{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
70
71> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt).
72>
73{type="note"}
74
75<!--- TEST LINES_START
76Completed 100000 actions in
77Counter =
78-->
79
80What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines
81increment the `counter` concurrently from multiple threads without any synchronization.
82
83## Volatiles are of no help
84
85There is a common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
86
87<!--- CLEAR -->
88
89```kotlin
90import kotlinx.coroutines.*
91import kotlin.system.*
92
93suspend fun massiveRun(action: suspend () -> Unit) {
94    val n = 100  // number of coroutines to launch
95    val k = 1000 // times an action is repeated by each coroutine
96    val time = measureTimeMillis {
97        coroutineScope { // scope for coroutines
98            repeat(n) {
99                launch {
100                    repeat(k) { action() }
101                }
102            }
103        }
104    }
105    println("Completed ${n * k} actions in $time ms")
106}
107
108//sampleStart
109@Volatile // in Kotlin `volatile` is an annotation
110var counter = 0
111
112fun main() = runBlocking {
113    withContext(Dispatchers.Default) {
114        massiveRun {
115            counter++
116        }
117    }
118    println("Counter = $counter")
119}
120//sampleEnd
121```
122{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
123
124> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt).
125>
126{type="note"}
127
128<!--- TEST LINES_START
129Completed 100000 actions in
130Counter =
131-->
132
133This code works slower, but we still don't always get "Counter = 100000" at the end, because volatile variables guarantee
134linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
135do not provide atomicity of larger actions (increment in our case).
136
137## Thread-safe data structures
138
139The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
140linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
141operations that needs to be performed on a shared state.
142In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
143
144<!--- CLEAR -->
145
146```kotlin
147import kotlinx.coroutines.*
148import java.util.concurrent.atomic.*
149import kotlin.system.*
150
151suspend fun massiveRun(action: suspend () -> Unit) {
152    val n = 100  // number of coroutines to launch
153    val k = 1000 // times an action is repeated by each coroutine
154    val time = measureTimeMillis {
155        coroutineScope { // scope for coroutines
156            repeat(n) {
157                launch {
158                    repeat(k) { action() }
159                }
160            }
161        }
162    }
163    println("Completed ${n * k} actions in $time ms")
164}
165
166//sampleStart
167val counter = AtomicInteger()
168
169fun main() = runBlocking {
170    withContext(Dispatchers.Default) {
171        massiveRun {
172            counter.incrementAndGet()
173        }
174    }
175    println("Counter = $counter")
176}
177//sampleEnd
178```
179{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
180
181> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt).
182>
183{type="note"}
184
185<!--- TEST ARBITRARY_TIME
186Completed 100000 actions in xxx ms
187Counter = 100000
188-->
189
190This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
191standard data structures and basic operations on them. However, it does not easily scale to complex
192state or to complex operations that do not have ready-to-use thread-safe implementations.
193
194## Thread confinement fine-grained
195
196_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
197state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
198the single event-dispatch/application thread. It is easy to apply with coroutines by using a single-threaded context.
199
200<!--- CLEAR -->
201
202```kotlin
203import kotlinx.coroutines.*
204import kotlin.system.*
205
206suspend fun massiveRun(action: suspend () -> Unit) {
207    val n = 100  // number of coroutines to launch
208    val k = 1000 // times an action is repeated by each coroutine
209    val time = measureTimeMillis {
210        coroutineScope { // scope for coroutines
211            repeat(n) {
212                launch {
213                    repeat(k) { action() }
214                }
215            }
216        }
217    }
218    println("Completed ${n * k} actions in $time ms")
219}
220
221//sampleStart
222val counterContext = newSingleThreadContext("CounterContext")
223var counter = 0
224
225fun main() = runBlocking {
226    withContext(Dispatchers.Default) {
227        massiveRun {
228            // confine each increment to a single-threaded context
229            withContext(counterContext) {
230                counter++
231            }
232        }
233    }
234    println("Counter = $counter")
235}
236//sampleEnd
237```
238{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
239
240> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt).
241>
242{type="note"}
243
244<!--- TEST ARBITRARY_TIME
245Completed 100000 actions in xxx ms
246Counter = 100000
247-->
248
249This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
250from multi-threaded [Dispatchers.Default] context to the single-threaded context using
251[withContext(counterContext)][withContext] block.
252
253## Thread confinement coarse-grained
254
255In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
256are confined to the single thread. The following example does it like that, running each coroutine in
257the single-threaded context to start with.
258
259<!--- CLEAR -->
260
261```kotlin
262import kotlinx.coroutines.*
263import kotlin.system.*
264
265suspend fun massiveRun(action: suspend () -> Unit) {
266    val n = 100  // number of coroutines to launch
267    val k = 1000 // times an action is repeated by each coroutine
268    val time = measureTimeMillis {
269        coroutineScope { // scope for coroutines
270            repeat(n) {
271                launch {
272                    repeat(k) { action() }
273                }
274            }
275        }
276    }
277    println("Completed ${n * k} actions in $time ms")
278}
279
280//sampleStart
281val counterContext = newSingleThreadContext("CounterContext")
282var counter = 0
283
284fun main() = runBlocking {
285    // confine everything to a single-threaded context
286    withContext(counterContext) {
287        massiveRun {
288            counter++
289        }
290    }
291    println("Counter = $counter")
292}
293//sampleEnd
294```
295{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
296
297> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt).
298>
299{type="note"}
300
301<!--- TEST ARBITRARY_TIME
302Completed 100000 actions in xxx ms
303Counter = 100000
304-->
305
306This now works much faster and produces correct result.
307
308## Mutual exclusion
309
310Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
311that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
312Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
313delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread.
314
315There is also [withLock] extension function that conveniently represents
316`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:
317
318<!--- CLEAR -->
319
320```kotlin
321import kotlinx.coroutines.*
322import kotlinx.coroutines.sync.*
323import kotlin.system.*
324
325suspend fun massiveRun(action: suspend () -> Unit) {
326    val n = 100  // number of coroutines to launch
327    val k = 1000 // times an action is repeated by each coroutine
328    val time = measureTimeMillis {
329        coroutineScope { // scope for coroutines
330            repeat(n) {
331                launch {
332                    repeat(k) { action() }
333                }
334            }
335        }
336    }
337    println("Completed ${n * k} actions in $time ms")
338}
339
340//sampleStart
341val mutex = Mutex()
342var counter = 0
343
344fun main() = runBlocking {
345    withContext(Dispatchers.Default) {
346        massiveRun {
347            // protect each increment with lock
348            mutex.withLock {
349                counter++
350            }
351        }
352    }
353    println("Counter = $counter")
354}
355//sampleEnd
356```
357{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
358
359> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt).
360>
361{type="note"}
362
363<!--- TEST ARBITRARY_TIME
364Completed 100000 actions in xxx ms
365Counter = 100000
366-->
367
368The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
369where you absolutely must modify some shared state periodically, but there is no natural thread that this state
370is confined to.
371
372<!--- MODULE kotlinx-coroutines-core -->
373<!--- INDEX kotlinx.coroutines -->
374
375[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
376[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
377
378<!--- INDEX kotlinx.coroutines.sync -->
379
380[Mutex]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
381[Mutex.lock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
382[Mutex.unlock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html
383[withLock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
384
385<!--- END -->
386