/*
 * Copyright 2018 Google
 * SPDX-License-Identifier: MIT
 */
#include "aemu/base/ring_buffer.h"

#include <errno.h>
#include <string.h>
#ifdef _MSC_VER
#include "aemu/base/msvc.h"
#else
#include <sys/time.h>
#endif

#ifdef __x86_64__
#include <emmintrin.h>
#endif

#ifdef _WIN32
#include <windows.h>
#else
#include <sched.h>
#include <unistd.h>
#endif

#define RING_BUFFER_MASK (RING_BUFFER_SIZE - 1)

#define RING_BUFFER_VERSION 1

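// Note on the concurrency model (an editor's summary, inferred from the code
// below, not an authoritative spec): write_pos and read_pos are free-running
// 32-bit counters that are only masked into the buffer on access, and each
// counter has a single owner (the producer advances write_pos, the consumer
// advances read_pos). The accounting is therefore only safe with one producer
// thread and one consumer thread; multiple writers or readers on the same
// side would need external synchronization.
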
void ring_buffer_init(struct ring_buffer* r) {
    r->host_version = 1;
    r->write_pos = 0;
    r->read_pos = 0;

    r->read_live_count = 0;
    r->read_yield_count = 0;
    r->read_sleep_us_count = 0;

    r->state = 0;
}

static uint32_t get_ring_pos(uint32_t index) {
    return index & RING_BUFFER_MASK;
}

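// can_write/can_read work on the distance between the two free-running
// counters. Leaving one byte unused (the "- 1" below) distinguishes a full
// ring from an empty one; otherwise write_pos catching up to read_pos would
// look identical to an empty ring. Worked example with RING_BUFFER_SIZE == 8,
// read_pos == 2, write_pos == 7:
//   writable = (2 - 7 - 1) & 7 = 2 bytes
//   readable = (7 - 2) & 7     = 5 bytes
// so writable + readable == RING_BUFFER_SIZE - 1, as expected.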
bool ring_buffer_can_write(const struct ring_buffer* r, uint32_t bytes) {
    uint32_t read_view;
    __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
    return get_ring_pos(read_view - r->write_pos - 1) >= bytes;
}

bool ring_buffer_can_read(const struct ring_buffer* r, uint32_t bytes) {
    uint32_t write_view;
    __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
    return get_ring_pos(write_view - r->read_pos) >= bytes;
}

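// ring_buffer_write/ring_buffer_read transfer `steps` packets of `step_size`
// bytes each. They return the number of whole steps completed; on a short
// count the remaining steps were skipped because the ring was full (write) or
// empty (read), and errno is set to -EAGAIN. A single step as large as the
// whole ring can never complete, which is why the *_fully helpers further
// down cap their step via get_step_size().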
long ring_buffer_write(
    struct ring_buffer* r, const void* data, uint32_t step_size, uint32_t steps) {
    const uint8_t* data_bytes = (const uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_write(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Split into two copies when the step wraps past the end of the buffer.
        uint32_t available_at_end =
            RING_BUFFER_SIZE - get_ring_pos(r->write_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(
                &r->buf[get_ring_pos(r->write_pos)],
                data_bytes + i * step_size,
                available_at_end);
            memcpy(
                &r->buf[get_ring_pos(r->write_pos + available_at_end)],
                data_bytes + i * step_size + available_at_end,
                remaining);
        } else {
            memcpy(
                &r->buf[get_ring_pos(r->write_pos)],
                data_bytes + i * step_size,
                step_size);
        }

        __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

long ring_buffer_read(
    struct ring_buffer* r, void* data, uint32_t step_size, uint32_t steps) {
    uint8_t* data_bytes = (uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_read(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Split into two copies when the step wraps past the end of the buffer.
        uint32_t available_at_end =
            RING_BUFFER_SIZE - get_ring_pos(r->read_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(
                data_bytes + i * step_size,
                &r->buf[get_ring_pos(r->read_pos)],
                available_at_end);
            memcpy(
                data_bytes + i * step_size + available_at_end,
                &r->buf[get_ring_pos(r->read_pos + available_at_end)],
                remaining);
        } else {
            memcpy(
                data_bytes + i * step_size,
                &r->buf[get_ring_pos(r->read_pos)],
                step_size);
        }

        __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

long ring_buffer_advance_write(
    struct ring_buffer* r, uint32_t step_size, uint32_t steps) {
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_write(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

long ring_buffer_advance_read(
    struct ring_buffer* r, uint32_t step_size, uint32_t steps) {
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_read(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

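// ring_buffer_calc_shift rounds a size *down* to a power of two and returns
// its log2. Worked examples:
//   size 4096 -> shift 12 (already a power of two)
//   size 3000 -> the loop stops at shift 12 (4096 > 3000), then backs off to
//                shift 11, i.e. an effective capacity of 2048 bytes.
// Callers that pass a non-power-of-two buffer therefore silently lose the
// tail of that buffer.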
uint32_t ring_buffer_calc_shift(uint32_t size) {
    uint32_t shift = 0;
    while ((1 << shift) < size) {
        ++shift;
    }

    // If size is not a power of 2, round down to the previous power of 2.
    if ((1 << shift) > size) {
        --shift;
    }
    return shift;
}

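// The *_view variants operate on a caller-provided buffer instead of the
// ring's inline buf[] array, with the effective size and mask captured in a
// ring_buffer_view. A minimal usage sketch (illustrative only; `guest_buf`,
// `payload`, and `payload_size` are hypothetical and owned by the caller):
//
//   struct ring_buffer r;
//   struct ring_buffer_view v;
//   static uint8_t guest_buf[16384];
//   ring_buffer_view_init(&r, &v, guest_buf, sizeof(guest_buf));
//   ring_buffer_view_write(&r, &v, payload, payload_size, 1);
//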
void ring_buffer_view_init(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    uint8_t* buf,
    uint32_t size) {

    uint32_t shift = ring_buffer_calc_shift(size);

    ring_buffer_init(r);

    v->buf = buf;
    v->size = (1 << shift);
    v->mask = (1 << shift) - 1;
}

void ring_buffer_init_view_only(
    struct ring_buffer_view* v,
    uint8_t* buf,
    uint32_t size) {

    uint32_t shift = ring_buffer_calc_shift(size);

    v->buf = buf;
    v->size = (1 << shift);
    v->mask = (1 << shift) - 1;
}

uint32_t ring_buffer_view_get_ring_pos(
    const struct ring_buffer_view* v,
    uint32_t index) {
    return index & v->mask;
}

bool ring_buffer_view_can_write(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v,
    uint32_t bytes) {
    uint32_t read_view;
    __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
    return ring_buffer_view_get_ring_pos(
        v, read_view - r->write_pos - 1) >= bytes;
}

bool ring_buffer_view_can_read(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v,
    uint32_t bytes) {
    uint32_t write_view;
    __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
    return ring_buffer_view_get_ring_pos(
        v, write_view - r->read_pos) >= bytes;
}

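// ring_buffer_available_read/_write report how many bytes could currently be
// read or written. Passing v == NULL means "use the ring's inline buffer";
// the same NULL convention is used by the copy/wait/fully helpers below.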
uint32_t ring_buffer_available_read(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v) {
    uint32_t write_view;
    __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
    if (v) {
        return ring_buffer_view_get_ring_pos(
            v, write_view - r->read_pos);
    } else {
        return get_ring_pos(write_view - r->read_pos);
    }
}

uint32_t ring_buffer_available_write(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v) {
    uint32_t read_view;
    __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
    if (v) {
        return ring_buffer_view_get_ring_pos(
            v, read_view - r->write_pos - 1);
    } else {
        return get_ring_pos(read_view - r->write_pos - 1);
    }
}

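// ring_buffer_copy_contents is a peek: it copies wanted_bytes starting at the
// current read position into res without advancing read_pos, and returns -1
// if that much data is not yet available.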
int ring_buffer_copy_contents(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v,
    uint32_t wanted_bytes,
    uint8_t* res) {

    uint32_t total_available =
        ring_buffer_available_read(r, v);
    uint32_t available_at_end = 0;

    // The contiguous chunk is measured from the read position, since that is
    // where the copy below starts.
    if (v) {
        available_at_end =
            v->size - ring_buffer_view_get_ring_pos(v, r->read_pos);
    } else {
        available_at_end =
            RING_BUFFER_SIZE - get_ring_pos(r->read_pos);
    }

    if (total_available < wanted_bytes) {
        return -1;
    }

    if (v) {
        if (wanted_bytes > available_at_end) {
            uint32_t remaining = wanted_bytes - available_at_end;
            memcpy(res,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)],
                   available_at_end);
            memcpy(res + available_at_end,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos + available_at_end)],
                   remaining);
        } else {
            memcpy(res,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)],
                   wanted_bytes);
        }
    } else {
        if (wanted_bytes > available_at_end) {
            uint32_t remaining = wanted_bytes - available_at_end;
            memcpy(res,
                   &r->buf[get_ring_pos(r->read_pos)],
                   available_at_end);
            memcpy(res + available_at_end,
                   &r->buf[get_ring_pos(r->read_pos + available_at_end)],
                   remaining);
        } else {
            memcpy(res,
                   &r->buf[get_ring_pos(r->read_pos)],
                   wanted_bytes);
        }
    }
    return 0;
}

long ring_buffer_view_write(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    const void* data, uint32_t step_size, uint32_t steps) {

    const uint8_t* data_bytes = (const uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_view_can_write(r, v, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Split into two copies when the step wraps past the end of the buffer.
        uint32_t available_at_end =
            v->size - ring_buffer_view_get_ring_pos(v, r->write_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(
                &v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos)],
                data_bytes + i * step_size,
                available_at_end);
            memcpy(
                &v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos + available_at_end)],
                data_bytes + i * step_size + available_at_end,
                remaining);
        } else {
            memcpy(
                &v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos)],
                data_bytes + i * step_size,
                step_size);
        }

        __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

long ring_buffer_view_read(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    void* data, uint32_t step_size, uint32_t steps) {
    uint8_t* data_bytes = (uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_view_can_read(r, v, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Split into two copies when the step wraps past the end of the buffer.
        uint32_t available_at_end =
            v->size - ring_buffer_view_get_ring_pos(v, r->read_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(
                data_bytes + i * step_size,
                &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)],
                available_at_end);
            memcpy(
                data_bytes + i * step_size + available_at_end,
                &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos + available_at_end)],
                remaining);
        } else {
            memcpy(data_bytes + i * step_size,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)],
                   step_size);
        }
        __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

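// Platform helpers for the wait loops below: ring_buffer_yield issues a
// spin-wait pause on Windows and calls sched_yield() elsewhere;
// ring_buffer_sleep blocks for roughly 2 ms.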
void ring_buffer_yield() {
#ifdef _WIN32
    _mm_pause();
#else
    sched_yield();
#endif
}

static void ring_buffer_sleep() {
#ifdef _WIN32
    Sleep(2);
#else
    usleep(2000);
#endif
}

static uint64_t ring_buffer_curr_us() {
    uint64_t res;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    res = tv.tv_sec * 1000000ULL + tv.tv_usec;
    return res;
}

static const uint32_t yield_backoff_us = 1000;
static const uint32_t sleep_backoff_us = 2000;

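// ring_buffer_wait_write/_read poll until enough space/data is available or
// timeout_us elapses, escalating the backoff as the wait drags on:
//   < 1 ms : pure spin (with a pause hint on x86_64)
//   > 1 ms : additionally ring_buffer_yield() each iteration
//   > 2 ms : additionally ring_buffer_sleep() ~2 ms each iteration
// The read side also bumps the diagnostic counters (read_yield_count,
// read_sleep_us_count, read_live_count); the const pointer is cast away for
// that purpose only.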
bool ring_buffer_wait_write(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v,
    uint32_t bytes,
    uint64_t timeout_us) {

    uint64_t start_us = ring_buffer_curr_us();
    uint64_t curr_wait_us;

    bool can_write =
        v ? ring_buffer_view_can_write(r, v, bytes) :
            ring_buffer_can_write(r, bytes);

    while (!can_write) {
#ifdef __x86_64
        _mm_pause();
#endif
        curr_wait_us = ring_buffer_curr_us() - start_us;

        if (curr_wait_us > yield_backoff_us) {
            ring_buffer_yield();
        }

        if (curr_wait_us > sleep_backoff_us) {
            ring_buffer_sleep();
        }

        if (curr_wait_us > timeout_us) {
            return false;
        }

        can_write =
            v ? ring_buffer_view_can_write(r, v, bytes) :
                ring_buffer_can_write(r, bytes);
    }

    return true;
}

bool ring_buffer_wait_read(
    const struct ring_buffer* r,
    const struct ring_buffer_view* v,
    uint32_t bytes,
    uint64_t timeout_us) {

    uint64_t start_us = ring_buffer_curr_us();
    uint64_t curr_wait_us;

    bool can_read =
        v ? ring_buffer_view_can_read(r, v, bytes) :
            ring_buffer_can_read(r, bytes);

    while (!can_read) {
        // TODO(bohu): find aarch64 equivalent
#ifdef __x86_64
        _mm_pause();
#endif
        curr_wait_us = ring_buffer_curr_us() - start_us;

        if (curr_wait_us > yield_backoff_us) {
            ring_buffer_yield();
            ((struct ring_buffer*)r)->read_yield_count++;
        }

        if (curr_wait_us > sleep_backoff_us) {
            ring_buffer_sleep();
            ((struct ring_buffer*)r)->read_sleep_us_count += 2000;
        }

        if (curr_wait_us > timeout_us) {
            return false;
        }

        can_read =
            v ? ring_buffer_view_can_read(r, v, bytes) :
                ring_buffer_can_read(r, bytes);
    }

    ((struct ring_buffer*)r)->read_live_count++;
    return true;
}

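// The *_fully helpers chop a transfer into steps of at most half the ring (or
// half the view). Capping the step below full capacity guarantees each step
// can eventually fit, since at most RING_BUFFER_SIZE - 1 bytes are ever
// writable at once; a full-capacity step would spin forever.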
static uint32_t get_step_size(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    uint32_t bytes) {

    uint32_t available = v ? (v->size >> 1) : (RING_BUFFER_SIZE >> 1);
    uint32_t res = available < bytes ? available : bytes;

    return res;
}

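// ring_buffer_write_fully/_read_fully block until the whole transfer is done;
// the *_with_abort variants additionally return early once *abort_ptr equals
// abort_value, reporting how many bytes made it through. A minimal sketch of
// the intended pairing (illustrative only; `msg`, `out`, and `msg_len` are
// hypothetical):
//
//   // producer thread
//   ring_buffer_write_fully(&r, NULL, msg, msg_len);
//   // consumer thread
//   ring_buffer_read_fully(&r, NULL, out, msg_len);
//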
void ring_buffer_write_fully(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    const void* data,
    uint32_t bytes) {
    ring_buffer_write_fully_with_abort(r, v, data, bytes, 0, 0);
}

void ring_buffer_read_fully(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    void* data,
    uint32_t bytes) {
    ring_buffer_read_fully_with_abort(r, v, data, bytes, 0, 0);
}

uint32_t ring_buffer_write_fully_with_abort(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    const void* data,
    uint32_t bytes,
    uint32_t abort_value,
    const volatile uint32_t* abort_ptr) {

    uint32_t candidate_step = get_step_size(r, v, bytes);
    uint32_t processed = 0;

    const uint8_t* src = (const uint8_t*)data;

    while (processed < bytes) {
        if (bytes - processed < candidate_step) {
            candidate_step = bytes - processed;
        }

        long processed_here = 0;
        ring_buffer_wait_write(r, v, candidate_step, (uint64_t)(-1));

        if (v) {
            processed_here = ring_buffer_view_write(r, v, src + processed, candidate_step, 1);
        } else {
            processed_here = ring_buffer_write(r, src + processed, candidate_step, 1);
        }

        processed += processed_here ? candidate_step : 0;

        if (abort_ptr && (abort_value == *abort_ptr)) {
            return processed;
        }
    }

    return processed;
}

uint32_t ring_buffer_read_fully_with_abort(
    struct ring_buffer* r,
    struct ring_buffer_view* v,
    void* data,
    uint32_t bytes,
    uint32_t abort_value,
    const volatile uint32_t* abort_ptr) {

    uint32_t candidate_step = get_step_size(r, v, bytes);
    uint32_t processed = 0;

    uint8_t* dst = (uint8_t*)data;

    while (processed < bytes) {
#ifdef __x86_64
        _mm_pause();
#endif
        if (bytes - processed < candidate_step) {
            candidate_step = bytes - processed;
        }

        long processed_here = 0;
        ring_buffer_wait_read(r, v, candidate_step, (uint64_t)(-1));

        if (v) {
            processed_here = ring_buffer_view_read(r, v, dst + processed, candidate_step, 1);
        } else {
            processed_here = ring_buffer_read(r, dst + processed, candidate_step, 1);
        }

        processed += processed_here ? candidate_step : 0;

        if (abort_ptr && (abort_value == *abort_ptr)) {
            return processed;
        }
    }

    return processed;
}

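// The state field implements a small producer/consumer handshake over the
// RING_BUFFER_SYNC_* states. The expected sequence, as an editor's summary of
// the transitions implemented below (not an authoritative protocol spec):
//
//   producer: ring_buffer_producer_acquire()              IDLE    -> PRODUCER_ACTIVE
//   producer: ring_buffer_producer_idle()                 any     -> PRODUCER_IDLE
//   consumer: ring_buffer_consumer_hangup()               IDLE    -> CONSUMER_HANGING_UP
//   consumer: ring_buffer_consumer_hung_up()              any     -> CONSUMER_HUNG_UP
//   producer: ring_buffer_producer_acquire_from_hangup()  HUNG_UP -> PRODUCER_ACTIVE
//
// The wait_* helpers spin (with ring_buffer_yield) until the peer reaches the
// corresponding state.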
void ring_buffer_sync_init(struct ring_buffer* r) {
    __atomic_store_n(&r->state, RING_BUFFER_SYNC_PRODUCER_IDLE, __ATOMIC_SEQ_CST);
}

bool ring_buffer_producer_acquire(struct ring_buffer* r) {
    uint32_t expected_idle = RING_BUFFER_SYNC_PRODUCER_IDLE;
    bool success = __atomic_compare_exchange_n(
        &r->state,
        &expected_idle,
        RING_BUFFER_SYNC_PRODUCER_ACTIVE,
        false /* strong */,
        __ATOMIC_SEQ_CST,
        __ATOMIC_SEQ_CST);
    return success;
}

bool ring_buffer_producer_acquire_from_hangup(struct ring_buffer* r) {
    uint32_t expected_hangup = RING_BUFFER_SYNC_CONSUMER_HUNG_UP;
    bool success = __atomic_compare_exchange_n(
        &r->state,
        &expected_hangup,
        RING_BUFFER_SYNC_PRODUCER_ACTIVE,
        false /* strong */,
        __ATOMIC_SEQ_CST,
        __ATOMIC_SEQ_CST);
    return success;
}

void ring_buffer_producer_wait_hangup(struct ring_buffer* r) {
    while (__atomic_load_n(&r->state, __ATOMIC_SEQ_CST) !=
           RING_BUFFER_SYNC_CONSUMER_HUNG_UP) {
        ring_buffer_yield();
    }
}

void ring_buffer_producer_idle(struct ring_buffer* r) {
    __atomic_store_n(&r->state, RING_BUFFER_SYNC_PRODUCER_IDLE, __ATOMIC_SEQ_CST);
}

bool ring_buffer_consumer_hangup(struct ring_buffer* r) {
    uint32_t expected_idle = RING_BUFFER_SYNC_PRODUCER_IDLE;
    bool success = __atomic_compare_exchange_n(
        &r->state,
        &expected_idle,
        RING_BUFFER_SYNC_CONSUMER_HANGING_UP,
        false /* strong */,
        __ATOMIC_SEQ_CST,
        __ATOMIC_SEQ_CST);
    return success;
}

void ring_buffer_consumer_wait_producer_idle(struct ring_buffer* r) {
    while (__atomic_load_n(&r->state, __ATOMIC_SEQ_CST) !=
           RING_BUFFER_SYNC_PRODUCER_IDLE) {
        ring_buffer_yield();
    }
}

void ring_buffer_consumer_hung_up(struct ring_buffer* r) {
    __atomic_store_n(&r->state, RING_BUFFER_SYNC_CONSUMER_HUNG_UP, __ATOMIC_SEQ_CST);
}