1 #ifndef SUBORDINATION_PPL_KERNEL_PROTOCOL_HH     2 #define SUBORDINATION_PPL_KERNEL_PROTOCOL_HH     8 #include <unistdx/base/delete_each>     9 #include <unistdx/ipc/process>    11 #include <subordination/base/queue_popper.hh>    12 #include <subordination/kernel/foreign_kernel.hh>    13 #include <subordination/kernel/kernel_header.hh>    14 #include <subordination/kernel/kernel_instance_registry.hh>    15 #include <subordination/kernel/kstream.hh>    16 #include <subordination/ppl/application.hh>    17 #include <subordination/ppl/kernel_proto_flag.hh>    24         class Forward=bits::no_forward<Router>,
    26         class Traits=deque_traits<Kernels>>
    30         typedef T kernel_type;
    31         typedef Router router_type;
    32         typedef Forward forward_type;
    33         typedef Kernels pool_type;
    34         typedef Traits traits_type;
    39         typedef typename T::id_type id_type;
    40         typedef typename stream_type::ipacket_guard ipacket_guard;
    41         typedef sys::opacket_guard<stream_type> opacket_guard;
    43         typedef typename pool_type::iterator kernel_iterator;
    48         sys::socket_address _endpoint;
    50         application_type _thisapp = this_application::get_id();
    54         pool_type _downstream;
    55         forward_type _forward;
    57         const char* _name = 
"proto";
    78             if (k->moves_downstream() && !k->to()) {
    79                 if (k->isset(kernel_flag::parent_is_id) || k->carries_parent()) {
    80                     if (k->carries_parent()) {
    86                 this->log(
"send local kernel _", *k);
    88                 router_type::send_local(k);
    91             bool delete_kernel = this->save_kernel(k);
    93             this->log(
"send _ to _", *k, this->_endpoint);
    95             this->write_kernel(k, stream);
    99                 if (k->moves_downstream() && k->carries_parent()) {
   108             bool delete_kernel = this->save_kernel(k);
   119         receive_kernels(stream_type& stream) noexcept {
   120             while (stream.read_packet()) {
   122                     if (kernel_type* k = this->read_kernel(stream)) {
   123                         bool ok = this->receive_kernel(k);
   126                             this->log(
"no principal found for _", *k);
   128                             k->principal(k->parent());
   129                             this->
send(k, stream);
   131                             router_type::send_local(k);
   134                 } 
catch (
const kernel_error& err) {
   136                 } 
catch (
const error& err) {
   139                     log_read_error(err.
what());
   141                     log_read_error(
"<unknown>");
   147         recover_kernels(
bool down) {
   149             this->log(
"recover kernels upstream _ downstream _",
   150                       this->_upstream.size(), this->_downstream.size());
   152             this->do_recover_kernels(this->_upstream);
   154                 this->do_recover_kernels(this->_downstream);
   162         write_kernel(kernel_type* k, stream_type& stream) noexcept {
   164                 opacket_guard g(stream);
   165                 stream.begin_packet();
   166                 this->do_write_kernel(*k, stream);
   168             } 
catch (
const kernel_error& err) {
   169                 log_write_error(err);
   170             } 
catch (
const error& err) {
   171                 log_write_error(err);
   173                 log_write_error(err.
what());
   175                 log_write_error(
"<unknown>");
   180         do_write_kernel(kernel_type& k, stream_type& stream) {
   181             if (this->has_src_and_dest()) {
   182                 k.header().prepend_source_and_destination();
   184             stream << k.header();
   189         kernel_goes_in_upstream_buffer(
const kernel_type* rhs) noexcept {
   190             return this->saves_upstream_kernels() &&
   191                    (rhs->moves_upstream() || rhs->moves_somewhere());
   195         kernel_goes_in_downstream_buffer(
const kernel_type* rhs) noexcept {
   196             return this->saves_downstream_kernels() &&
   197                    rhs->moves_downstream() &&
   198                    rhs->carries_parent();
   204         read_kernel(stream_type& stream) {
   206             ipacket_guard g(stream.rdbuf());
   207             foreign_kernel* hdr = 
new foreign_kernel;
   208             kernel_type* k = 
nullptr;
   209             stream >> hdr->header();
   210             if (this->has_other_application()) {
   211                 hdr->setapp(this->other_application_id());
   212                 hdr->aptr(this->_otheraptr);
   214             if (this->_endpoint) {
   215                 hdr->from(this->_endpoint);
   216                 hdr->prepend_source_and_destination();
   219             this->log(
"recv _", hdr->header());
   221             if (hdr->app() != this->_thisapp) {
   226                 k->setapp(hdr->app());
   227                 if (hdr->has_source_and_destination()) {
   228                     k->from(hdr->from());
   231                     k->from(this->_endpoint);
   233                 if (k->carries_parent()) {
   234                     k->parent()->setapp(hdr->app());
   242         receive_kernel(kernel_type* k) {
   244             if (k->moves_downstream()) {
   245                 this->plug_parent(k);
   246             } 
else if (k->principal_id()) {
   247                 instances_guard g(instances);
   248                 auto result = instances.find(k->principal_id());
   249                 if (result == instances.end()) {
   250                     k->return_code(exit_code::no_principal_found);
   253                 k->principal(result->second);
   256             this->log(
"recv _", *k);
   262         plug_parent(kernel_type* k) {
   266             kernel_iterator pos = this->find_kernel(k, this->_upstream);
   267             if (pos == this->_upstream.end()) {
   268                 if (k->carries_parent()) {
   269                     k->principal(k->parent());
   270                     this->log(
"recover parent for _", *k);
   271                     kernel_iterator result2 =
   272                         this->find_kernel(k, this->_downstream);
   273                     if (result2 != this->_downstream.end()) {
   274                         kernel_type* old = *result2;
   275                         this->log(
"delete _", *old);
   276                         delete old->parent();
   278                         this->_downstream.erase(result2);
   281                     this->log(
"parent not found for _", *k);
   286                 kernel_type* orig = *pos;
   287                 k->parent(orig->parent());
   288                 k->principal(k->parent());
   290                 this->_upstream.erase(pos);
   292                 this->log(
"plug parent for _", *k);
   298         find_kernel(kernel_type* k, pool_type& pool) {
   302                 [k] (kernel_type* rhs) { 
return rhs->id() == k->id(); }
   309         save_kernel(kernel_type* k) {
   310             bool delete_kernel = 
false;
   311             if (kernel_goes_in_upstream_buffer(k)) {
   312                 if (k->is_native()) {
   313                     this->ensure_has_id(k->parent());
   314                     this->ensure_has_id(k);
   317                 this->log(
"save parent for _", *k);
   319                 traits_type::push(this->_upstream, k);
   321             if (kernel_goes_in_downstream_buffer(k)) {
   323                 this->log(
"save parent for _", *k);
   325                 traits_type::push(this->_downstream, k);
   327             if (!k->moves_everywhere()) {
   328                 delete_kernel = 
true;
   330             return delete_kernel;
   334         do_recover_kernels(pool_type& rhs) noexcept {
   339                 [
this] (kernel_type* rhs) {
   341                         this->recover_kernel(rhs);
   343                         this->
log(
"failed to recover kernel _", *rhs);
   351         recover_kernel(kernel_type* k) {
   353             this->log(
"try to recover _", k->id());
   355             const bool native = k->is_native();
   356             if (k->moves_upstream() && !k->to()) {
   358                 this->log(
"recover _", *k);
   361                     router_type::send_remote(k);
   363                     router_type::forward_parent(dynamic_cast<foreign_kernel*>(k));
   365             } 
else if (k->moves_somewhere() || (k->moves_upstream() && k->to())) {
   367                 this->log(
"destination is unreachable for _", *k);
   370                 k->return_code(exit_code::endpoint_not_connected);
   371                 k->principal(k->parent());
   373                     router_type::send_local(k);
   375                     this->_forward(dynamic_cast<foreign_kernel*>(k));
   377             } 
else if (k->moves_downstream() && k->carries_parent()) {
   379                 this->log(
"restore parent _", *k);
   382                     router_type::send_local(k);
   384                     this->_forward(dynamic_cast<foreign_kernel*>(k));
   387                 this->log(
"bad kernel in sent buffer: _", *k);
   394         ensure_has_id(kernel_type* k) {
   396                 k->id(this->generate_id());
   401         generate_id() noexcept {
   402             return ++this->_counter;
   407         log_write_error(
const E& err) {
   408             this->log(
"write error _", err);
   413         log_read_error(
const E& err) {
   414             this->log(
"read error _", err);
   417         template <
class ... Args>
   419         log(
const Args& ... args) {
   420             sys::log_message(this->_name, args ...);
   426         set_name(
const char* rhs) noexcept {
   431         setf(kernel_proto_flag rhs) noexcept {
   436         unsetf(kernel_proto_flag rhs) noexcept {
   437             this->_flags &= ~rhs;
   440         inline kernel_proto_flag
   441         flags() const noexcept {
   446         has_src_and_dest() const noexcept {
   447             return this->_flags &
   448                 kernel_proto_flag::prepend_source_and_destination;
   452         prepends_application() const noexcept {
   453             return this->_flags & kernel_proto_flag::prepend_application;
   457         saves_upstream_kernels() const noexcept {
   458             return this->_flags & kernel_proto_flag::save_upstream_kernels;
   462         saves_downstream_kernels() const noexcept {
   463             return this->_flags & kernel_proto_flag::save_downstream_kernels;
   467         has_other_application() const noexcept {
   468             return this->_otheraptr;
   472         set_other_application(
const application* rhs) noexcept {
   473             this->_otheraptr = rhs;
   476         inline application_type
   477         other_application_id() const noexcept {
   478             return this->_otheraptr->id();
   482         set_endpoint(
const sys::socket_address& rhs) noexcept {
   483             this->_endpoint = rhs;
   486         inline const sys::socket_address&
   487         socket_address() const noexcept {
   488             return this->_endpoint;
   495 #endif // vim:filetype=cpp 
Definition: application.hh:25
 
Definition: kstream.hh:67
 
void send(kernel_type *k, stream_type &stream)
Definition: kernel_protocol.hh:74
 
Definition: foreign_kernel.hh:9
 
Definition: kernel_protocol.hh:27
 
Input iterator that removes element from the container on increment.
Definition: queue_popper.hh:20
 
Definition: kernel_proto_flag.hh:6