Subordination
A framework for distributed programming
timer_pipeline.hh
1 #ifndef SUBORDINATION_PPL_TIMER_PIPELINE_HH
2 #define SUBORDINATION_PPL_TIMER_PIPELINE_HH
3 
4 #include <subordination/kernel/act.hh>
5 #include <subordination/ppl/basic_pipeline.hh>
6 #include <subordination/ppl/compare_time.hh>
7 #include <unistdx/ipc/semaphore>
8 
9 namespace sbn {
10 
11  namespace bits {
12 
13  template<class T>
14  using Priority_queue =
16 
/// Traits alias: \c priority_queue_traits instantiated with
/// \c Priority_queue<T>. Passed as the traits argument of
/// \c Timer_pipeline_base below.
17  template<class T>
18  using Priority_queue_traits =
19  priority_queue_traits<Priority_queue<T>>;
20 
/// Alias for \c basic_pipeline parameterised with kernel type \c T,
/// the container \c Priority_queue<T> and the matching
/// \c Priority_queue_traits<T>.
/// NOTE(review): presumably the base class of \c timer_pipeline — the class
/// header line is not visible in this listing; confirm.
21  template<class T>
22  using Timer_pipeline_base =
23  basic_pipeline<T, Priority_queue<T>,
24  Priority_queue_traits<T>>;
25 
26  }
27 
28  template<class T>
30 
32  using typename base_pipeline::kernel_type;
33  using typename base_pipeline::mutex_type;
34  using typename base_pipeline::lock_type;
35  using typename base_pipeline::sem_type;
36  using typename base_pipeline::traits_type;
37 
/// Move constructor. Forwards \p rhs to the \c base_pipeline move
/// constructor; this class initialises no members of its own here.
/// Declared \c noexcept.
38  inline
39  timer_pipeline(timer_pipeline&& rhs) noexcept:
40  base_pipeline(std::move(rhs))
41  {}
42 
/// Default constructor. Constructs the base pipeline with the argument
/// \c 1u.
/// NOTE(review): \c 1u is presumably the number of pipeline threads —
/// confirm against the \c basic_pipeline constructor. Also confirm that
/// this base constructor cannot throw, since the constructor is declared
/// \c noexcept.
43  inline
44  timer_pipeline() noexcept:
45  base_pipeline(1u)
46  {}
47 
/// Copy construction is disabled; the pipeline can only be moved.
48  timer_pipeline(const timer_pipeline&) = delete;
49 
/// Copy assignment is disabled.
/// NOTE(review): the return-type line of this declaration (listing line 50)
/// is missing from this listing; restore it from the original header.
51  operator=(const timer_pipeline&) = delete;
52 
/// Destructor is compiler-generated; this class adds no clean-up of its own.
53  ~timer_pipeline() = default;
54 
55  protected:
/// Overrides the virtual \c do_run inherited from a base class. Only
/// declared here; the definition lives outside this section of the header.
/// NOTE(review): presumably the body executed by the pipeline's thread —
/// confirm against \c basic_pipeline.
56  void
57  do_run() override;
58 
59  private:
/// Waits on the pipeline semaphore, using \p lock, with a predicate that is
/// true when the pipeline has stopped or the kernel container is non-empty.
/// NOTE(review): assumes \c _semaphore.wait(lock, pred) behaves like
/// \c std::condition_variable::wait, i.e. blocks until \c pred returns true —
/// confirm against the semaphore type.
/// \param lock lock passed through to the semaphore wait; presumably must
///        already be held by the caller — confirm.
60  inline void
61  wait_until_kernel_arrives(lock_type& lock) {
62  this->_semaphore.wait(
63  lock,
64  [this] () {
// Wake-up condition: either the pipeline was stopped or there is at
// least one kernel queued.
65  return this->has_stopped() || !this->_kernels.empty();
66  }
67  );
68  }
69 
/// Performs a timed wait on the pipeline semaphore, using \p lock, with the
/// deadline \c k->at() and a predicate that is true once the pipeline has
/// stopped.
/// \param lock lock passed through to the semaphore wait.
/// \param k    kernel whose \c at() value is used as the deadline; it is
///             dereferenced unconditionally, so it must not be null.
/// \return the value returned by \c _semaphore.wait_until.
/// NOTE(review): if \c wait_until follows \c std::condition_variable
/// semantics, the result is the predicate value, i.e. \c true when the
/// pipeline stopped before or at the deadline — confirm.
70  inline bool
71  wait_until_kernel_is_ready(lock_type& lock, kernel_type* k) {
72  return this->_semaphore.wait_until(
73  lock,
74  k->at(),
// Only a pipeline stop ends the wait early; new kernel arrivals are not
// part of this predicate.
75  [this] { return this->has_stopped(); }
76  );
77  }
78 
79  };
80 
81 }
82 
83 #endif // vim:filetype=cpp
Definition: basic_pipeline.hh:40
Definition: timer_pipeline.hh:29