OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
blocking_queue.hpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#ifndef OPENCBDC_TX_SRC_COMMON_BLOCKING_QUEUE_H_
7#define OPENCBDC_TX_SRC_COMMON_BLOCKING_QUEUE_H_
8
9#include <condition_variable>
10#include <functional>
11#include <mutex>
12#include <queue>
13
14namespace cbdc {
18 template<typename T, typename Q>
20 public:
22
25 -> blocking_queue_internal& = delete;
26
29 -> blocking_queue_internal& = delete;
30
37
41 auto push(const T& item) -> size_t {
42 auto sz = [&]() {
43 std::unique_lock<std::mutex> lck(m_mut);
44 m_buffer.push(item);
45 m_wake = true;
46 return m_buffer.size();
47 }();
48 m_cv.notify_one();
49 return sz;
50 }
51
59 [[nodiscard]] auto pop(T& item) -> bool {
60 {
61 std::unique_lock<std::mutex> lck(m_mut);
62 if(m_buffer.empty()) {
63 m_cv.wait(lck, [&] {
64 return m_wake;
65 });
66 }
67
68 bool popped{false};
69 if(!m_buffer.empty()) {
70 item = std::move(first_item<T, Q>());
71 m_buffer.pop();
72 popped = true;
73 m_wake = !m_buffer.empty();
74 }
75
76 return popped;
77 }
78 }
79
81 void clear() {
82 {
83 std::unique_lock<std::mutex> lck(m_mut);
84 m_buffer = decltype(m_buffer)();
85 m_wake = true;
86 }
87 m_cv.notify_all();
88 }
89
93 void reset() {
94 std::unique_lock l(m_mut);
95 m_wake = false;
96 }
97
98 private:
99 template<typename TT, typename QQ>
100 auto first_item() ->
101 typename std::enable_if<std::is_same<QQ, std::queue<TT>>::value,
102 const TT&>::type {
103 return m_buffer.front();
104 }
105
106 template<typename TT, typename QQ>
107 auto first_item() ->
108 typename std::enable_if<!std::is_same<QQ, std::queue<TT>>::value,
109 const TT&>::type {
110 return m_buffer.top();
111 }
112
113 Q m_buffer;
114 std::mutex m_mut;
115 std::condition_variable m_cv;
116 bool m_wake{false};
117 };
118
119 template<typename T>
121
122 template<typename T, typename C = std::less<T>>
125 std::priority_queue<T, std::vector<T>, C>>;
126}
127
128#endif // OPENCBDC_TX_SRC_COMMON_BLOCKING_QUEUE_H_
Thread-safe producer-consumer FIFO queue supporting multiple concurrent producers and consumers.
auto push(const T &item) -> size_t
Pushes an element onto the queue and notifies at most one waiting consumer.
void clear()
Clears the queue and unblocks waiting consumers.
blocking_queue_internal(blocking_queue_internal &&)=delete
auto operator=(const blocking_queue_internal &) -> blocking_queue_internal &=delete
auto operator=(blocking_queue_internal &&) -> blocking_queue_internal &=delete
void reset()
Removes the wakeup flag for consumers.
auto pop(T &item) -> bool
Pops an element from the queue.
blocking_queue_internal(const blocking_queue_internal &)=delete