Subordination
A framework for distributed programming
kernel_protocol.hh
1 #ifndef SUBORDINATION_PPL_KERNEL_PROTOCOL_HH
2 #define SUBORDINATION_PPL_KERNEL_PROTOCOL_HH
3 
4 #include <algorithm>
5 #include <deque>
6 #include <memory>
7 
8 #include <unistdx/base/delete_each>
9 #include <unistdx/ipc/process>
10 
11 #include <subordination/base/queue_popper.hh>
12 #include <subordination/kernel/foreign_kernel.hh>
13 #include <subordination/kernel/kernel_header.hh>
14 #include <subordination/kernel/kernel_instance_registry.hh>
15 #include <subordination/kernel/kstream.hh>
16 #include <subordination/ppl/application.hh>
17 #include <subordination/ppl/kernel_proto_flag.hh>
18 
19 namespace sbn {
20 
21  template <
22  class T,
23  class Router,
24  class Forward=bits::no_forward<Router>,
25  class Kernels=std::deque<T*>,
26  class Traits=deque_traits<Kernels>>
28 
29  public:
30  typedef T kernel_type;
31  typedef Router router_type;
32  typedef Forward forward_type;
33  typedef Kernels pool_type;
34  typedef Traits traits_type;
35 
36  private:
37  typedef kstream<T> stream_type;
39  typedef typename T::id_type id_type;
40  typedef typename stream_type::ipacket_guard ipacket_guard;
41  typedef sys::opacket_guard<stream_type> opacket_guard;
43  typedef typename pool_type::iterator kernel_iterator;
44 
45  private:
48  sys::socket_address _endpoint;
50  application_type _thisapp = this_application::get_id();
52  const application* _otheraptr = 0;
53  pool_type _upstream;
54  pool_type _downstream;
55  forward_type _forward;
56  id_type _counter = 0;
57  const char* _name = "proto";
58 
59  public:
60 
61  kernel_protocol() = default;
62  kernel_protocol(kernel_protocol&&) = default;
63 
64  kernel_protocol(const kernel_protocol&) = delete;
65  kernel_protocol& operator=(const kernel_protocol&) = delete;
66  kernel_protocol& operator=(kernel_protocol&&) = delete;
67 
68  ~kernel_protocol() {
69  sys::delete_each(queue_popper(this->_upstream), queue_popper());
70  sys::delete_each(queue_popper(this->_downstream), queue_popper());
71  }
72 
73  void
74  send(kernel_type* k, stream_type& stream) {
75  // return local downstream kernels immediately
76  // TODO we need to move some kernel flags to
77  // kernel header in order to use them in routing
78  if (k->moves_downstream() && !k->to()) {
79  if (k->isset(kernel_flag::parent_is_id) || k->carries_parent()) {
80  if (k->carries_parent()) {
81  delete k->parent();
82  }
83  this->plug_parent(k);
84  }
85  #ifndef NDEBUG
86  this->log("send local kernel _", *k);
87  #endif
88  router_type::send_local(k);
89  return;
90  }
91  bool delete_kernel = this->save_kernel(k);
92  #ifndef NDEBUG
93  this->log("send _ to _", *k, this->_endpoint);
94  #endif
95  this->write_kernel(k, stream);
98  if (delete_kernel) {
99  if (k->moves_downstream() && k->carries_parent()) {
100  delete k->parent();
101  }
102  delete k;
103  }
104  }
105 
106  void
107  forward(foreign_kernel* k, stream_type& ostr) {
108  bool delete_kernel = this->save_kernel(k);
109  ostr.begin_packet();
110  ostr << k->header();
111  ostr << *k;
112  ostr.end_packet();
113  if (delete_kernel) {
114  delete k;
115  }
116  }
117 
118  void
119  receive_kernels(stream_type& stream) noexcept {
120  while (stream.read_packet()) {
121  try {
122  if (kernel_type* k = this->read_kernel(stream)) {
123  bool ok = this->receive_kernel(k);
124  if (!ok) {
125  #ifndef NDEBUG
126  this->log("no principal found for _", *k);
127  #endif
128  k->principal(k->parent());
129  this->send(k, stream);
130  } else {
131  router_type::send_local(k);
132  }
133  }
134  } catch (const kernel_error& err) {
135  log_read_error(err);
136  } catch (const error& err) {
137  log_read_error(err);
138  } catch (const std::exception& err) {
139  log_read_error(err.what());
140  } catch (...) {
141  log_read_error("<unknown>");
142  }
143  }
144  }
145 
146  void
147  recover_kernels(bool down) {
148  #ifndef NDEBUG
149  this->log("recover kernels upstream _ downstream _",
150  this->_upstream.size(), this->_downstream.size());
151  #endif
152  this->do_recover_kernels(this->_upstream);
153  if (down) {
154  this->do_recover_kernels(this->_downstream);
155  }
156  }
157 
158  private:
159 
160  // send {{{
161  void
162  write_kernel(kernel_type* k, stream_type& stream) noexcept {
163  try {
164  opacket_guard g(stream);
165  stream.begin_packet();
166  this->do_write_kernel(*k, stream);
167  stream.end_packet();
168  } catch (const kernel_error& err) {
169  log_write_error(err);
170  } catch (const error& err) {
171  log_write_error(err);
172  } catch (const std::exception& err) {
173  log_write_error(err.what());
174  } catch (...) {
175  log_write_error("<unknown>");
176  }
177  }
178 
179  void
180  do_write_kernel(kernel_type& k, stream_type& stream) {
181  if (this->has_src_and_dest()) {
182  k.header().prepend_source_and_destination();
183  }
184  stream << k.header();
185  stream << k;
186  }
187 
188  bool
189  kernel_goes_in_upstream_buffer(const kernel_type* rhs) noexcept {
190  return this->saves_upstream_kernels() &&
191  (rhs->moves_upstream() || rhs->moves_somewhere());
192  }
193 
194  bool
195  kernel_goes_in_downstream_buffer(const kernel_type* rhs) noexcept {
196  return this->saves_downstream_kernels() &&
197  rhs->moves_downstream() &&
198  rhs->carries_parent();
199  }
200  // }}}
201 
202  // receive {{{
203  kernel_type*
204  read_kernel(stream_type& stream) {
205  // eats remaining bytes on exception
206  ipacket_guard g(stream.rdbuf());
207  foreign_kernel* hdr = new foreign_kernel;
208  kernel_type* k = nullptr;
209  stream >> hdr->header();
210  if (this->has_other_application()) {
211  hdr->setapp(this->other_application_id());
212  hdr->aptr(this->_otheraptr);
213  }
214  if (this->_endpoint) {
215  hdr->from(this->_endpoint);
216  hdr->prepend_source_and_destination();
217  }
218  #ifndef NDEBUG
219  this->log("recv _", hdr->header());
220  #endif
221  if (hdr->app() != this->_thisapp) {
222  stream >> *hdr;
223  this->_forward(hdr);
224  } else {
225  stream >> k;
226  k->setapp(hdr->app());
227  if (hdr->has_source_and_destination()) {
228  k->from(hdr->from());
229  k->to(hdr->to());
230  } else {
231  k->from(this->_endpoint);
232  }
233  if (k->carries_parent()) {
234  k->parent()->setapp(hdr->app());
235  }
236  delete hdr;
237  }
238  return k;
239  }
240 
241  bool
242  receive_kernel(kernel_type* k) {
243  bool ok = true;
244  if (k->moves_downstream()) {
245  this->plug_parent(k);
246  } else if (k->principal_id()) {
247  instances_guard g(instances);
248  auto result = instances.find(k->principal_id());
249  if (result == instances.end()) {
250  k->return_code(exit_code::no_principal_found);
251  ok = false;
252  }
253  k->principal(result->second);
254  }
255  #ifndef NDEBUG
256  this->log("recv _", *k);
257  #endif
258  return ok;
259  }
260 
261  void
262  plug_parent(kernel_type* k) {
263  if (!k->has_id()) {
264  throw std::invalid_argument("downstream kernel without an id");
265  }
266  kernel_iterator pos = this->find_kernel(k, this->_upstream);
267  if (pos == this->_upstream.end()) {
268  if (k->carries_parent()) {
269  k->principal(k->parent());
270  this->log("recover parent for _", *k);
271  kernel_iterator result2 =
272  this->find_kernel(k, this->_downstream);
273  if (result2 != this->_downstream.end()) {
274  kernel_type* old = *result2;
275  this->log("delete _", *old);
276  delete old->parent();
277  delete old;
278  this->_downstream.erase(result2);
279  }
280  } else {
281  this->log("parent not found for _", *k);
282  delete k;
283  throw std::invalid_argument("parent not found");
284  }
285  } else {
286  kernel_type* orig = *pos;
287  k->parent(orig->parent());
288  k->principal(k->parent());
289  delete orig;
290  this->_upstream.erase(pos);
291  #ifndef NDEBUG
292  this->log("plug parent for _", *k);
293  #endif
294  }
295  }
296 
297  kernel_iterator
298  find_kernel(kernel_type* k, pool_type& pool) {
299  return std::find_if(
300  pool.begin(),
301  pool.end(),
302  [k] (kernel_type* rhs) { return rhs->id() == k->id(); }
303  );
304  }
305  // }}}
306 
307  // recover {{{
308  bool
309  save_kernel(kernel_type* k) {
310  bool delete_kernel = false;
311  if (kernel_goes_in_upstream_buffer(k)) {
312  if (k->is_native()) {
313  this->ensure_has_id(k->parent());
314  this->ensure_has_id(k);
315  }
316  #ifndef NDEBUG
317  this->log("save parent for _", *k);
318  #endif
319  traits_type::push(this->_upstream, k);
320  } else
321  if (kernel_goes_in_downstream_buffer(k)) {
322  #ifndef NDEBUG
323  this->log("save parent for _", *k);
324  #endif
325  traits_type::push(this->_downstream, k);
326  } else
327  if (!k->moves_everywhere()) {
328  delete_kernel = true;
329  }
330  return delete_kernel;
331  }
332 
333  void
334  do_recover_kernels(pool_type& rhs) noexcept {
335  using namespace std::placeholders;
337  queue_popper(rhs),
338  queue_popper(rhs),
339  [this] (kernel_type* rhs) {
340  try {
341  this->recover_kernel(rhs);
342  } catch (const std::exception& err) {
343  this->log("failed to recover kernel _", *rhs);
344  delete rhs;
345  }
346  }
347  );
348  }
349 
350  void
351  recover_kernel(kernel_type* k) {
352  #ifndef NDEBUG
353  this->log("try to recover _", k->id());
354  #endif
355  const bool native = k->is_native();
356  if (k->moves_upstream() && !k->to()) {
357  #ifndef NDEBUG
358  this->log("recover _", *k);
359  #endif
360  if (native) {
361  router_type::send_remote(k);
362  } else {
363  router_type::forward_parent(dynamic_cast<foreign_kernel*>(k));
364  }
365  } else if (k->moves_somewhere() || (k->moves_upstream() && k->to())) {
366  #ifndef NDEBUG
367  this->log("destination is unreachable for _", *k);
368  #endif
369  k->from(k->to());
370  k->return_code(exit_code::endpoint_not_connected);
371  k->principal(k->parent());
372  if (native) {
373  router_type::send_local(k);
374  } else {
375  this->_forward(dynamic_cast<foreign_kernel*>(k));
376  }
377  } else if (k->moves_downstream() && k->carries_parent()) {
378  #ifndef NDEBUG
379  this->log("restore parent _", *k);
380  #endif
381  if (native) {
382  router_type::send_local(k);
383  } else {
384  this->_forward(dynamic_cast<foreign_kernel*>(k));
385  }
386  } else {
387  this->log("bad kernel in sent buffer: _", *k);
388  delete k;
389  }
390  }
391  // }}}
392 
393  void
394  ensure_has_id(kernel_type* k) {
395  if (!k->has_id()) {
396  k->id(this->generate_id());
397  }
398  }
399 
400  id_type
401  generate_id() noexcept {
402  return ++this->_counter;
403  }
404 
405  template <class E>
406  void
407  log_write_error(const E& err) {
408  this->log("write error _", err);
409  }
410 
411  template <class E>
412  void
413  log_read_error(const E& err) {
414  this->log("read error _", err);
415  }
416 
417  template <class ... Args>
418  inline void
419  log(const Args& ... args) {
420  sys::log_message(this->_name, args ...);
421  }
422 
423  public:
424 
425  inline void
426  set_name(const char* rhs) noexcept {
427  this->_name = rhs;
428  }
429 
430  inline void
431  setf(kernel_proto_flag rhs) noexcept {
432  this->_flags |= rhs;
433  }
434 
435  inline void
436  unsetf(kernel_proto_flag rhs) noexcept {
437  this->_flags &= ~rhs;
438  }
439 
440  inline kernel_proto_flag
441  flags() const noexcept {
442  return this->_flags;
443  }
444 
445  inline bool
446  has_src_and_dest() const noexcept {
447  return this->_flags &
448  kernel_proto_flag::prepend_source_and_destination;
449  }
450 
451  inline bool
452  prepends_application() const noexcept {
453  return this->_flags & kernel_proto_flag::prepend_application;
454  }
455 
456  inline bool
457  saves_upstream_kernels() const noexcept {
458  return this->_flags & kernel_proto_flag::save_upstream_kernels;
459  }
460 
461  inline bool
462  saves_downstream_kernels() const noexcept {
463  return this->_flags & kernel_proto_flag::save_downstream_kernels;
464  }
465 
466  inline bool
467  has_other_application() const noexcept {
468  return this->_otheraptr;
469  }
470 
471  inline void
472  set_other_application(const application* rhs) noexcept {
473  this->_otheraptr = rhs;
474  }
475 
476  inline application_type
477  other_application_id() const noexcept {
478  return this->_otheraptr->id();
479  }
480 
481  inline void
482  set_endpoint(const sys::socket_address& rhs) noexcept {
483  this->_endpoint = rhs;
484  }
485 
486  inline const sys::socket_address&
487  socket_address() const noexcept {
488  return this->_endpoint;
489  }
490 
491  };
492 
493 }
494 
495 #endif // vim:filetype=cpp
T log(T... args)
Definition: application.hh:25
T what(T... args)
Definition: kstream.hh:67
void send(kernel_type *k, stream_type &stream)
Definition: kernel_protocol.hh:74
T find_if(T... args)
Definition: foreign_kernel.hh:9
Definition: kernel_protocol.hh:27
Input iterator that removes element from the container on increment.
Definition: queue_popper.hh:20
T for_each(T... args)
Definition: kernel_proto_flag.hh:6