1<!--- TEST_NAME SharedStateGuideTest --> 2 3[//]: # (title: Shared mutable state and concurrency) 4 5Coroutines can be executed parallelly using a multi-threaded dispatcher like the [Dispatchers.Default]. It presents 6all the usual parallelism problems. The main problem being synchronization of access to **shared mutable state**. 7Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world, 8but others are unique. 9 10## The problem 11 12Let us launch a hundred coroutines all doing the same action a thousand times. 13We'll also measure their completion time for further comparisons: 14 15```kotlin 16suspend fun massiveRun(action: suspend () -> Unit) { 17 val n = 100 // number of coroutines to launch 18 val k = 1000 // times an action is repeated by each coroutine 19 val time = measureTimeMillis { 20 coroutineScope { // scope for coroutines 21 repeat(n) { 22 launch { 23 repeat(k) { action() } 24 } 25 } 26 } 27 } 28 println("Completed ${n * k} actions in $time ms") 29} 30``` 31 32We start with a very simple action that increments a shared mutable variable using 33multi-threaded [Dispatchers.Default]. 34 35<!--- CLEAR --> 36 37```kotlin 38import kotlinx.coroutines.* 39import kotlin.system.* 40 41suspend fun massiveRun(action: suspend () -> Unit) { 42 val n = 100 // number of coroutines to launch 43 val k = 1000 // times an action is repeated by each coroutine 44 val time = measureTimeMillis { 45 coroutineScope { // scope for coroutines 46 repeat(n) { 47 launch { 48 repeat(k) { action() } 49 } 50 } 51 } 52 } 53 println("Completed ${n * k} actions in $time ms") 54} 55 56//sampleStart 57var counter = 0 58 59fun main() = runBlocking { 60 withContext(Dispatchers.Default) { 61 massiveRun { 62 counter++ 63 } 64 } 65 println("Counter = $counter") 66} 67//sampleEnd 68``` 69{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 70 71> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt). 72> 73{type="note"} 74 75<!--- TEST LINES_START 76Completed 100000 actions in 77Counter = 78--> 79 80What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines 81increment the `counter` concurrently from multiple threads without any synchronization. 82 83## Volatiles are of no help 84 85There is a common misconception that making a variable `volatile` solves concurrency problem. Let us try it: 86 87<!--- CLEAR --> 88 89```kotlin 90import kotlinx.coroutines.* 91import kotlin.system.* 92 93suspend fun massiveRun(action: suspend () -> Unit) { 94 val n = 100 // number of coroutines to launch 95 val k = 1000 // times an action is repeated by each coroutine 96 val time = measureTimeMillis { 97 coroutineScope { // scope for coroutines 98 repeat(n) { 99 launch { 100 repeat(k) { action() } 101 } 102 } 103 } 104 } 105 println("Completed ${n * k} actions in $time ms") 106} 107 108//sampleStart 109@Volatile // in Kotlin `volatile` is an annotation 110var counter = 0 111 112fun main() = runBlocking { 113 withContext(Dispatchers.Default) { 114 massiveRun { 115 counter++ 116 } 117 } 118 println("Counter = $counter") 119} 120//sampleEnd 121``` 122{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 123 124> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt). 125> 126{type="note"} 127 128<!--- TEST LINES_START 129Completed 100000 actions in 130Counter = 131--> 132 133This code works slower, but we still don't always get "Counter = 100000" at the end, because volatile variables guarantee 134linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but 135do not provide atomicity of larger actions (increment in our case). 136 137## Thread-safe data structures 138 139The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized, 140linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding 141operations that needs to be performed on a shared state. 142In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations: 143 144<!--- CLEAR --> 145 146```kotlin 147import kotlinx.coroutines.* 148import java.util.concurrent.atomic.* 149import kotlin.system.* 150 151suspend fun massiveRun(action: suspend () -> Unit) { 152 val n = 100 // number of coroutines to launch 153 val k = 1000 // times an action is repeated by each coroutine 154 val time = measureTimeMillis { 155 coroutineScope { // scope for coroutines 156 repeat(n) { 157 launch { 158 repeat(k) { action() } 159 } 160 } 161 } 162 } 163 println("Completed ${n * k} actions in $time ms") 164} 165 166//sampleStart 167val counter = AtomicInteger() 168 169fun main() = runBlocking { 170 withContext(Dispatchers.Default) { 171 massiveRun { 172 counter.incrementAndGet() 173 } 174 } 175 println("Counter = $counter") 176} 177//sampleEnd 178``` 179{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 180 181> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt). 182> 183{type="note"} 184 185<!--- TEST ARBITRARY_TIME 186Completed 100000 actions in xxx ms 187Counter = 100000 188--> 189 190This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other 191standard data structures and basic operations on them. However, it does not easily scale to complex 192state or to complex operations that do not have ready-to-use thread-safe implementations. 193 194## Thread confinement fine-grained 195 196_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared 197state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to 198the single event-dispatch/application thread. It is easy to apply with coroutines by using a single-threaded context. 199 200<!--- CLEAR --> 201 202```kotlin 203import kotlinx.coroutines.* 204import kotlin.system.* 205 206suspend fun massiveRun(action: suspend () -> Unit) { 207 val n = 100 // number of coroutines to launch 208 val k = 1000 // times an action is repeated by each coroutine 209 val time = measureTimeMillis { 210 coroutineScope { // scope for coroutines 211 repeat(n) { 212 launch { 213 repeat(k) { action() } 214 } 215 } 216 } 217 } 218 println("Completed ${n * k} actions in $time ms") 219} 220 221//sampleStart 222val counterContext = newSingleThreadContext("CounterContext") 223var counter = 0 224 225fun main() = runBlocking { 226 withContext(Dispatchers.Default) { 227 massiveRun { 228 // confine each increment to a single-threaded context 229 withContext(counterContext) { 230 counter++ 231 } 232 } 233 } 234 println("Counter = $counter") 235} 236//sampleEnd 237``` 238{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 239 240> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt). 241> 242{type="note"} 243 244<!--- TEST ARBITRARY_TIME 245Completed 100000 actions in xxx ms 246Counter = 100000 247--> 248 249This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches 250from multi-threaded [Dispatchers.Default] context to the single-threaded context using 251[withContext(counterContext)][withContext] block. 252 253## Thread confinement coarse-grained 254 255In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic 256are confined to the single thread. The following example does it like that, running each coroutine in 257the single-threaded context to start with. 258 259<!--- CLEAR --> 260 261```kotlin 262import kotlinx.coroutines.* 263import kotlin.system.* 264 265suspend fun massiveRun(action: suspend () -> Unit) { 266 val n = 100 // number of coroutines to launch 267 val k = 1000 // times an action is repeated by each coroutine 268 val time = measureTimeMillis { 269 coroutineScope { // scope for coroutines 270 repeat(n) { 271 launch { 272 repeat(k) { action() } 273 } 274 } 275 } 276 } 277 println("Completed ${n * k} actions in $time ms") 278} 279 280//sampleStart 281val counterContext = newSingleThreadContext("CounterContext") 282var counter = 0 283 284fun main() = runBlocking { 285 // confine everything to a single-threaded context 286 withContext(counterContext) { 287 massiveRun { 288 counter++ 289 } 290 } 291 println("Counter = $counter") 292} 293//sampleEnd 294``` 295{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 296 297> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt). 298> 299{type="note"} 300 301<!--- TEST ARBITRARY_TIME 302Completed 100000 actions in xxx ms 303Counter = 100000 304--> 305 306This now works much faster and produces correct result. 307 308## Mutual exclusion 309 310Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_ 311that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that. 312Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to 313delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread. 314 315There is also [withLock] extension function that conveniently represents 316`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern: 317 318<!--- CLEAR --> 319 320```kotlin 321import kotlinx.coroutines.* 322import kotlinx.coroutines.sync.* 323import kotlin.system.* 324 325suspend fun massiveRun(action: suspend () -> Unit) { 326 val n = 100 // number of coroutines to launch 327 val k = 1000 // times an action is repeated by each coroutine 328 val time = measureTimeMillis { 329 coroutineScope { // scope for coroutines 330 repeat(n) { 331 launch { 332 repeat(k) { action() } 333 } 334 } 335 } 336 } 337 println("Completed ${n * k} actions in $time ms") 338} 339 340//sampleStart 341val mutex = Mutex() 342var counter = 0 343 344fun main() = runBlocking { 345 withContext(Dispatchers.Default) { 346 massiveRun { 347 // protect each increment with lock 348 mutex.withLock { 349 counter++ 350 } 351 } 352 } 353 println("Counter = $counter") 354} 355//sampleEnd 356``` 357{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 358 359> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt). 360> 361{type="note"} 362 363<!--- TEST ARBITRARY_TIME 364Completed 100000 actions in xxx ms 365Counter = 100000 366--> 367 368The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations 369where you absolutely must modify some shared state periodically, but there is no natural thread that this state 370is confined to. 371 372<!--- MODULE kotlinx-coroutines-core --> 373<!--- INDEX kotlinx.coroutines --> 374 375[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html 376[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html 377 378<!--- INDEX kotlinx.coroutines.sync --> 379 380[Mutex]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html 381[Mutex.lock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html 382[Mutex.unlock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html 383[withLock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html 384 385<!--- END --> 386