1*01826a49SYabin Cui /* 2*01826a49SYabin Cui * Copyright (c) Meta Platforms, Inc. and affiliates. 3*01826a49SYabin Cui * All rights reserved. 4*01826a49SYabin Cui * 5*01826a49SYabin Cui * This source code is licensed under both the BSD-style license (found in the 6*01826a49SYabin Cui * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7*01826a49SYabin Cui * in the COPYING file in the root directory of this source tree). 8*01826a49SYabin Cui */ 9*01826a49SYabin Cui #pragma once 10*01826a49SYabin Cui 11*01826a49SYabin Cui #include "utils/Buffer.h" 12*01826a49SYabin Cui 13*01826a49SYabin Cui #include <atomic> 14*01826a49SYabin Cui #include <cassert> 15*01826a49SYabin Cui #include <cstddef> 16*01826a49SYabin Cui #include <condition_variable> 17*01826a49SYabin Cui #include <cstddef> 18*01826a49SYabin Cui #include <functional> 19*01826a49SYabin Cui #include <mutex> 20*01826a49SYabin Cui #include <queue> 21*01826a49SYabin Cui 22*01826a49SYabin Cui namespace pzstd { 23*01826a49SYabin Cui 24*01826a49SYabin Cui /// Unbounded thread-safe work queue. 25*01826a49SYabin Cui template <typename T> 26*01826a49SYabin Cui class WorkQueue { 27*01826a49SYabin Cui // Protects all member variable access 28*01826a49SYabin Cui std::mutex mutex_; 29*01826a49SYabin Cui std::condition_variable readerCv_; 30*01826a49SYabin Cui std::condition_variable writerCv_; 31*01826a49SYabin Cui std::condition_variable finishCv_; 32*01826a49SYabin Cui 33*01826a49SYabin Cui std::queue<T> queue_; 34*01826a49SYabin Cui bool done_; 35*01826a49SYabin Cui std::size_t maxSize_; 36*01826a49SYabin Cui 37*01826a49SYabin Cui // Must have lock to call this function full()38*01826a49SYabin Cui bool full() const { 39*01826a49SYabin Cui if (maxSize_ == 0) { 40*01826a49SYabin Cui return false; 41*01826a49SYabin Cui } 42*01826a49SYabin Cui return queue_.size() >= maxSize_; 43*01826a49SYabin Cui } 44*01826a49SYabin Cui 45*01826a49SYabin Cui public: 46*01826a49SYabin Cui /** 47*01826a49SYabin Cui * Constructs an empty work queue with an optional max size. 48*01826a49SYabin Cui * If `maxSize == 0` the queue size is unbounded. 49*01826a49SYabin Cui * 50*01826a49SYabin Cui * @param maxSize The maximum allowed size of the work queue. 51*01826a49SYabin Cui */ done_(false)52*01826a49SYabin Cui WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} 53*01826a49SYabin Cui 54*01826a49SYabin Cui /** 55*01826a49SYabin Cui * Push an item onto the work queue. Notify a single thread that work is 56*01826a49SYabin Cui * available. If `finish()` has been called, do nothing and return false. 57*01826a49SYabin Cui * If `push()` returns false, then `item` has not been moved from. 58*01826a49SYabin Cui * 59*01826a49SYabin Cui * @param item Item to push onto the queue. 60*01826a49SYabin Cui * @returns True upon success, false if `finish()` has been called. An 61*01826a49SYabin Cui * item was pushed iff `push()` returns true. 62*01826a49SYabin Cui */ push(T && item)63*01826a49SYabin Cui bool push(T&& item) { 64*01826a49SYabin Cui { 65*01826a49SYabin Cui std::unique_lock<std::mutex> lock(mutex_); 66*01826a49SYabin Cui while (full() && !done_) { 67*01826a49SYabin Cui writerCv_.wait(lock); 68*01826a49SYabin Cui } 69*01826a49SYabin Cui if (done_) { 70*01826a49SYabin Cui return false; 71*01826a49SYabin Cui } 72*01826a49SYabin Cui queue_.push(std::move(item)); 73*01826a49SYabin Cui } 74*01826a49SYabin Cui readerCv_.notify_one(); 75*01826a49SYabin Cui return true; 76*01826a49SYabin Cui } 77*01826a49SYabin Cui 78*01826a49SYabin Cui /** 79*01826a49SYabin Cui * Attempts to pop an item off the work queue. It will block until data is 80*01826a49SYabin Cui * available or `finish()` has been called. 81*01826a49SYabin Cui * 82*01826a49SYabin Cui * @param[out] item If `pop` returns `true`, it contains the popped item. 83*01826a49SYabin Cui * If `pop` returns `false`, it is unmodified. 84*01826a49SYabin Cui * @returns True upon success. False if the queue is empty and 85*01826a49SYabin Cui * `finish()` has been called. 86*01826a49SYabin Cui */ pop(T & item)87*01826a49SYabin Cui bool pop(T& item) { 88*01826a49SYabin Cui { 89*01826a49SYabin Cui std::unique_lock<std::mutex> lock(mutex_); 90*01826a49SYabin Cui while (queue_.empty() && !done_) { 91*01826a49SYabin Cui readerCv_.wait(lock); 92*01826a49SYabin Cui } 93*01826a49SYabin Cui if (queue_.empty()) { 94*01826a49SYabin Cui assert(done_); 95*01826a49SYabin Cui return false; 96*01826a49SYabin Cui } 97*01826a49SYabin Cui item = std::move(queue_.front()); 98*01826a49SYabin Cui queue_.pop(); 99*01826a49SYabin Cui } 100*01826a49SYabin Cui writerCv_.notify_one(); 101*01826a49SYabin Cui return true; 102*01826a49SYabin Cui } 103*01826a49SYabin Cui 104*01826a49SYabin Cui /** 105*01826a49SYabin Cui * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. 106*01826a49SYabin Cui * 107*01826a49SYabin Cui * @param maxSize The new maximum queue size. 108*01826a49SYabin Cui */ setMaxSize(std::size_t maxSize)109*01826a49SYabin Cui void setMaxSize(std::size_t maxSize) { 110*01826a49SYabin Cui { 111*01826a49SYabin Cui std::lock_guard<std::mutex> lock(mutex_); 112*01826a49SYabin Cui maxSize_ = maxSize; 113*01826a49SYabin Cui } 114*01826a49SYabin Cui writerCv_.notify_all(); 115*01826a49SYabin Cui } 116*01826a49SYabin Cui 117*01826a49SYabin Cui /** 118*01826a49SYabin Cui * Promise that `push()` won't be called again, so once the queue is empty 119*01826a49SYabin Cui * there will never any more work. 120*01826a49SYabin Cui */ finish()121*01826a49SYabin Cui void finish() { 122*01826a49SYabin Cui { 123*01826a49SYabin Cui std::lock_guard<std::mutex> lock(mutex_); 124*01826a49SYabin Cui assert(!done_); 125*01826a49SYabin Cui done_ = true; 126*01826a49SYabin Cui } 127*01826a49SYabin Cui readerCv_.notify_all(); 128*01826a49SYabin Cui writerCv_.notify_all(); 129*01826a49SYabin Cui finishCv_.notify_all(); 130*01826a49SYabin Cui } 131*01826a49SYabin Cui 132*01826a49SYabin Cui /// Blocks until `finish()` has been called (but the queue may not be empty). waitUntilFinished()133*01826a49SYabin Cui void waitUntilFinished() { 134*01826a49SYabin Cui std::unique_lock<std::mutex> lock(mutex_); 135*01826a49SYabin Cui while (!done_) { 136*01826a49SYabin Cui finishCv_.wait(lock); 137*01826a49SYabin Cui } 138*01826a49SYabin Cui } 139*01826a49SYabin Cui }; 140*01826a49SYabin Cui 141*01826a49SYabin Cui /// Work queue for `Buffer`s that knows the total number of bytes in the queue. 142*01826a49SYabin Cui class BufferWorkQueue { 143*01826a49SYabin Cui WorkQueue<Buffer> queue_; 144*01826a49SYabin Cui std::atomic<std::size_t> size_; 145*01826a49SYabin Cui 146*01826a49SYabin Cui public: queue_(maxSize)147*01826a49SYabin Cui BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {} 148*01826a49SYabin Cui push(Buffer buffer)149*01826a49SYabin Cui void push(Buffer buffer) { 150*01826a49SYabin Cui size_.fetch_add(buffer.size()); 151*01826a49SYabin Cui queue_.push(std::move(buffer)); 152*01826a49SYabin Cui } 153*01826a49SYabin Cui pop(Buffer & buffer)154*01826a49SYabin Cui bool pop(Buffer& buffer) { 155*01826a49SYabin Cui bool result = queue_.pop(buffer); 156*01826a49SYabin Cui if (result) { 157*01826a49SYabin Cui size_.fetch_sub(buffer.size()); 158*01826a49SYabin Cui } 159*01826a49SYabin Cui return result; 160*01826a49SYabin Cui } 161*01826a49SYabin Cui setMaxSize(std::size_t maxSize)162*01826a49SYabin Cui void setMaxSize(std::size_t maxSize) { 163*01826a49SYabin Cui queue_.setMaxSize(maxSize); 164*01826a49SYabin Cui } 165*01826a49SYabin Cui finish()166*01826a49SYabin Cui void finish() { 167*01826a49SYabin Cui queue_.finish(); 168*01826a49SYabin Cui } 169*01826a49SYabin Cui 170*01826a49SYabin Cui /** 171*01826a49SYabin Cui * Blocks until `finish()` has been called. 172*01826a49SYabin Cui * 173*01826a49SYabin Cui * @returns The total number of bytes of all the `Buffer`s currently in the 174*01826a49SYabin Cui * queue. 175*01826a49SYabin Cui */ size()176*01826a49SYabin Cui std::size_t size() { 177*01826a49SYabin Cui queue_.waitUntilFinished(); 178*01826a49SYabin Cui return size_.load(); 179*01826a49SYabin Cui } 180*01826a49SYabin Cui }; 181*01826a49SYabin Cui } 182