Subordination
A framework for distributed programming
kstream.hh
1 #ifndef SUBORDINATION_KERNEL_KSTREAM_HH
2 #define SUBORDINATION_KERNEL_KSTREAM_HH
3 
4 #include <cassert>
5 
6 #include <unistdx/base/log_message>
7 #include <unistdx/net/socket_address>
8 #include <unistdx/net/pstream>
9 
10 #include <subordination/base/error.hh>
11 #include <subordination/kernel/foreign_kernel.hh>
12 #include <subordination/kernel/kernel_error.hh>
13 #include <subordination/kernel/kernel_type_registry.hh>
14 #include <subordination/kernel/kernelbuf.hh>
15 #include <subordination/ppl/kernel_proto_flag.hh>
16 
17 namespace sbn {
18 
19  namespace bits {
20 
21  template <class Router>
22  struct no_forward: public Router {
23  void
24  operator()(foreign_kernel*) {
25  assert(false);
26  }
27  };
28 
29  template <class Router>
30  struct forward_to_child: public Router {
31  void
32  operator()(foreign_kernel* hdr) {
33  this->forward_child(hdr);
34  }
35  };
36 
37  template <class Router>
38  struct forward_to_parent: public Router {
39  void
40  operator()(foreign_kernel* hdr) {
41  this->forward_parent(hdr);
42  }
43  };
44 
45  template <class T>
46  struct no_router {
47 
48  void
49  send_local(T* rhs) {}
50 
51  void
52  send_remote(T*) {}
53 
54  void
55  forward(foreign_kernel*) {}
56 
57  void
58  forward_child(foreign_kernel*) {}
59 
60  void
61  forward_parent(foreign_kernel*) {}
62 
63  };
64  }
65 
66  template<class T>
67  struct kstream: public sys::pstream {
68 
69  typedef T kernel_type;
70 
71  using sys::pstream::operator<<;
72  using sys::pstream::operator>>;
73 
74  kstream() = default;
75  inline explicit
76  kstream(sys::packetbuf* buf): sys::pstream(buf) {}
77  kstream(kstream&&) = default;
78 
79  inline kstream&
80  operator<<(foreign_kernel* k) {
81  return operator<<(*k);
82  }
83 
84  inline kstream&
85  operator<<(foreign_kernel& k) {
86  this->write_foreign(k);
87  return *this;
88  }
89 
90  inline kstream&
91  operator<<(kernel_type* k) {
92  return operator<<(*k);
93  }
94 
95  kstream&
96  operator<<(kernel_type& k) {
97  this->write_native(k);
98  if (k.carries_parent()) {
99  // embed parent into the packet
100  kernel_type* parent = k.parent();
101  if (!parent) {
102  throw std::invalid_argument("parent is null");
103  }
104  this->write_native(*parent);
105  }
106  return *this;
107  }
108 
109  kstream&
110  operator>>(kernel_type*& k) {
111  k = this->read_native();
112  if (k->carries_parent()) {
113  kernel_type* parent = this->read_native();
114  k->parent(parent);
115  }
116  return *this;
117  }
118 
119  kstream&
120  operator>>(foreign_kernel& k) {
121  this->read_foreign(k);
122  return *this;
123  }
124 
125  private:
126 
127  inline void
128  write_native(kernel_type& k) {
129  auto type = types.find(typeid(k));
130  if (type == types.end()) {
131  throw std::invalid_argument("kernel type is null");
132  }
133  *this << type->id();
134  k.write(*this);
135  }
136 
137  inline kernel_type*
138  read_native() {
139  return types.read_object(*this);
140  }
141 
142  inline void
143  write_foreign(foreign_kernel& k) {
144  k.write(*this);
145  }
146 
147  inline void
148  read_foreign(foreign_kernel& k) {
149  k.read(*this);
150  }
151 
152  };
153 
154 }
155 
156 #endif // vim:filetype=cpp
Definition: kstream.hh:67
Definition: foreign_kernel.hh:9
T forward(T... args)