1 #ifndef SUBORDINATION_PPL_KERNEL_PROTOCOL_HH 2 #define SUBORDINATION_PPL_KERNEL_PROTOCOL_HH 8 #include <unistdx/base/delete_each> 9 #include <unistdx/ipc/process> 11 #include <subordination/base/queue_popper.hh> 12 #include <subordination/kernel/foreign_kernel.hh> 13 #include <subordination/kernel/kernel_header.hh> 14 #include <subordination/kernel/kernel_instance_registry.hh> 15 #include <subordination/kernel/kstream.hh> 16 #include <subordination/ppl/application.hh> 17 #include <subordination/ppl/kernel_proto_flag.hh> 24 class Forward=bits::no_forward<Router>,
26 class Traits=deque_traits<Kernels>>
// Member type aliases. Forward and Traits are template parameters of this
// class; T, Router and Kernels presumably are too (their declarations are
// missing from this listing).
30 typedef T kernel_type;
31 typedef Router router_type;
32 typedef Forward forward_type;
33 typedef Kernels pool_type;
34 typedef Traits traits_type;
// Types derived from the kernel type and the stream type.
39 typedef typename T::id_type id_type;
40 typedef typename stream_type::ipacket_guard ipacket_guard;
41 typedef sys::opacket_guard<stream_type> opacket_guard;
// Iterator into a kernel pool; used by plug_parent() and find_kernel().
43 typedef typename pool_type::iterator kernel_iterator;
// Address assigned via set_endpoint(); read_kernel() stamps it onto incoming
// kernels as their source, and it is printed in the "send _ to _" message.
48 sys::socket_address _endpoint;
// Id of the current application; read_kernel() compares incoming headers to it.
50 application_type _thisapp = this_application::get_id();
// Pool filled by save_kernel() when kernel_goes_in_downstream_buffer() holds.
54 pool_type _downstream;
// Callable invoked with a foreign_kernel* in recover_kernel().
55 forward_type _forward;
// Tag passed as the first argument to sys::log_message() by log().
57 const char* _name =
"proto";
// Fragment of the send path (send(kernel_type*, stream_type&) according to
// the cross-reference at the end of this listing). The signature and several
// interior lines are missing, so the exact branch nesting cannot be confirmed.
// A downstream kernel with no destination set is examined for local delivery.
78 if (k->moves_downstream() && !k->to()) {
79 if (k->isset(kernel_flag::parent_is_id) || k->carries_parent()) {
80 if (k->carries_parent()) {
86 this->log(
"send local kernel _", *k);
// Hand the kernel to the router instead of writing it to the stream.
88 router_type::send_local(k);
// save_kernel() may buffer k; its result is kept in delete_kernel
// (how the flag is consumed is not visible in this listing).
91 bool delete_kernel = this->save_kernel(k);
93 this->log(
"send _ to _", *k, this->_endpoint);
// Serialise the kernel into the outgoing stream.
95 this->write_kernel(k, stream);
// Same condition that kernel_goes_in_downstream_buffer() tests on the kernel.
99 if (k->moves_downstream() && k->carries_parent()) {
108 bool delete_kernel = this->save_kernel(k);
// Reads kernels from `stream` packet by packet and dispatches each of them.
// Declared noexcept; the handlers visible below report failures through
// log_read_error(). NOTE(review): the opening of the try block and several
// branch lines are missing from this listing.
119 receive_kernels(stream_type& stream) noexcept {
// One iteration per packet reported by read_packet().
120 while (stream.read_packet()) {
// Proceed only when read_kernel() returns a non-null kernel.
122 if (kernel_type* k = this->read_kernel(stream)) {
// receive_kernel() resolves the parent/principal; its result is kept in `ok`.
123 bool ok = this->receive_kernel(k);
126 this->log(
"no principal found for _", *k);
// Make the parent the principal and pass the kernel to send() on the same stream.
128 k->principal(k->parent());
129 this->
send(k, stream);
// Hand the kernel to the router for local delivery.
131 router_type::send_local(k);
134 }
catch (
const kernel_error& err) {
136 }
catch (
const error& err) {
// Presumably inside a std::exception handler (its catch line is missing).
139 log_read_error(err.
what());
// Presumably inside a catch-all handler (its catch line is missing).
141 log_read_error(
"<unknown>");
// Runs do_recover_kernels() over the upstream and downstream buffers after
// logging their sizes. NOTE(review): how `down` gates these calls is on lines
// missing from this listing — presumably it controls the downstream pass; confirm.
147 recover_kernels(
bool down) {
149 this->log(
"recover kernels upstream _ downstream _",
150 this->_upstream.size(), this->_downstream.size());
152 this->do_recover_kernels(this->_upstream);
154 this->do_recover_kernels(this->_downstream);
// Writes kernel `k` to `stream` as one packet. Declared noexcept; every
// visible handler reports the failure through log_write_error().
162 write_kernel(kernel_type* k, stream_type& stream) noexcept {
// Guard object tied to the output stream for the duration of the write.
// NOTE(review): presumably discards a partially written packet on failure —
// confirm against sys::opacket_guard.
164 opacket_guard g(stream);
165 stream.begin_packet();
166 this->do_write_kernel(*k, stream);
// (Original line 167 is missing from this listing.)
168 }
catch (
const kernel_error& err) {
169 log_write_error(err);
170 }
catch (
const error& err) {
171 log_write_error(err);
// Presumably inside a std::exception handler (its catch line is missing).
173 log_write_error(err.
what());
// Presumably inside a catch-all handler (its catch line is missing).
175 log_write_error(
"<unknown>");
// Serialises kernel `k` into `stream`. Only the header part is visible here;
// the remaining lines are missing from this listing.
180 do_write_kernel(kernel_type& k, stream_type& stream) {
// When the prepend_source_and_destination protocol flag is set, mark the
// kernel header accordingly before it is written.
181 if (this->has_src_and_dest()) {
182 k.header().prepend_source_and_destination();
// Write the header to the stream.
184 stream << k.header();
// Tells whether `rhs` qualifies for the upstream buffer: the
// save_upstream_kernels flag must be set and the kernel must move either
// upstream or "somewhere".
189 kernel_goes_in_upstream_buffer(
const kernel_type* rhs) noexcept {
190 return this->saves_upstream_kernels() &&
191 (rhs->moves_upstream() || rhs->moves_somewhere());
// Tells whether `rhs` qualifies for the downstream buffer: the
// save_downstream_kernels flag must be set and the kernel must move
// downstream while carrying its parent.
195 kernel_goes_in_downstream_buffer(
const kernel_type* rhs) noexcept {
196 return this->saves_downstream_kernels() &&
197 rhs->moves_downstream() &&
198 rhs->carries_parent();
// Reads one kernel from `stream`: the header goes into a freshly allocated
// foreign_kernel, which is then annotated with application and source
// information. NOTE(review): several lines (including how `k` gets its value
// and what is returned) are missing from this listing.
204 read_kernel(stream_type& stream) {
// Guard object constructed from the stream's buffer for the duration of the read.
206 ipacket_guard g(stream.rdbuf());
// NOTE(review): allocated with raw `new`; who releases `hdr` (and on which
// paths) is not visible here — confirm there is no leak when reading throws.
207 foreign_kernel* hdr =
new foreign_kernel;
208 kernel_type* k =
nullptr;
209 stream >> hdr->header();
// If another application has been registered, tag the header with its id and pointer.
210 if (this->has_other_application()) {
211 hdr->setapp(this->other_application_id());
212 hdr->aptr(this->_otheraptr);
// If an endpoint is set, record it as the kernel's source address.
214 if (this->_endpoint) {
215 hdr->from(this->_endpoint);
216 hdr->prepend_source_and_destination();
219 this->log(
"recv _", hdr->header());
// Kernels that belong to a different application are treated separately
// (the body of this branch is missing from this listing).
221 if (hdr->app() != this->_thisapp) {
// Copy application id and source address from the header into the kernel.
226 k->setapp(hdr->app());
227 if (hdr->has_source_and_destination()) {
228 k->from(hdr->from());
// Otherwise (presumably an else branch) fall back to this protocol's endpoint.
231 k->from(this->_endpoint);
// A carried parent gets the same application id.
233 if (k->carries_parent()) {
234 k->parent()->setapp(hdr->app());
// Resolves the parent or principal of an incoming kernel `k`. The value
// returned is not visible in this listing; receive_kernels() stores it in a
// bool named `ok`.
242 receive_kernel(kernel_type* k) {
// Downstream kernels get their parent re-attached from the saved buffers.
244 if (k->moves_downstream()) {
245 this->plug_parent(k);
246 }
else if (k->principal_id()) {
// Look the principal up by id in the instance registry while holding its guard.
247 instances_guard g(instances);
248 auto result = instances.find(k->principal_id());
249 if (result == instances.end()) {
// Unknown principal: record the failure in the kernel's return code.
250 k->return_code(exit_code::no_principal_found);
// Known principal (presumably the else branch): attach it to the kernel.
253 k->principal(result->second);
256 this->log(
"recv _", *k);
// Attaches a parent/principal to downstream kernel `k` by searching the
// saved-kernel buffers for an entry with the same id (see find_kernel()).
// NOTE(review): else branches and some statements are missing from this listing.
262 plug_parent(kernel_type* k) {
// Search the upstream buffer first.
266 kernel_iterator pos = this->find_kernel(k, this->_upstream);
267 if (pos == this->_upstream.end()) {
// Not saved upstream: if k carries its own parent, that parent becomes the principal.
268 if (k->carries_parent()) {
269 k->principal(k->parent());
270 this->log(
"recover parent for _", *k);
// Remove the entry with the same id from the downstream buffer, if present.
271 kernel_iterator result2 =
272 this->find_kernel(k, this->_downstream);
273 if (result2 != this->_downstream.end()) {
274 kernel_type* old = *result2;
275 this->log(
"delete _", *old);
// The saved kernel's parent is deleted here. NOTE(review): whether `old`
// itself is freed is on a line missing from this listing (277).
276 delete old->parent();
278 this->_downstream.erase(result2);
281 this->log(
"parent not found for _", *k);
// Found upstream: reuse the saved kernel's parent as both parent and
// principal of k, then drop the saved entry from the buffer.
286 kernel_type* orig = *pos;
287 k->parent(orig->parent());
288 k->principal(k->parent());
290 this->_upstream.erase(pos);
292 this->log(
"plug parent for _", *k);
// Searches `pool` for a kernel whose id equals that of `k`. The call that
// applies the predicate is missing from this listing (presumably a
// find_if-style algorithm over `pool`); callers compare the result to pool.end().
298 find_kernel(kernel_type* k, pool_type& pool) {
// Predicate: match on kernel id.
302 [k] (kernel_type* rhs) {
return rhs->id() == k->id(); }
// Stores `k` in the upstream and/or downstream buffer when the corresponding
// predicate holds, and returns the `delete_kernel` flag computed below.
// NOTE(review): the nesting of the last condition is not visible in this
// listing, so it is unclear on which paths the flag can become true.
309 save_kernel(kernel_type* k) {
310 bool delete_kernel =
false;
311 if (kernel_goes_in_upstream_buffer(k)) {
// Native kernels get ids assigned to themselves and their parent
// (see ensure_has_id()) before being buffered.
312 if (k->is_native()) {
313 this->ensure_has_id(k->parent());
314 this->ensure_has_id(k);
317 this->log(
"save parent for _", *k);
319 traits_type::push(this->_upstream, k);
321 if (kernel_goes_in_downstream_buffer(k)) {
323 this->log(
"save parent for _", *k);
325 traits_type::push(this->_downstream, k);
// Kernels that do not move "everywhere" set the flag.
327 if (!k->moves_everywhere()) {
328 delete_kernel =
true;
330 return delete_kernel;
// Applies recover_kernel() to kernels taken from pool `rhs`. Declared
// noexcept. NOTE(review): the iteration construct and the try/catch around
// the call are missing from this listing; the trailing cross-reference to
// queue_popper suggests elements are removed from the pool as they are
// visited — confirm.
334 do_recover_kernels(pool_type& rhs) noexcept {
// Per-kernel callback; note that its parameter shadows the pool parameter name.
339 [
this] (kernel_type* rhs) {
341 this->recover_kernel(rhs);
// Reported when recovery of a single kernel fails (presumably in a catch handler).
343 this->
log(
"failed to recover kernel _", *rhs);
// Decides what to do with a buffered kernel `k` during recovery, based on
// its direction and destination.
// NOTE(review): the lines choosing between the paired calls in each branch
// are missing from this listing; `native` presumably selects the router call
// for native kernels and the foreign_kernel path otherwise — confirm.
// NOTE(review): the dynamic_cast results are passed on unchecked in the
// visible lines.
351 recover_kernel(kernel_type* k) {
353 this->log(
"try to recover _", k->id());
355 const bool native = k->is_native();
// Case 1: upstream kernel without an explicit destination — send it out again.
356 if (k->moves_upstream() && !k->to()) {
358 this->log(
"recover _", *k);
361 router_type::send_remote(k);
363 router_type::forward_parent(dynamic_cast<foreign_kernel*>(k));
// Case 2: kernel bound for a specific destination — mark it with
// endpoint_not_connected and direct it to its parent.
365 }
else if (k->moves_somewhere() || (k->moves_upstream() && k->to())) {
367 this->log(
"destination is unreachable for _", *k);
370 k->return_code(exit_code::endpoint_not_connected);
371 k->principal(k->parent());
373 router_type::send_local(k);
375 this->_forward(dynamic_cast<foreign_kernel*>(k));
// Case 3: downstream kernel that carries its parent — deliver it locally or forward it.
377 }
else if (k->moves_downstream() && k->carries_parent()) {
379 this->log(
"restore parent _", *k);
382 router_type::send_local(k);
384 this->_forward(dynamic_cast<foreign_kernel*>(k));
// Fallback (presumably the final else): kernel matches none of the cases above.
387 this->log(
"bad kernel in sent buffer: _", *k);
// Assigns a freshly generated id to kernel `k`. NOTE(review): the condition
// guarding the assignment (original line 395) is missing from this listing;
// presumably it only applies when the kernel has no id yet — confirm.
394 ensure_has_id(kernel_type* k) {
396 k->id(this->generate_id());
// Returns the next kernel id by pre-incrementing the _counter member.
401 generate_id() noexcept {
402 return ++this->_counter;
// Logs `err` with the "write error _" message. E is presumably a template
// parameter declared on a line missing from this listing; write_kernel()
// passes exception objects, C strings and a literal.
407 log_write_error(
const E& err) {
408 this->log(
"write error _", err);
// Logs `err` with the "read error _" message. E is presumably a template
// parameter declared on a line missing from this listing; receive_kernels()
// passes C strings and a literal.
413 log_read_error(
const E& err) {
414 this->log(
"read error _", err);
// Forwards all arguments to sys::log_message(), prefixed with this
// protocol's name tag (_name).
417 template <
class ... Args>
419 log(
const Args& ... args) {
420 sys::log_message(this->_name, args ...);
// Setter taking a C string. NOTE(review): the body is missing from this
// listing; presumably it stores `rhs` in _name (the log tag) — confirm.
426 set_name(
const char* rhs) noexcept {
// NOTE(review): the body is missing from this listing; presumably sets the
// bits of `rhs` in _flags, mirroring unsetf() — confirm.
431 setf(kernel_proto_flag rhs) noexcept {
// Clears the bits given in `rhs` from the protocol flags.
436 unsetf(kernel_proto_flag rhs) noexcept {
437 this->_flags &= ~rhs;
// Const accessor returning a kernel_proto_flag by value. NOTE(review): the
// body is missing from this listing; presumably returns _flags — confirm.
440 inline kernel_proto_flag
441 flags() const noexcept {
// Tests the prepend_source_and_destination bit of the protocol flags;
// do_write_kernel() consults it before writing a kernel header.
446 has_src_and_dest() const noexcept {
447 return this->_flags &
448 kernel_proto_flag::prepend_source_and_destination;
// Tests the prepend_application bit of the protocol flags.
452 prepends_application() const noexcept {
453 return this->_flags & kernel_proto_flag::prepend_application;
// Tests the save_upstream_kernels bit of the protocol flags; used by
// kernel_goes_in_upstream_buffer().
457 saves_upstream_kernels() const noexcept {
458 return this->_flags & kernel_proto_flag::save_upstream_kernels;
// Tests the save_downstream_kernels bit of the protocol flags; used by
// kernel_goes_in_downstream_buffer().
462 saves_downstream_kernels() const noexcept {
463 return this->_flags & kernel_proto_flag::save_downstream_kernels;
// Reports whether another application has been registered, based on the
// _otheraptr pointer (the return type line is missing from this listing;
// presumably the pointer is converted to bool).
467 has_other_application() const noexcept {
468 return this->_otheraptr;
// Stores the pointer to the other application; the pointer itself is kept,
// no copy of the application object is made here.
472 set_other_application(
const application* rhs) noexcept {
473 this->_otheraptr = rhs;
// Returns the id of the other application. Dereferences _otheraptr without
// a null check; read_kernel() calls it only after has_other_application().
476 inline application_type
477 other_application_id() const noexcept {
478 return this->_otheraptr->id();
// Sets the socket address stored in _endpoint (copied from `rhs`).
482 set_endpoint(
const sys::socket_address& rhs) noexcept {
483 this->_endpoint = rhs;
// Returns the stored endpoint address by const reference.
486 inline const sys::socket_address&
487 socket_address() const noexcept {
488 return this->_endpoint;
495 #endif // vim:filetype=cpp
Definition: application.hh:25
Definition: kstream.hh:67
void send(kernel_type *k, stream_type &stream)
Definition: kernel_protocol.hh:74
Definition: foreign_kernel.hh:9
Definition: kernel_protocol.hh:27
Input iterator that removes element from the container on increment.
Definition: queue_popper.hh:20
Definition: kernel_proto_flag.hh:6