rangeless::fn
Parallel

Functions

impl::async_wr rangeless::fn::to_async (size_t queue_size=16)
 Wrap generating seq in an async-task.
 
template<typename F >
impl::par_transform< F, impl::std_async > rangeless::fn::transform_in_parallel (F map_fn)
 Parallelized version of fn::transform.
 
template<typename F , typename Async >
impl::par_transform< F, Async > rangeless::fn::transform_in_parallel (F map_fn, Async async)
 A version of transform_in_parallel that uses a user-provided Async (e.g. backed by a fancy work-stealing thread-pool implementation).
 

Detailed Description

Function Documentation

◆ to_async()

impl::async_wr rangeless::fn::to_async ( size_t queue_size = 16 )
inline

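Wrap the generating seq in an async-task: the upstream stages are offloaded to an async-task, and their elements are yielded downstream via a blocking queue with capacity queue_size.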

#define RANGELESS_FN_ENABLE_PARALLEL 1
#include "fn.hpp"
#include <cassert>

namespace fn = rangeless::fn;
using fn::operators::operator%;

int main()
{
    long i = 0, res = 0;
    fn::seq([&]{ return i < 9 ? i++ : fn::end_seq(); })
      % fn::transform([](long x) { return x + 1; })
      % fn::to_async(42) // the generator+transform will be offloaded to an async-task
                         // and the elements will be yielded via a 42-capacity blocking queue.
                         // (If we wanted the generator and transform to be offloaded to
                         // separate threads, we could insert another to_async() before transform()).
      % fn::for_each([&](long x) {
            res = res * 10 + x;
        });
    assert(res == 123456789);
}
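The producer/consumer hand-off described above can be sketched with the standard library alone. This is a minimal illustration of the idea, not fn.hpp's implementation: bounded_queue and run_offloaded are hypothetical names, the generator signals end-of-sequence by returning std::nullopt (instead of fn::end_seq()), and the consumer is assumed not to throw.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical bounded blocking queue: push() blocks while full, pop() blocks
// while empty. An empty optional is used as the end-of-sequence marker.
template<typename T>
class bounded_queue
{
public:
    explicit bounded_queue(std::size_t capacity) : m_capacity(capacity == 0 ? 1 : capacity) {}

    void push(std::optional<T> item)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_full.wait(lock, [this] { return m_items.size() < m_capacity; });
        m_items.push_back(std::move(item));
        m_not_empty.notify_one();
    }

    std::optional<T> pop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_empty.wait(lock, [this] { return !m_items.empty(); });
        std::optional<T> item = std::move(m_items.front());
        m_items.pop_front();
        m_not_full.notify_one();
        return item;
    }

private:
    std::size_t m_capacity;
    std::deque<std::optional<T>> m_items;
    std::mutex m_mutex;
    std::condition_variable m_not_full, m_not_empty;
};

// Hypothetical helper: runs `generate` in an async task and feeds its elements
// to `consume` on the calling thread through a queue of capacity queue_size.
// Assumes `consume` does not throw (otherwise the producer could block forever).
template<typename T>
void run_offloaded(std::function<std::optional<T>()> generate,
                   std::function<void(const T&)> consume,
                   std::size_t queue_size = 16)
{
    bounded_queue<T> queue(queue_size);
    auto producer = std::async(std::launch::async, [&] {
        try {
            while (auto item = generate()) queue.push(std::move(item));
        } catch (...) {
            queue.push(std::nullopt); // unblock the consumer, then rethrow
            throw;
        }
        queue.push(std::nullopt); // normal end-of-sequence
    });
    while (auto item = queue.pop()) consume(*item);
    producer.get(); // rethrows any exception raised by the generator
}
```

With T = long, a generator yielding 1..9 and a consumer computing res = res * 10 + x reproduce the example's 123456789, since a single FIFO queue preserves element order.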

Definition at line 7070 of file fn.hpp.

◆ transform_in_parallel() [1/2]

template<typename F >
impl::par_transform<F, impl::std_async> rangeless::fn::transform_in_parallel ( F map_fn )

Parallelized version of fn::transform

Requires #define RANGELESS_FN_ENABLE_PARALLEL 1 before #include "fn.hpp", because it brings in "heavy" STL #includes (<future> and <thread>).

queue_capacity (set via .queue_capacity(n) on the returned object) is the maximum number of simultaneously running std::async-tasks, each executing a single invocation of map_fn.

NB: If the execution time of map_fn is highly variable, a higher capacity may help: results are yielded in input order, so while we are blocked waiting on the result of a long-running task, the other tasks continue to execute.
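The queue_capacity semantics can be illustrated with a minimal standard-library sketch: keep at most queue_capacity std::async tasks in flight and always wait on the oldest one, so outputs come out in input order. parallel_map_in_order is a hypothetical name, and this is an assumption-laden sketch over a std::vector, not the library's implementation (which works on InputRanges lazily).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <future>
#include <type_traits>
#include <vector>

// Hypothetical sketch, not fn.hpp's implementation: at most `queue_capacity`
// std::async tasks are in flight, each running one invocation of map_fn.
// Results are produced in input order by always waiting on the oldest task.
template<typename In, typename F>
std::vector<std::invoke_result_t<F, In>>
parallel_map_in_order(const std::vector<In>& inputs, F map_fn, std::size_t queue_capacity)
{
    using Out = std::invoke_result_t<F, In>;
    if (queue_capacity == 0) queue_capacity = 1;

    std::vector<Out> outputs;
    std::deque<std::future<Out>> in_flight;

    for (const In& x : inputs) {
        if (in_flight.size() == queue_capacity) {
            // Wait for the oldest task; the other in-flight tasks keep running
            // meanwhile, so one slow task does not stall all the work.
            outputs.push_back(in_flight.front().get());
            in_flight.pop_front();
        }
        in_flight.push_back(std::async(std::launch::async, map_fn, x));
    }
    while (!in_flight.empty()) { // drain the remaining tasks in order
        outputs.push_back(in_flight.front().get());
        in_flight.pop_front();
    }
    return outputs;
}
```

A larger capacity lets more tasks overlap with the one currently being waited on, at the cost of more simultaneously live tasks and buffered results.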

NB: If the tasks are too small relative to the overhead of running each one as an async-task, it may help to batch them (see fn::in_groups_of): have map_fn produce a vector of outputs from a vector of inputs, and fn::concat the outputs.
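The batching idea (group, map each group as one task, concatenate) can be sketched with std::async alone. batched_parallel_map is a hypothetical helper that only mirrors the in_groups_of / concat pattern; for brevity it assumes map_fn returns the element type T and it does not cap the number of in-flight tasks the way queue_capacity does.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <iterator>
#include <vector>

// Hypothetical helper: split inputs into groups of group_size, run each group
// in its own std::async task (one task per group rather than per element),
// then concatenate the per-group outputs in input order.
template<typename T, typename F>
std::vector<T> batched_parallel_map(const std::vector<T>& inputs, std::size_t group_size, F map_fn)
{
    if (group_size == 0) group_size = 1;

    std::vector<std::future<std::vector<T>>> tasks;
    for (std::size_t begin = 0; begin < inputs.size(); begin += group_size) {
        const std::size_t end = std::min(begin + group_size, inputs.size());
        std::vector<T> group(std::next(inputs.begin(), static_cast<std::ptrdiff_t>(begin)),
                             std::next(inputs.begin(), static_cast<std::ptrdiff_t>(end)));
        tasks.push_back(std::async(std::launch::async,
            [map_fn, group = std::move(group)] {
                std::vector<T> out; // vector of outputs from a vector of inputs
                out.reserve(group.size());
                for (const T& x : group) out.push_back(map_fn(x));
                return out;
            }));
    }

    // Concatenate (the analogue of fn::concat), preserving input order.
    std::vector<T> result;
    for (auto& task : tasks) {
        std::vector<T> part = task.get();
        result.insert(result.end(), part.begin(), part.end());
    }
    return result;
}
```

With 7 inputs and groups of 3, only 3 tasks are created instead of 7, which is where the overhead saving comes from.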

map_fn is required to be thread-safe.
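Because several invocations of map_fn run concurrently, any state it shares must be synchronized. The sketch below uses plain std::async tasks (not fn::transform_in_parallel) and a hypothetical run_counting_tasks function: a map_fn that counts its invocations through a captured reference needs std::atomic, since an unsynchronized int would be a data race.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <vector>

// Illustration with hypothetical names: map_fn is invoked concurrently on
// several threads, so the shared counter it updates must be atomic.
inline int run_counting_tasks(int num_tasks)
{
    std::atomic<int> calls{0};
    auto map_fn = [&calls](int x) {
        calls.fetch_add(1, std::memory_order_relaxed); // thread-safe shared update
        return x * 2;
    };

    std::vector<std::future<int>> results;
    for (int i = 0; i < num_tasks; ++i)
        results.push_back(std::async(std::launch::async, map_fn, i));
    for (auto& r : results) r.get(); // wait for all tasks to finish

    return calls.load();
}
```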

NB: the body of map_fn would normally compute the result in-place, but it could also, for example, run a subprocess to do it, or offload the work to a cloud or a compute-farm.


Q: Why do we need this, when C++17 already has parallel std::transform and std::transform_reduce?

A: Parallel std::transform requires a multi-pass ForwardRange rather than an InputRange, and calls std::terminate if any of the tasks throws. std::transform_reduce requires a ForwardRange and a type-symmetric, associative, and commutative BinaryOp (making it next-to-useless).
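The exception-handling difference comes down to how std::async and std::future behave: an exception thrown inside a task is stored in its future and rethrown by get(), where the caller can catch it. The sketch below shows only that standard mechanism; first_error is a hypothetical name, and we assume (from its name) that the default impl::std_async backend is built on std::async.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <vector>

// Sketch of the std::async/std::future mechanism: an exception thrown in a
// task is captured in its future and rethrown by get() on the calling thread,
// so it can be caught instead of terminating the program.
inline std::string first_error(const std::vector<int>& inputs)
{
    std::vector<std::future<int>> tasks;
    for (int x : inputs)
        tasks.push_back(std::async(std::launch::async, [x] {
            if (x < 0) throw std::invalid_argument("negative input: " + std::to_string(x));
            return x * 10;
        }));

    try {
        for (auto& t : tasks) t.get(); // results (and errors) are observed in input order
    } catch (const std::invalid_argument& e) {
        return e.what();
    }
    return "";
}
```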

// Example: implement a parallelized gzip compressor (à la pigz)
#define RANGELESS_FN_ENABLE_PARALLEL 1
#include "fn.hpp"
#include <util/compress/stream_util.hpp> // NCBI C++ Toolkit: ncbi::CCompressOStream
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

int main()
{
    auto& istr = std::cin;
    auto& ostr = std::cout;

    istr.exceptions(std::ios::badbit);
    ostr.exceptions(std::ios::failbit | std::ios::badbit);

    namespace fn = rangeless::fn;
    using fn::operators::operator%;
    using bytes_t = std::string;

    fn::seq([&istr]() -> bytes_t
    {
        // read the next block of up to 1000000 bytes
        auto buf = bytes_t(1000000UL, '\0');
        istr.read(&buf[0], std::streamsize(buf.size()));
        buf.resize(size_t(istr.gcount()));
        return !buf.empty() ? std::move(buf) : fn::end_seq();
    })
  % fn::transform_in_parallel([](bytes_t buf) -> bytes_t
    {
        // compress the block.
        std::ostringstream local_ostr;
        ncbi::CCompressOStream{
            local_ostr,
            ncbi::CCompressOStream::eGZipFile } << buf;
        return local_ostr.str();
    }).queue_capacity( std::thread::hardware_concurrency() )
  % fn::for_each([&ostr](bytes_t buf)
    {
        // write the compressed blocks in order; concatenated gzip members form a valid gzip stream
        ostr.write(buf.data(), std::streamsize(buf.size()));
    });

    return istr.eof() && !istr.bad() ? 0 : 1;
}

See similar examples using RaftLib or TBB.

Definition at line 7158 of file fn.hpp.

◆ transform_in_parallel() [2/2]

template<typename F , typename Async >
impl::par_transform<F, Async> rangeless::fn::transform_in_parallel ( F map_fn,
Async async
)

A version of transform_in_parallel that uses a user-provided Async (e.g. backed by a fancy work-stealing thread-pool implementation).

Async is a unary invokable having the following signature: template<typename NullaryInvokable> operator()(NullaryInvokable job) const -> std::future<decltype(job())>

NB: Async must be copy-constructible (may be passed via std::ref as appropriate).
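A minimal type satisfying the Async signature above might look like the following sketch. launch_async is a hypothetical name; it simply starts each job on a new thread via std::async(std::launch::async, ...), and being stateless it is trivially copy-constructible.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <utility>

// Hypothetical minimal Async: a unary invokable taking a nullary job and
// returning std::future<decltype(job())>. Stateless, hence copy-constructible.
struct launch_async
{
    template<typename NullaryInvokable>
    auto operator()(NullaryInvokable job) const -> std::future<decltype(job())>
    {
        return std::async(std::launch::async, std::move(job));
    }
};
```

Given the documented signature, it could then be passed as the second argument, e.g. fn::transform_in_parallel(map_fn, launch_async{}). A stateful, non-copyable executor would instead be wrapped with std::ref or captured by reference in a lambda.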

A single-thread pool can be used to offload the transform stage to a separate thread when the transform itself is not parallelizable.
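A single-thread pool with an enqueue() returning a std::future could be sketched as follows. single_thread_pool is a hypothetical class (the example's my_thread_pool is not specified here); jobs run one at a time, in submission order, on one dedicated worker thread. Since the pool is non-copyable, an Async wrapping it captures it by reference, as the lambda in the example does.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical single-thread pool: jobs run sequentially on one worker thread.
class single_thread_pool
{
public:
    single_thread_pool() : m_worker([this] { run(); }) {}

    ~single_thread_pool()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = true;
        }
        m_cv.notify_one();
        m_worker.join(); // the worker drains any remaining jobs first
    }

    single_thread_pool(const single_thread_pool&) = delete;
    single_thread_pool& operator=(const single_thread_pool&) = delete;

    template<typename NullaryInvokable>
    auto enqueue(NullaryInvokable job) -> std::future<decltype(job())>
    {
        using R = decltype(job());
        // std::function needs a copyable callable, so hold the task via shared_ptr.
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(job));
        std::future<R> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_jobs.emplace_back([task] { (*task)(); });
        }
        m_cv.notify_one();
        return result;
    }

private:
    void run()
    {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_stopping || !m_jobs.empty(); });
                if (m_jobs.empty()) return; // stopping and nothing left to do
                job = std::move(m_jobs.front());
                m_jobs.pop_front();
            }
            job(); // exceptions are captured by packaged_task into the future
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<std::function<void()>> m_jobs;
    bool m_stopping = false;
    std::thread m_worker; // declared last: starts after the other members exist
};
```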

// Assumes my_thread_pool is a thread pool whose enqueue(job)
// returns std::future<decltype(job())>.
auto res2 =
    std::vector<int>{{1,2,3,4,5}} // can be an InputRange
  % fn::transform_in_parallel([](int x) -> std::string
    {
        return std::to_string(x);
    },
    [&my_thread_pool](auto job) -> std::future<decltype(job())>
    {
        return my_thread_pool.enqueue(std::move(job));
    }).queue_capacity(10)
  % fn::foldl_d([](std::string out, std::string in)
    {
        return std::move(out) + "," + in;
    });
assert(res2 == ",1,2,3,4,5");

Definition at line 7194 of file fn.hpp.