rangeless::fn
rangeless::mt::synchronized_queue< T, BasicLockable > Class Template Reference

Optionally-bounded blocking concurrent MPMC queue. More...

#include <fn.hpp>

Inheritance diagram for rangeless::mt::synchronized_queue< T, BasicLockable >:
rangeless::mt::synchronized_queue_base

Classes

struct  close_guard
 
struct  pop_t
 
struct  push_t
 Implements insert_iterator and unary-invokable. More...
 

Public Types

using value_type = T
 
- Public Types inherited from rangeless::mt::synchronized_queue_base
enum  status { status::success, status::closed, status::timeout }
 

Public Member Functions

 synchronized_queue (size_t cap=1024)
 
 ~synchronized_queue ()=default
 
template<typename F >
auto operator>>= (F &&sink) -> decltype((void) sink(this->pop()))
 pop() the values into the provided sink-function until closed and empty. More...
 
template<typename Duration = std::chrono::milliseconds>
status try_push (value_type &&value, Duration timeout={})
 In case of success, the value will be moved-from. More...
 
template<typename Duration = std::chrono::milliseconds>
status try_pop (value_type &value, Duration timeout={})
 In case of success, the value will be move-assigned. More...
 
size_t approx_size () const noexcept
 
size_t capacity () const noexcept
 
bool closed () const noexcept
 
close_guard close () noexcept
 Return an RAII object that will close the queue in its destructor. More...
 
struct push_t
 
struct pop_t
 
push_t push = { *this }
 Blocking push. May throw queue_closed. More...
 
pop_t pop = { *this }
 Blocking pop. May throw queue_closed. More...
 

Detailed Description

template<typename T, class BasicLockable = std::mutex>
class rangeless::mt::synchronized_queue< T, BasicLockable >

Optionally-bounded blocking concurrent MPMC queue.

  • Supports not-copy-constructible/not-default-constructible value-types (just requires move-assigneable).
  • Can be used with lockables other than std::mutex, e.g. mt::atomic_lock.
  • Contention-resistant: when used with mt::atomic_lock the throughput is comparable to state-of-the-art lock-free implementations.
  • Short and simple implementation using only c++11 standard library primitives.
  • Provides RAII-based closing semantics to communicate end-of-inputs from the pushing end or failure/going-out-of-scope from the popping end.

Related:
boost::fibers::buffered_channel
boost::sync_bounded_queue
boost::lockfree::queue
tbb::concurrent_queue
moodycamel::BlockingConcurrentQueue

// A toy example to compute sum of lengths of strings in parallel.
//
// Spin-off a separate async-task that enqueues jobs
// to process a single input, and enqueues the
// futures into a synchronized queue, while accumulating
// the ready results from the queue in this thread.
using queue_t = mt::synchronized_queue<std::future<size_t> >;
queue_t queue{ 10 };
auto fut = std::async(std::launch::async,[ &queue ]
{
auto close_on_exit = queue.close();
for(std::string line; std::getline(std::cin, line); ) {
queue <<=
std::async(
std::launch::async,
[](const std::string& s) {
return s.size();
},
std::move(line));
}
});
size_t total = 0;
queue >>= [&](queue_t::value_type x) { total += x.get(); };
fut.get(); // rethrow exception, if any.

Definition at line 6413 of file fn.hpp.

Member Typedef Documentation

◆ value_type

template<typename T, class BasicLockable = std::mutex>
using rangeless::mt::synchronized_queue< T, BasicLockable >::value_type = T

Definition at line 6416 of file fn.hpp.

Constructor & Destructor Documentation

◆ synchronized_queue()

template<typename T, class BasicLockable = std::mutex>
rangeless::mt::synchronized_queue< T, BasicLockable >::synchronized_queue ( size_t  cap = 1024)
inline

Definition at line 6420 of file fn.hpp.

◆ ~synchronized_queue()

template<typename T, class BasicLockable = std::mutex>
rangeless::mt::synchronized_queue< T, BasicLockable >::~synchronized_queue ( )
default

Member Function Documentation

◆ approx_size()

template<typename T, class BasicLockable = std::mutex>
size_t rangeless::mt::synchronized_queue< T, BasicLockable >::approx_size ( ) const
inlinenoexcept

Definition at line 6688 of file fn.hpp.

◆ capacity()

template<typename T, class BasicLockable = std::mutex>
size_t rangeless::mt::synchronized_queue< T, BasicLockable >::capacity ( ) const
inlinenoexcept

Definition at line 6693 of file fn.hpp.

◆ close()

template<typename T, class BasicLockable = std::mutex>
close_guard rangeless::mt::synchronized_queue< T, BasicLockable >::close ( )
inlinenoexcept

Return an RAII object that will close the queue in its destructor.

auto guard = queue.close(); // close the queue when leaving scope
queue.close(); // close the queue now (guard's is destroyed immediately)


NB: closing is non-blocking.
Blocked calls to try_push()/try_pop() shall return with status::closed.
Blocked calls to push()/pop() shall throw queue_closed.
Subsequent calls to push()/try_push() shall do as above.
Subsequent calls to pop()/try_pop() will succeed until the queue becomes empty, and throw/return-closed thereafter.

Definition at line 6742 of file fn.hpp.

◆ closed()

template<typename T, class BasicLockable = std::mutex>
bool rangeless::mt::synchronized_queue< T, BasicLockable >::closed ( ) const
inlinenoexcept

Definition at line 6698 of file fn.hpp.

◆ operator>>=()

template<typename T, class BasicLockable = std::mutex>
template<typename F >
auto rangeless::mt::synchronized_queue< T, BasicLockable >::operator>>= ( F &&  sink) -> decltype((void)sink(this->pop()))
inline

pop() the values into the provided sink-function until closed and empty.

e.g. queue >>= [&out_it](T x){ *out_it++ = std::move(x); };
Queue is automatically closed if exiting via exception, unblocking the pushers.

Definition at line 6553 of file fn.hpp.

◆ try_pop()

template<typename T, class BasicLockable = std::mutex>
template<typename Duration = std::chrono::milliseconds>
status rangeless::mt::synchronized_queue< T, BasicLockable >::try_pop ( value_type value,
Duration  timeout = {} 
)
inline

In case of success, the value will be move-assigned.

Definition at line 6645 of file fn.hpp.

◆ try_push()

template<typename T, class BasicLockable = std::mutex>
template<typename Duration = std::chrono::milliseconds>
status rangeless::mt::synchronized_queue< T, BasicLockable >::try_push ( value_type &&  value,
Duration  timeout = {} 
)
inline

In case of success, the value will be moved-from.

Definition at line 6593 of file fn.hpp.

Friends And Related Function Documentation

◆ pop_t

template<typename T, class BasicLockable = std::mutex>
friend struct pop_t
friend

Definition at line 6536 of file fn.hpp.

◆ push_t

template<typename T, class BasicLockable = std::mutex>
friend struct push_t
friend

Definition at line 6517 of file fn.hpp.

Member Data Documentation

◆ pop

template<typename T, class BasicLockable = std::mutex>
pop_t rangeless::mt::synchronized_queue< T, BasicLockable >::pop = { *this }

Blocking pop. May throw queue_closed.

Definition at line 6540 of file fn.hpp.

◆ push

template<typename T, class BasicLockable = std::mutex>
push_t rangeless::mt::synchronized_queue< T, BasicLockable >::push = { *this }

Blocking push. May throw queue_closed.

Definition at line 6522 of file fn.hpp.


The documentation for this class was generated from the following file: