Subordination
A framework for distributed programming
basic_pipeline.hh
1 #ifndef SUBORDINATION_PPL_BASIC_PIPELINE_HH
2 #define SUBORDINATION_PPL_BASIC_PIPELINE_HH
3 
4 #include <cassert>
5 #include <queue>
6 #include <thread>
7 #include <vector>
8 
9 #include <unistdx/base/simple_lock>
10 #include <unistdx/base/spin_mutex>
11 #include <unistdx/ipc/process>
12 #include <unistdx/ipc/thread_semaphore>
13 #include <unistdx/util/system>
14 
15 #include <subordination/base/container_traits.hh>
16 #include <subordination/base/queue_popper.hh>
17 #include <subordination/base/queue_pusher.hh>
18 #include <subordination/base/thread_name.hh>
19 #include <subordination/kernel/kernel_type.hh>
20 #include <subordination/ppl/pipeline_base.hh>
21 #include <subordination/ppl/thread_context.hh>
22 
23 namespace sbn {
24 
/// Declared here only; the definition is not part of this header.
/// NOTE(review): presumably initiates an orderly shutdown and records
/// \p ret as the return code — confirm against the definition.
void graceful_shutdown(int ret);
27 
/// Declared here only; the definition is not part of this header.
/// NOTE(review): presumably blocks until the pipelines finish and yields
/// the process return code — confirm against the definition.
int wait_and_return();
30 
31  template<
32  class T,
33  class Kernels=std::queue<T*>,
34  class Traits=queue_traits<Kernels>,
35  class Threads=std::vector<std::thread>,
36  class Mutex=sys::spin_mutex,
37  class Lock=sys::simple_lock<Mutex>,
38  class Semaphore=sys::thread_semaphore
39  >
40  class basic_pipeline: public pipeline_base {
41 
42  public:
43  typedef T kernel_type;
44  typedef Kernels kernel_pool;
45  typedef Threads thread_pool;
46  typedef Mutex mutex_type;
47  typedef Lock lock_type;
48  typedef Semaphore sem_type;
49  typedef Traits traits_type;
50 
51  protected:
54 
55  protected:
56  kernel_pool _kernels;
57  thread_pool _threads;
58  mutable mutex_type _mutex;
59  mutable sem_type _semaphore;
60 
61  public:
62  basic_pipeline() = default;
63 
64  inline explicit
65  basic_pipeline(unsigned concurrency) noexcept:
66  _kernels(),
67  _threads(std::max(1u, concurrency)),
68  _mutex(),
69  _semaphore()
70  {}
71 
72  inline
73  basic_pipeline(basic_pipeline&& rhs) noexcept:
74  _kernels(std::move(rhs._kernels)),
75  _threads(std::move(rhs._threads)),
76  _mutex(),
77  _semaphore()
78  {}
79 
80  inline
81  ~basic_pipeline() {
82  // ensure that kernels inserted without starting
83  // a pipeline are deleted
84  kernel_sack sack;
85  this->collect_kernels(std::back_inserter(sack));
86  }
87 
88  basic_pipeline(const basic_pipeline&) = delete;
89 
91  operator=(const basic_pipeline&) = delete;
92 
93  void
94  send(kernel_type* k) {
95  #ifndef NDEBUG
96  this->log("send _", *k);
97  #endif
98  lock_type lock(this->_mutex);
99  traits_type::push(this->_kernels, k);
100  this->_semaphore.notify_one();
101  }
102 
103  void
104  send(kernel_type** kernels, size_t n) {
105  lock_type lock(this->_mutex);
106  #ifndef NDEBUG
107  assert(
108  not std::any_of(
109  kernels,
110  kernels+n,
111  std::mem_fn(&kernel_type::moves_downstream)
112  )
113  );
114  #endif
115  std::copy_n(kernels, n, queue_pusher(this->_kernels));
116  this->_semaphore.notify_one();
117  }
118 
119  void
120  start() {
121  this->setstate(pipeline_state::started);
122  unsigned thread_no = this->_number;
123  for (std::thread& thr : this->_threads) {
124  thr = std::thread(
125  [this,thread_no] () {
126  try {
127  sys::this_process::name(this->_name);
128  } catch (...) {
129  }
130  this_thread::name = this->_name;
131  this_thread::number = thread_no;
132  this->run(&this_thread::context);
133  }
134  );
135  ++thread_no;
136  }
137  }
138 
139  void
140  stop() {
141  lock_type lock(this->_mutex);
142  this->xstop();
143  const size_t nthreads = this->_threads.size();
144  for (size_t i=0; i<nthreads; ++i) {
145  this->_semaphore.notify_one();
146  }
147  }
148 
149  void
150  wait() {
151  #ifndef NDEBUG
152  this->log("wait()");
153  #endif
154  for (std::thread& thr : this->_threads) {
155  if (thr.joinable()) {
156  thr.join();
157  }
158  }
159  }
160 
161  inline unsigned
162  concurrency() const noexcept {
163  return this->_threads.size();
164  }
165 
166  protected:
167 
168  inline void
169  xstop() {
170  this->setstate(pipeline_state::stopped);
171  }
172 
173  virtual void
174  do_run() = 0;
175 
176  virtual void
177  run(Thread_context* context) {
178  if (context) {
179  Thread_context_guard lock(*context);
180  context->register_thread();
181  }
182  #ifndef NDEBUG
183  this->log("start");
184  #endif
185  this->do_run();
186  if (context) {
187  this->collect_kernels(*context);
188  }
189  }
190 
191  private:
192 
193  template<class It>
194  void
195  collect_kernels(It sack) {
196  using namespace std::placeholders;
197  std::for_each(
198  queue_popper(this->_kernels),
199  queue_popper(),
200  [sack] (kernel_type* rhs) { rhs->mark_as_deleted(sack); }
201  );
202  }
203 
204  void
205  collect_kernels(Thread_context& context) {
206  // Recursively collect kernel pointers to the sack
207  // and delete them all at once. Collection process
208  // is fully serial to prevent multiple deletions
209  // and access to unitialised values.
210  Thread_context_guard lock(context);
211  //std::clog << "global_barrier #1" << std::endl;
212  context.wait(lock);
213  //std::clog << "global_barrier #1 end" << std::endl;
214  kernel_sack sack;
215  this->collect_kernels(std::back_inserter(sack));
216  // simple barrier for all threads participating in deletion
217  //std::cout << "global_barrier #2" << std::endl;
218  //context.global_barrier(lock);
219  //std::cout << "global_barrier #2 end" << std::endl;
220  // destructors of scoped variables
221  // will destroy all kernels automatically
222  }
223 
224  };
225 
226 }
227 #endif // vim:filetype=cpp
Definition: basic_pipeline.hh:40
Definition: pipeline_base.hh:17
Definition: thread_context.hh:10
Input iterator that removes an element from the container on increment.
Definition: queue_popper.hh:20