Subordination
A framework for distributed programming
basic_socket_pipeline.hh
1 #ifndef SUBORDINATION_PPL_BASIC_SOCKET_PIPELINE_HH
2 #define SUBORDINATION_PPL_BASIC_SOCKET_PIPELINE_HH
3 
4 #include <algorithm>
5 #include <cassert>
6 #include <memory>
7 #include <mutex>
8 #include <queue>
9 #include <thread>
10 #include <unordered_map>
11 #include <vector>
12 
13 #include <unistdx/base/log_message>
14 //#include <unistdx/base/recursive_spin_mutex>
15 //#include <unistdx/base/simple_lock>
16 //#include <unistdx/base/spin_mutex>
17 #include <unistdx/io/fildesbuf>
18 #include <unistdx/io/poller>
19 #include <unistdx/net/pstream>
20 
21 #include <subordination/base/container_traits.hh>
22 #include <subordination/base/static_lock.hh>
23 #include <subordination/kernel/kstream.hh>
24 #include <subordination/ppl/basic_handler.hh>
25 #include <subordination/ppl/basic_pipeline.hh>
26 
27 namespace sbn {
28 
29  template<class T,
30  class Kernels=std::queue<T*>,
31  class Traits=queue_traits<Kernels>,
32  class Threads=std::vector<std::thread>>
33  using Proxy_pipeline_base = basic_pipeline<T, Kernels, Traits, Threads,
35  std::unique_lock<std::
36  recursive_mutex>,
37 // sys::recursive_spin_mutex, sys::simple_lock<sys::recursive_spin_mutex>,
38  sys::event_poller>;
39 
40  template<class T>
42 
43  public:
48  typedef typename handler_container_type::const_iterator
49  handler_const_iterator;
50  typedef typename handler_type::clock_type clock_type;
51  typedef typename handler_type::time_point time_point;
52  typedef typename handler_type::duration duration;
53 
55  using typename base_pipeline::kernel_type;
56  using typename base_pipeline::mutex_type;
57  using typename base_pipeline::lock_type;
58  using typename base_pipeline::sem_type;
59  using typename base_pipeline::kernel_pool;
60 
61  private:
63 
64  private:
65  mutex_type* _othermutex = nullptr;
66 
67  protected:
68  handler_container_type _handlers;
69  duration _start_timeout = duration::zero();
70 
71  public:
72 
74  base_pipeline(std::move(rhs)),
75  _start_timeout(rhs._start_timeout) { }
76 
78  base_pipeline(1u) {
79  this->emplace_notify_handler(std::make_shared<basic_handler>());
80  }
81 
82  ~basic_socket_pipeline() = default;
83 
85 
87  operator=(const basic_socket_pipeline&) = delete;
88 
89  inline void
90  set_other_mutex(mutex_type* rhs) noexcept {
91  this->_othermutex = rhs;
92  }
93 
94  inline mutex_type*
95  other_mutex() noexcept {
96  return this->_othermutex;
97  }
98 
99  inline mutex_type*
100  mutex() noexcept {
101  return &this->_mutex;
102  }
103 
104  protected:
105 
106  inline sem_type&
107  poller() noexcept {
108  return this->_semaphore;
109  }
110 
111  inline const sem_type&
112  poller() const noexcept {
113  return this->_semaphore;
114  }
115 
116  void
117  emplace_handler(const sys::epoll_event& ev, const event_handler_ptr& ptr) {
118  // N.B. we have two file descriptors (for the pipe)
119  // in the process handler, so do not use emplace here
120  this->log("add _, ev=_", *ptr, ev);
121  this->_handlers[ev.fd()] = ptr;
122  this->poller().insert(ev);
123  }
124 
125  template <class X>
126  void
127  emplace_handler(const sys::epoll_event& ev, const std::shared_ptr<X>& ptr) {
128  this->emplace_handler(ev, std::static_pointer_cast<handler_type>(ptr));
129  }
130 
131  void
132  emplace_notify_handler(const event_handler_ptr& ptr) {
133  sys::fd_type fd = this->poller().pipe_in();
134  this->log("add _", *ptr);
135  this->_handlers.emplace(fd, ptr);
136  }
137 
138  template <class X>
139  void
140  emplace_notify_handler(const std::shared_ptr<X>& ptr) {
141  this->emplace_notify_handler(
142  std::static_pointer_cast<handler_type>(ptr)
143  );
144  }
145 
146  void
147  do_run() override {
148  static_lock_type lock(&this->_mutex, this->_othermutex);
149  while (!this->has_stopped()) {
150  bool timeout = false;
151  if (this->_start_timeout > duration::zero()) {
152  handler_const_iterator result =
153  this->handler_with_min_start_time_point();
154  if (result != this->_handlers.end()) {
155  timeout = true;
156  const time_point tp = result->second->start_time_point()
157  + this->_start_timeout;
158  this->poller().wait_until(lock, tp);
159  }
160  }
161  auto process = [this,timeout] () {
162  this->process_kernels();
163  this->handle_events();
164  this->flush_buffers(timeout);
165  return this->has_stopped();
166  };
167  if (timeout) {
168  process();
169  } else {
170  this->poller().wait(lock, process);
171  /*
172  this->poller().wait(
173  lock,
174  [this] () { return this->has_stopped(); }
175  );*/
176  }
177  }
178  }
179 
180  void
181  run(Thread_context*) override {
182  do_run();
187  }
188 
189  inline void
190  set_start_timeout(const duration& rhs) noexcept {
191  this->_start_timeout = rhs;
192  }
193 
194  virtual void
195  process_kernels() = 0;
196 
197  private:
198 
199  void
200  flush_buffers(bool timeout) {
201  const time_point now = timeout
202  ? clock_type::now()
203  : time_point(duration::zero());
204  handler_const_iterator first = this->_handlers.begin();
205  handler_const_iterator last = this->_handlers.end();
206  while (first != last) {
207  handler_type& h = *first->second;
208  if (h.has_stopped() || (timeout && this->is_timed_out(
209  h,
210  now
211  ))) {
212  this->log("remove _ (_)", h, h.has_stopped() ? "stop" : "timeout");
213  h.remove(this->poller());
214  first = this->_handlers.erase(first);
215  } else {
216  first->second->flush();
217  ++first;
218  }
219  }
220  }
221 
222  handler_const_iterator
223  handler_with_min_start_time_point() const noexcept {
224  handler_const_iterator first = this->_handlers.begin();
225  handler_const_iterator last = this->_handlers.end();
226  handler_const_iterator result = last;
227  while (first != last) {
228  handler_type& h = *first->second;
229  if (h.is_starting() && h.has_start_time_point()) {
230  if (result == last) {
231  result = first;
232  } else {
233  const auto old_t = result->second->start_time_point();
234  const auto new_t = h.start_time_point();
235  if (new_t < old_t) {
236  result = first;
237  }
238  }
239  }
240  ++first;
241  }
242  if (result != last) {
243  this->log("min _", *result->second);
244  }
245  return result;
246  }
247 
248  void
249  handle_events() {
250  for (const sys::epoll_event& ev : this->poller()) {
251  auto result = this->_handlers.find(ev.fd());
252  if (result == this->_handlers.end()) {
253  this->log("unable to process fd _", ev.fd());
254  } else {
255  handler_type& h = *result->second;
256  // process event by calling event handler function
257  try {
258  h.handle(ev);
259  } catch (const std::exception& err) {
260  this->log("failed to process fd _: _", ev.fd(), err.what());
261  }
262  if (!ev) {
263  this->log("remove _ (bad event _)", h, ev);
264  h.remove(this->poller());
265  this->_handlers.erase(result);
266  }
267  }
268  }
269  }
270 
271  bool
272  is_timed_out(const handler_type& rhs, const time_point& now) {
273  return rhs.is_starting() &&
274  rhs.start_time_point() + this->_start_timeout <= now;
275  }
276 
277  };
278 
279 }
280 
281 #endif // vim:filetype=cpp
Definition: basic_pipeline.hh:40
Definition: basic_handler.hh:13
T end(T... args)
Definition: basic_socket_pipeline.hh:41
T what(T... args)
T erase(T... args)
void run(Thread_context *) override
Definition: basic_socket_pipeline.hh:181
Definition: thread_context.hh:10
Definition: static_lock.hh:7
T find(T... args)
T begin(T... args)
T emplace(T... args)