rangeless::fn
aln_filter.cpp
Go to the documentation of this file.
1 #define RANGELESS_FN_ENABLE_PARALLEL 1
2 #include <fn.hpp>
3 #include <iostream>
4 
5 // A real-world -inspired example showcasing same of the fn:: functionality:
6 //
7 // Problem Statement:
8 //
9 // Given a stream of mrna-to-chromosome alignments (aln_t), sorted by gene-id,
10 // filter it as follows, lazily, i.e. fetching from the stream incrementally as-necessary:
11 //
12 // Per gene_id:
13 // 1) Drop alignments where exists an alignment with seq-id
14 // having same mrna-accession and higher mrna-version,
15 // e.g. drop ("NM_002020",5) if ("NM_002020",6) exists.
16 //
17 // 2) Realign inputs, in parallel, using supplied function realign:aln_t->alns_t
18 //
19 // 3) Keep top-scoring alignments per mrna-id.
20 //
21 // 4) Drop duplicates (same mrna-id, chr-id, chr-start, chr-stop).
22 //
23 // 5) Then select alignments for gene sharing a single common genomic-cds-start:
24 // 1) Drop alignments without a valid cds-start.
25 // 2) Prefer positions supported by more alignments.
26 // 3) Then prefer "NC_*" chr-id.
27 // 4) Then prefer lower chr-id.
28 // 5) Then prefer lower (more upstream) cds-start.
29 //
30 // 6) Sort by decreasing alignment score, increasing mrna-id.
31 //
32 // The above processing steps shall not make copies of aln_t.
33 //
34 // The aln_filter(...) below has:
35 // Number of if-statements: 0
36 // Number of loops: 0
37 // Control-flow nesting level: 0
38 // Direct use of iterators: 0
39 // Non-const variables (mutable state) declared: 0
40 // Statements mutating local state: 0
41 // Statements mutating external state: 0
42 // Compile-time for the entire example: ~2.8s.
43 //
44 namespace example
45 {
46 
47 struct aln_t
48 {
49  using accession_t = std::string;
50  using version_t = uint32_t;
51  using seq_id_t = std::pair<accession_t, version_t>;
52  using gene_id_t = int32_t;
53  using pos_t = int64_t; // genomic position on chr.
54  // signed; 1-based; 0 is invalid-pos;
55  // x and -x refer to same nucleotide position
56  // in forward and reverse orientations.
57 
58  static constexpr pos_t invalid_pos = 0;
59 
60  //-----------------------------------------------------------------------
61 
62  int64_t aln_id;
65 
70 
71  int64_t score;
72 
74  {
75  //...
76  };
77 
78  //---------------------------------------------------------------------------
79  // Will make our type move-only to assert that our filtering
80  // steps do not silently make copies under the hood.
81  // (will fail to compile if it tries to)
82 
83  aln_t(const aln_t&) = delete;
84  aln_t& operator=(const aln_t&) = delete;
85 
86  aln_t(aln_t&&) = default;
87  aln_t& operator=(aln_t&&) = default;
88 
89  aln_t() = default;
90 
91 };
92 
93 using alns_t = std::vector<aln_t>;
94 
95 
96 namespace fn = rangeless::fn;
97 using fn::operators::operator%; // arg % f % g % h; returns h(g(f(std::forward<Arg>(arg))));
98 
99 
100 //---------------------------------------------------------------------------
101 // Other similar libraries in c++ or other languages do not typically
102 // provide "filter to minimal (or maximal) elements" constructs,
103 // (fn::where_min_by fn::where_max_by), or fn::group_all_by, fn::unique_all_by,
104 // or lazy transform_in_parallel.
105 //
106 // For demonstration we'll implement them below ourselves,
107 // and will then use my::group_all_by instead of fn::group_all_by, etc.
108 namespace my
109 {
110 
111 static auto group_all_by = [](auto key_fn)
112 {
113  return [key_fn = std::move(key_fn)](auto inputs)
114  {
115  return std::move(inputs)
116  % fn::sort_by(key_fn)
117  % fn::group_adjacent_by(key_fn);
118  };
119 };
120 
121 //---------------------------------------------------------------------------
122 
123 static auto unique_all_by = [](auto key_fn)
124 {
125  return [key_fn = std::move(key_fn)](auto inputs)
126  {
127  return std::move(inputs)
128  % fn::sort_by(key_fn)
129  % fn::unique_adjacent_by(key_fn);
130  };
131 };
132 
133 //---------------------------------------------------------------------------
134 
135 static auto where_min_by = [](auto key_fn)
136 {
137  return [key_fn = std::move(key_fn)](auto inputs)
138  {
139  // NB: implementation in fn:: is more involved to avoid sort/group.
140  return std::move(inputs)
141  % fn::sort_by(key_fn) // could use fn::lazy_sort_by here, because
142  // we only need the first group below, but
143  // lazy_sort_by is not stable.
144  % fn::group_adjacent_by(key_fn)
145  % fn::take_first(1) // min-elements are in the first group
146  % fn::concat(); // [[min-elements]] -> [min-elements]
147  };
148 };
149 
150 //---------------------------------------------------------------------------
151 
152 static auto where_max_by = [](auto key_fn)
153 {
154  return my::where_min_by( fn::by::decreasing( std::move(key_fn)));
155 };
156 
157 //---------------------------------------------------------------------------
158 
159 static auto lazy_transform_in_parallel = [](auto fn,
160  size_t max_queue_size = std::thread::hardware_concurrency())
161 {
162  assert(max_queue_size >= 1);
163 
164  return [max_queue_size, fn](auto inputs) // inputs can be an lazy InputRange
165  {
166  return std::move(inputs)
167 
168  //-------------------------------------------------------------------
169  // Lazily yield std::async invocations of fn.
170 
171  % fn::transform([fn](auto inp)
172  {
173  return std::async( std::launch::async,
174  [inp = std::move(inp), fn]() mutable // mutable because inp will be moved-from
175  {
176  return fn(std::move(inp));
177  });
178  })
179 
180  //-------------------------------------------------------------------
181  // Cap the incoming sequence of tasks with a seq of `max_queue_size`-1
182  // dummy future<...>'s, such that all real tasks make it
183  // from the other end of the sliding-window in the next stage.
184 
185  % fn::append( fn::seq([i = 1UL, max_queue_size]() mutable
186  {
187  using fn_out_t = decltype( fn( std::move( *inputs.begin())));
188  return i++ < max_queue_size ? std::future<fn_out_t>() : fn::end_seq();
189  }))
190 
191  //-------------------------------------------------------------------
192  // Buffer executing async-tasks in a fixed-sized sliding window;
193  // yield the result from the oldest (front) std::future.
194 
195  % fn::sliding_window(max_queue_size)
196 
197  % fn::transform([](auto view) // a view from a window? Get out!
198  {
199  return view.begin()->get();
200  });
201  };
202 };
203 
204 
205 // for demonstration: suppose a single invocation of transform-function is too small
206 // compared to async-invocation overhead, so we want to amortize the overhead by batching:
208  size_t max_queue_size = std::thread::hardware_concurrency(),
209  size_t batch_size = 2)
210 {
211  return [=](auto inputs)
212  {
213  return std::move(inputs)
214  % fn::in_groups_of(batch_size)
215  % my::lazy_transform_in_parallel( [&](auto inputs_batch)
216  {
217  return std::move(inputs_batch)
218  % fn::transform(fn)
219  % fn::to_vector(); // NB: since fn::transform is lazy,
220  // we need to force eager-evalution
221  // within this batch-transform function.
222  }, max_queue_size)
223  % fn::concat(); // flatten the batches of outputs
224  };
225 };
226 
227 
228 } //namespace my
229 
230 
231 //---------------------------------------------------------------------------
232 
233 static alns_t realign(aln_t a) // realign stub: just return the original
234 {
235  alns_t ret;
236  ret.push_back(std::move(a));
237  return ret;
238 }
239 
240 #define LAMBDA(expr) ([&](const auto& _){ return expr; })
241 
242 
243 //---------------------------------------------------------------------------
244 // Filtering steps (5) and (6)
245 
246 static auto filter_to_unique_cds_for_gene(alns_t alns_for_gene) -> alns_t
247 {
248  return std::move(alns_for_gene)
249 
250  // (5.1) Keep alignments with valid cds-start.
251 
252  % fn::where LAMBDA( _.chr_cds_start_pos != aln_t::invalid_pos )
253 
254  //-------------------------------------------------------------------
255  // (5.2) Keep alignments with most-ubiquitous valid cds-starts.
256 
257  % my::group_all_by LAMBDA( _.chr_cds_start_pos )
258  % my::where_max_by LAMBDA( _.size() )
259  % fn::concat()
260 
261  //-------------------------------------------------------------------
262  // Filter to unique chr_cds_start_pos.
263  // (5.3) Prefer on "NC_*" chr-accession,
264  // (5.4) then lower chr-id,
265  // (5.5) then more upstream cds-start.
266 
268  std::make_tuple( _.chr_id.first.find("NC_") != 0,
269  std::cref( _.chr_id ),
270  _.chr_cds_start_pos))
271 
272 #if 1
273  //-------------------------------------------------------------------
274  // (6) Sort by decreasing alignment score, then by increasing mrna-id.
275 
276  % fn::sort_by LAMBDA(
277  std::make_pair( fn::by::decreasing( _.score),
278  std::cref( _.mrna_id) ))
279 
280 #else // alternatively, e.g if you want to use your own sort
281 
282  % [](alns_t alns)
283  {
284  gfx::timsort(
285  alns.begin(), alns.end(),
286  fn::by::make_comp([](const aln_t& a)
287  {
288  return std::make_pair(
290  std::cref( a.mrna_id));
291  });
292  return std::move(alns);
293  }
294 #endif
295  ; // end of return-statement
296 }
297 
298 
299 //---------------------------------------------------------------------------
300 
301 // Implement as lambda so can rely on the automatic return-type deduction,
302 // which will be some longwindedly-named lazy seq<...>
303 
304 static auto aln_filter = [](auto alns_seq) // alns may be a lazy input-sequence (i.e. an InputRange)
305 {
306  return std::move(alns_seq)
307 
308  //-----------------------------------------------------------------------
309  // (1) Filter to latest mRNA-version per mRNA-accession
310 
311  % fn::group_adjacent_by( std::mem_fn( &aln_t::gene_id))
312  % fn::transform( [](alns_t alns_for_gene) -> alns_t
313  {
314  return std::move(alns_for_gene)
315  % my::group_all_by LAMBDA( std::cref( _.mrna_id.first))
316  % fn::transform(
317  my::where_max_by( std::mem_fn( &aln_t::mrna_id)))
318  % fn::concat(); // un-group
319  })
320  % fn::concat()
321 
322  //-----------------------------------------------------------------------
323  // (2) Realign in parallel
324 
325  % my::batched_lazy_transform_in_parallel(realign) // aln_t -> alns_t
326  % fn::concat()
327 
328  //-----------------------------------------------------------------------
329  // Per mrna-id:
330  // (3) Keep top-scoring
331  // (4) Drop duplicates
332 
333  % fn::group_adjacent_by( std::mem_fn( &aln_t::mrna_id)) // were made adjacent in (1)
334  % fn::transform( [](alns_t alns_for_mrna) -> alns_t
335  {
336  return std::move(alns_for_mrna)
337  % my::where_max_by( std::mem_fn( &aln_t::score))
339  std::tie( _.mrna_id, _.chr_id, _.chr_start, _.chr_stop ));
340  })
341  % fn::concat()
342 
343  //-----------------------------------------------------------------------
344  // (5), (6)
345 
346  % fn::group_adjacent_by( std::mem_fn( &aln_t::gene_id))
348  % fn::concat();
349 };
350 
351  // Curiously, group-by/transform/concat pattern appears to be
352  // very common. Perhaps it needs a separate abstraction?
353 
354 } // namespace example
355 
356 //---------------------------------------------------------------------------
357 
358 int main()
359 {
360  using namespace example;
361 
362  alns_t alns{}; // normally these would come from a stream, but for the sake of example will yield from a vec.
363 
364  // GeneID:2
365  alns.push_back(aln_t{ 101, 2, {"NM_000001", 2}, {"NC_000001", 1}, 1000000, 1001000, 100100, 100}); // keep.
366  alns.push_back(aln_t{ 102, 2, {"NM_000001", 2}, {"NC_000001", 1}, 1000000, 1001000, 100100, 100}); // duplicate.
367  alns.push_back(aln_t{ 103, 2, {"NM_000001", 2}, {"NC_000001", 1}, 1000001, 1001000, 100100, 50 }); // not top-scoring for this mrna.
368  alns.push_back(aln_t{ 104, 2, {"NM_000001", 1}, {"NC_000001", 1}, 1000000, 1001000, 100100, 100}); // superceded mrna-version.
369  alns.push_back(aln_t{ 201, 2, {"NM_000002", 1}, {"NC_000001", 1}, 1000000, 1001000, 0, 100}); // no valid-CDS.
370  alns.push_back(aln_t{ 301, 2, {"NM_000003", 1}, {"NC_000001", 1}, 1000000, 1001000, 0, 100}); // no valid-CDS.
371  alns.push_back(aln_t{ 401, 2, {"NM_000004", 1}, {"NC_000001", 1}, 1000000, 1001000, 0, 100}); // no valid-CDS.
372  alns.push_back(aln_t{ 501, 2, {"NM_000005", 1}, {"NC_000001", 1}, 1000000, 1001000, 100100, 110}); // keep.
373  alns.push_back(aln_t{ 801, 2, {"NM_000008", 1}, {"NC_000001", 1}, 1000000, 1001000, 100200, 100}); // not most-supported-CDS.
374 
375  // GeneID:3
376  alns.push_back(aln_t{ 601, 3, {"NM_000005", 1}, {"NC_000001", 1}, 1000000, 1001000, 100100, 100}); // keep.
377  alns.push_back(aln_t{ 701, 3, {"NM_000007", 1}, {"NT_000001", 1}, 1000000, 1001000, 100100, 100}); // not on NC.
378 
379  namespace fn = rangeless::fn;
380  using fn::operators::operator%;
381 
382  std::vector<int64_t> kept_ids{};
383 
384  // we could just std::move(alns) instead of fn::seq(...) here,
385  // but demonstrating that input can also be a lazy seq, e.g. deserializing from an istream.
386  fn::seq([&, i = 0UL]() mutable -> aln_t
387  {
388  return i < alns.size() ? std::move(alns[i++]) : fn::end_seq();
389  })
390 
392 
393  % fn::for_each( [&](aln_t a)
394  {
395  std::cerr << a.gene_id << "\t" << a.aln_id << "\n";
396  kept_ids.push_back(a.aln_id);
397  });
398 
399  assert((kept_ids == std::vector<int64_t>{{ 501, 101, 601 }} ));
400 
401  return 0;
402 }
403 
impl::group_adjacent_by< F > group_adjacent_by(F key_fn)
Group adjacent elements.
Definition: fn.hpp:4059
static auto unique_all_by
Definition: aln_filter.cpp:123
gene_id_t gene_id
Definition: aln_filter.cpp:63
impl::where< P > where(P pred)
Filter elements.
Definition: fn.hpp:3903
static auto filter_to_unique_cds_for_gene(alns_t alns_for_gene) -> alns_t
Definition: aln_filter.cpp:246
seq_id_t chr_id
Definition: aln_filter.cpp:66
impl::group_adjacent_by< impl::chunker > in_groups_of(size_t n)
Group adjacent elements into chunks of specified size.
Definition: fn.hpp:4135
static auto where_max_by
Definition: aln_filter.cpp:152
static auto lazy_transform_in_parallel
Definition: aln_filter.cpp:159
int64_t aln_id
Definition: aln_filter.cpp:62
static auto batched_lazy_transform_in_parallel
Definition: aln_filter.cpp:207
static auto aln_filter
Definition: aln_filter.cpp:304
impl::concat concat()
Flatten the result of group_all_by or group_adjacent_by.
Definition: fn.hpp:4291
impl::sliding_window sliding_window(size_t win_size)
Definition: fn.hpp:3801
std::string accession_t
Definition: aln_filter.cpp:49
impl::sort_by< F, impl::stable_sort_tag > sort_by(F key_fn)
stable-sort and return the input.
Definition: fn.hpp:4174
static auto group_all_by
Definition: aln_filter.cpp:111
impl::take_while< impl::call_count_lt > take_first(size_t n=1)
Yield first n elements.
Definition: fn.hpp:3838
aln_t()=default
LINQ -like library of higher-order functions for data manipulation.
Definition: fn.hpp:58
impl::gt< T > decreasing(T x)
Wraps the passed value and exposes inverted operator<.
Definition: fn.hpp:898
std::pair< accession_t, version_t > seq_id_t
Definition: aln_filter.cpp:51
Return fn::end_seq() from input-range generator function to signal end-of-inputs.
Definition: fn.hpp:281
impl::seq< impl::catch_end< NullaryInvokable > > seq(NullaryInvokable gen_fn)
Adapt a generator function as InputRange.
Definition: fn.hpp:677
int main()
Definition: aln_filter.cpp:358
static constexpr pos_t invalid_pos
Definition: aln_filter.cpp:58
#define LAMBDA(expr)
Definition: aln_filter.cpp:240
seq_id_t mrna_id
Definition: aln_filter.cpp:64
impl::to_vector to_vector()
Move elements of an Iterable to std::vector.
Definition: fn.hpp:3459
impl::append< Iterable > append(Iterable next)
Yield elements of next after elements of arg.
Definition: fn.hpp:4307
pos_t chr_cds_start_pos
Definition: aln_filter.cpp:69
impl::transform< F > transform(F map_fn)
Create a seq yielding results of applying the transform functions to input-elements.
Definition: fn.hpp:3670
impl::comp< F > make_comp(F key_fn)
Make binary comparison predicate from a key-function.
Definition: fn.hpp:927
std::vector< aln_t > alns_t
Definition: aln_filter.cpp:93
int32_t gene_id_t
Definition: aln_filter.cpp:52
impl::unique_adjacent_by< F > unique_adjacent_by(F key_fn)
Keep first element from every adjacently-equal run of elements.
Definition: fn.hpp:4251
aln_t & operator=(const aln_t &)=delete
impl::for_each< F > for_each(F fn)
Definition: fn.hpp:3811
uint32_t version_t
Definition: aln_filter.cpp:50
static auto where_min_by
Definition: aln_filter.cpp:135
static alns_t realign(aln_t a)
Definition: aln_filter.cpp:233