xref: /aosp_15_r20/external/zstd/contrib/pzstd/utils/WorkQueue.h (revision 01826a4963a0d8a59bc3812d29bdf0fb76416722)
1*01826a49SYabin Cui /*
2*01826a49SYabin Cui  * Copyright (c) Meta Platforms, Inc. and affiliates.
3*01826a49SYabin Cui  * All rights reserved.
4*01826a49SYabin Cui  *
5*01826a49SYabin Cui  * This source code is licensed under both the BSD-style license (found in the
6*01826a49SYabin Cui  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7*01826a49SYabin Cui  * in the COPYING file in the root directory of this source tree).
8*01826a49SYabin Cui  */
9*01826a49SYabin Cui #pragma once
10*01826a49SYabin Cui 
11*01826a49SYabin Cui #include "utils/Buffer.h"
12*01826a49SYabin Cui 
13*01826a49SYabin Cui #include <atomic>
14*01826a49SYabin Cui #include <cassert>
15*01826a49SYabin Cui #include <cstddef>
16*01826a49SYabin Cui #include <condition_variable>
17*01826a49SYabin Cui #include <cstddef>
18*01826a49SYabin Cui #include <functional>
19*01826a49SYabin Cui #include <mutex>
20*01826a49SYabin Cui #include <queue>
21*01826a49SYabin Cui 
22*01826a49SYabin Cui namespace pzstd {
23*01826a49SYabin Cui 
/// Thread-safe FIFO work queue with an optional capacity limit.
///
/// Producers add items with `push()` and consumers remove them with `pop()`.
/// `finish()` marks the end of production: later pushes are rejected and
/// `pop()` returns false once the remaining items have been drained.
template <typename T>
class WorkQueue {
  // Guards every data member below.
  std::mutex mutex_;
  // Signalled when an item becomes available or `finish()` is called.
  std::condition_variable readerCv_;
  // Signalled when room may have opened up (pop, setMaxSize) or on `finish()`.
  std::condition_variable writerCv_;
  // Signalled when `finish()` is called.
  std::condition_variable finishCv_;

  std::queue<T> queue_;
  bool done_;
  // Capacity limit; 0 means "no limit".
  std::size_t maxSize_;

  // Reports whether the queue is at capacity.  The caller must hold `mutex_`.
  bool full() const {
    return maxSize_ != 0 && queue_.size() >= maxSize_;
  }

 public:
  /**
   * Creates an empty work queue.
   *
   * @param maxSize  Maximum number of queued items.  A value of 0 (the
   *                 default) leaves the queue unbounded.
   */
  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}

  /**
   * Appends an item to the queue and wakes a single waiting consumer.
   * Blocks while the queue is full, until either space opens up or
   * `finish()` is called.
   *
   * @param item  Item to enqueue.  It is moved from only when the call
   *              succeeds.
   * @returns     True if the item was enqueued.  False if `finish()` has
   *              been called, in which case `item` is left untouched.
   */
  bool push(T&& item) {
    std::unique_lock<std::mutex> guard(mutex_);
    // Sleep until there is room or the queue has been finished.
    writerCv_.wait(guard, [this] { return done_ || !full(); });
    if (done_) {
      return false;
    }
    queue_.push(std::move(item));
    // Release the lock before notifying, as the woken reader will need it.
    guard.unlock();
    readerCv_.notify_one();
    return true;
  }

  /**
   * Removes the oldest item from the queue and wakes a single waiting
   * producer.  Blocks while the queue is empty, until either an item arrives
   * or `finish()` is called.
   *
   * @param[out] item  Receives the dequeued item when the call returns true;
   *                   left unmodified when it returns false.
   * @returns          True if an item was dequeued.  False if the queue is
   *                   empty and `finish()` has been called.
   */
  bool pop(T& item) {
    std::unique_lock<std::mutex> guard(mutex_);
    // Sleep until there is something to take or the queue has been finished.
    readerCv_.wait(guard, [this] { return done_ || !queue_.empty(); });
    if (queue_.empty()) {
      // The wait only ends with an empty queue when `finish()` was called.
      assert(done_);
      return false;
    }
    item = std::move(queue_.front());
    queue_.pop();
    // Release the lock before notifying, as the woken writer will need it.
    guard.unlock();
    writerCv_.notify_one();
    return true;
  }

  /**
   * Changes the capacity limit and wakes all blocked producers so they can
   * re-check it.
   *
   * @param maxSize  New maximum number of queued items; 0 means unbounded.
   */
  void setMaxSize(std::size_t maxSize) {
    std::unique_lock<std::mutex> guard(mutex_);
    maxSize_ = maxSize;
    guard.unlock();
    writerCv_.notify_all();
  }

  /**
   * Promises that `push()` won't succeed again, so once the queue is empty
   * there will never be any more work.  Wakes every thread blocked in
   * `push()`, `pop()` or `waitUntilFinished()`.  Must be called at most once
   * (checked with an assertion).
   */
  void finish() {
    std::unique_lock<std::mutex> guard(mutex_);
    assert(!done_);
    done_ = true;
    guard.unlock();
    readerCv_.notify_all();
    writerCv_.notify_all();
    finishCv_.notify_all();
  }

  /// Blocks until `finish()` has been called.  Items may still be queued
  /// when this returns.
  void waitUntilFinished() {
    std::unique_lock<std::mutex> guard(mutex_);
    finishCv_.wait(guard, [this] { return done_; });
  }
};
140*01826a49SYabin Cui 
141*01826a49SYabin Cui /// Work queue for `Buffer`s that knows the total number of bytes in the queue.
142*01826a49SYabin Cui class BufferWorkQueue {
143*01826a49SYabin Cui   WorkQueue<Buffer> queue_;
144*01826a49SYabin Cui   std::atomic<std::size_t> size_;
145*01826a49SYabin Cui 
146*01826a49SYabin Cui  public:
queue_(maxSize)147*01826a49SYabin Cui   BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
148*01826a49SYabin Cui 
push(Buffer buffer)149*01826a49SYabin Cui   void push(Buffer buffer) {
150*01826a49SYabin Cui     size_.fetch_add(buffer.size());
151*01826a49SYabin Cui     queue_.push(std::move(buffer));
152*01826a49SYabin Cui   }
153*01826a49SYabin Cui 
pop(Buffer & buffer)154*01826a49SYabin Cui   bool pop(Buffer& buffer) {
155*01826a49SYabin Cui     bool result = queue_.pop(buffer);
156*01826a49SYabin Cui     if (result) {
157*01826a49SYabin Cui       size_.fetch_sub(buffer.size());
158*01826a49SYabin Cui     }
159*01826a49SYabin Cui     return result;
160*01826a49SYabin Cui   }
161*01826a49SYabin Cui 
setMaxSize(std::size_t maxSize)162*01826a49SYabin Cui   void setMaxSize(std::size_t maxSize) {
163*01826a49SYabin Cui     queue_.setMaxSize(maxSize);
164*01826a49SYabin Cui   }
165*01826a49SYabin Cui 
finish()166*01826a49SYabin Cui   void finish() {
167*01826a49SYabin Cui     queue_.finish();
168*01826a49SYabin Cui   }
169*01826a49SYabin Cui 
170*01826a49SYabin Cui   /**
171*01826a49SYabin Cui    * Blocks until `finish()` has been called.
172*01826a49SYabin Cui    *
173*01826a49SYabin Cui    * @returns The total number of bytes of all the `Buffer`s currently in the
174*01826a49SYabin Cui    *           queue.
175*01826a49SYabin Cui    */
size()176*01826a49SYabin Cui   std::size_t size() {
177*01826a49SYabin Cui     queue_.waitUntilFinished();
178*01826a49SYabin Cui     return size_.load();
179*01826a49SYabin Cui   }
180*01826a49SYabin Cui };
181*01826a49SYabin Cui }
182