1 #ifndef OSMIUM_THREAD_QUEUE_HPP
2 #define OSMIUM_THREAD_QUEUE_HPP
37 #include <condition_variable>
45 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
54 static const std::chrono::milliseconds
max_wait{10};
79 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
80 size_t m_largest_size;
84 std::atomic<int> m_push_counter;
88 std::atomic<int> m_full_counter;
94 std::atomic<int> m_pop_counter;
98 std::atomic<int> m_empty_counter;
110 explicit Queue(
size_t max_size = 0,
const std::string& name =
"") :
111 m_max_size(max_size),
117 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
129 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
130 std::cerr <<
"queue '" << m_name
131 <<
"' with max_size=" << m_max_size
132 <<
" had largest size " << m_largest_size
133 <<
" and was full " << m_full_counter
134 <<
" times in " << m_push_counter
135 <<
" push() calls and was empty " << m_empty_counter
136 <<
" times in " << m_pop_counter
146 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
150 while (
size() >= m_max_size) {
151 std::unique_lock<std::mutex> lock{m_mutex};
152 m_space_available.wait_for(lock,
max_wait, [
this] {
155 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
160 std::lock_guard<std::mutex> lock{m_mutex};
161 m_queue.push(std::move(value));
162 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
163 if (m_largest_size < m_queue.size()) {
164 m_largest_size = m_queue.size();
167 m_data_available.notify_one();
171 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
174 std::unique_lock<std::mutex> lock{m_mutex};
175 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
176 if (m_queue.empty()) {
180 m_data_available.wait(lock, [
this] {
181 return !m_queue.empty();
183 if (!m_queue.empty()) {
184 value = std::move(m_queue.front());
188 m_space_available.notify_one();
194 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
198 std::lock_guard<std::mutex> lock{m_mutex};
199 if (m_queue.empty()) {
200 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
205 value = std::move(m_queue.front());
209 m_space_available.notify_one();
215 std::lock_guard<std::mutex> lock{m_mutex};
216 return m_queue.empty();
220 std::lock_guard<std::mutex> lock{m_mutex};
221 return m_queue.size();
230 #endif // OSMIUM_THREAD_QUEUE_HPP
size_t size() const
Definition: queue.hpp:219
bool empty() const
Definition: queue.hpp:214
std::mutex m_mutex
Definition: queue.hpp:69
~Queue()
Definition: queue.hpp:128
static const std::chrono::milliseconds max_wait
Definition: queue.hpp:54
std::queue< T > m_queue
Definition: queue.hpp:71
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:67
const size_t m_max_size
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:73
std::condition_variable m_space_available
Used to signal producers when the queue is not full.
Definition: queue.hpp:77
bool try_pop(T &value)
Definition: queue.hpp:193
void wait_and_pop(T &value)
Definition: queue.hpp:170
void push(T value)
Definition: queue.hpp:145
Queue(size_t max_size=0, const std::string &name="")
Definition: queue.hpp:110
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:74