LCOV - code coverage report
Current view: top level - runtime - msu_calls.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 13 123 10.6 %
Date: 2018-01-11 Functions: 2 10 20.0 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file msu_calls.c
      22             :  * Defines methods used for calling MSUs from other MSUs
      23             :  */
      24             : 
      25             : #include "msu_calls.h"
      26             : #include "logging.h"
      27             : #include "thread_message.h"
      28             : #include "output_thread.h"
      29             : #include "routing_strategies.h"
      30             : #include "profiler.h"
      31             : 
      32             : /**
      33             :  * Calls type-specific MSU serialization function and enqueues data into main
      34             :  * thread queue so it can be forwarded to remote MSU
      35             :  * @param msg Message to be enqueued to the remote MSU
      36             :  * @param dst_type MSU type of the destination
      37             :  * @param dst The specific MSU destination
      38             :  * @return 0 on success, -1 on error
      39             :  */
      40           0 : static int enqueue_for_remote_send(struct msu_msg *msg,
      41             :                                    struct msu_type *dst_type,
      42             :                                    struct msu_endpoint *dst) {
      43             :     size_t size;
      44             :     // Is it okay that I malloc within the serialize fn but have to free here on error?
      45           0 :     void *serialized = serialize_msu_msg(msg, dst_type, &size);
      46           0 :     if (serialized == NULL) {
      47           0 :         log_error("Error serializing MSU msg for delivery to msu %d", dst->id);
      48           0 :         return -1;
      49             :     }
      50             : 
      51           0 :     struct thread_msg *thread_msg = init_send_thread_msg(dst->runtime_id,
      52           0 :                                                          dst->id,
      53             :                                                          size,
      54             :                                                          serialized);
      55             : 
      56           0 :     if (thread_msg == NULL) {
      57           0 :         log_error("Error creating thread message for sending MSU msg to msu %d",
      58             :                   dst->id);
      59           0 :         free(serialized);
      60           0 :         return -1;
      61             :     }
      62             : 
      63             :     PROFILE_EVENT(msg->hdr, PROF_REMOTE_SEND);
      64           0 :     int rtn = enqueue_for_output(thread_msg);
      65           0 :     if (rtn < 0) {
      66           0 :         log_error("Error enqueuing MSU msg for msu %u on main thread", dst->id);
      67           0 :         destroy_thread_msg(thread_msg);
      68           0 :         free(serialized);
      69           0 :         return -1;
      70             :     }
      71             : 
      72           0 :     log(LOG_MSU_ENQUEUES, "Enqueued message for remote send to dst %d on runtime %d",
      73             :                dst->id, dst->runtime_id);
      74           0 :     return 0;
      75             : }
      76             : 
      77           0 : int call_local_msu(struct local_msu *sender, struct local_msu *dest,
      78             :                 struct msu_msg_hdr *hdr, size_t data_size, void *data) {
      79           0 :     struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
      80           0 :     log(LOG_MSU_ENQUEUES, "Enqueing data %p directly to destination %d",
      81             :                msg->data, dest->id);
      82             : 
      83           0 :     int rtn = add_provinance(&msg->hdr.provinance, sender);
      84           0 :     if (rtn < 0) {
      85           0 :         log_warn("Could not add provinance to message %p", msg);
      86             :     }
      87             : 
      88           0 :     rtn = enqueue_msu_msg(&dest->queue, msg);
      89           0 :     if (rtn < 0) {
      90           0 :         log_error("Error enqueing data %p to local MSU %d", msg->data, dest->id);
      91           0 :         free(msg);
      92           0 :         return -1;
      93             :     }
      94           0 :     return 0;
      95             : }
      96             : 
      97           1 : int schedule_local_msu_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval,
      98             :                             struct msu_msg_hdr *hdr, size_t data_size, void *data) {
      99           1 :     struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
     100           1 :     log(LOG_MSU_ENQUEUES, "Enqueing data %p directly to destination %d",
     101             :                msg->data, dest->id);
     102             : 
     103           1 :     int rtn = add_provinance(&msg->hdr.provinance, sender);
     104           1 :     if (rtn < 0) {
     105           0 :         log_warn("Could not add provinance to message %p", msg);
     106             :     }
     107             : 
     108           1 :     rtn = schedule_msu_msg(&dest->queue, msg, interval);
     109           1 :     if (rtn < 0) {
     110           0 :         log_error("Error enqueing data %p to local MSU %d", msg->data, dest->id);
     111           0 :         free(msg);
     112           0 :         return -1;
     113             :     }
     114           1 :     rtn = enqueue_worker_timeout(dest->thread, interval);
     115           1 :     if (rtn < 0) {
     116           0 :         log_warn("Error enqueing timeout to worker thread");
     117             :     }
     118           1 :     return 0;
     119             : }
     120             : 
     121           1 : int schedule_local_msu_init_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval,
     122             :                       struct msu_msg_key *key, size_t data_size, void *data) {
     123             :     struct msu_msg_hdr hdr;
     124           1 :     if (init_msu_msg_hdr(&hdr, key) != 0) {
     125           0 :         log_error("Error initializing message header");
     126           0 :         return -1;
     127             :     }
     128             :     SET_PROFILING(hdr);
     129             :     PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
     130           1 :     return schedule_local_msu_call(sender, dest, interval, &hdr, data_size, data);
     131             : }
     132             : 
     133             : 
     134           0 : int init_call_local_msu(struct local_msu *sender, struct local_msu *dest,
     135             :                         struct msu_msg_key *key, size_t data_size, void *data) {
     136             :     struct msu_msg_hdr hdr;
     137           0 :     if (init_msu_msg_hdr(&hdr, key) != 0) {
     138           0 :         log_error("Error initializing message header");
     139           0 :         return -1;
     140             :     }
     141             :     SET_PROFILING(hdr);
     142             :     PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
     143           0 :     return call_local_msu(sender, dest, &hdr, data_size, data);
     144             : }
     145             : 
     146           0 : int call_msu_type(struct local_msu *sender, struct msu_type *dst_type,
     147             :                   struct msu_msg_hdr *hdr, size_t data_size, void *data) {
     148             : 
     149           0 :     struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
     150             : 
     151           0 :     int rtn = add_provinance(&msg->hdr.provinance, sender);
     152           0 :     if (rtn < 0) {
     153           0 :         log_warn("Could not add provinance to message %p", msg);
     154             :     }
     155             : 
     156           0 :     log(LOG_MSU_ENQUEUES, "Sending data %p to destination type %s",
     157             :                msg->data, dst_type->name);
     158             : 
     159             :     struct msu_endpoint dst;
     160           0 :     if (dst_type->route) {
     161           0 :         rtn = dst_type->route(dst_type, sender, msg, &dst);
     162             :     } else {
     163           0 :         rtn = default_routing(dst_type, sender, msg, &dst);
     164             :     }
     165             : 
     166           0 :     if (rtn < 0) {
     167           0 :         log_error("Could not find destination endpoint of type %s from msu %d (%s). "
     168             :                   "Dropping message %p",
     169             :                   dst_type->name, sender->id, sender->type->name, msg);
     170           0 :         free(msg);
     171           0 :         return -1;
     172             :     }
     173             : 
     174           0 :     msg->hdr.key.group_id = dst.route_id;
     175             : 
     176           0 :     switch (dst.locality) {
     177             :         case MSU_IS_LOCAL:
     178           0 :             rtn = enqueue_msu_msg(dst.queue, msg);
     179           0 :             if (rtn < 0) {
     180           0 :                 log_error("Error enqueuing data %p to local MSU %d", msg->data, dst.id);
     181           0 :                 free(msg);
     182           0 :                 return -1;
     183             :             }
     184           0 :             log(LOG_MSU_ENQUEUES, "Enqueued data %p to local msu %d", msg->data, dst.id);
     185           0 :             return 0;
     186             :         case MSU_IS_REMOTE:
     187           0 :             rtn = enqueue_for_remote_send(msg, dst_type, &dst);
     188           0 :             if (rtn < 0) {
     189           0 :                 log_error("Error sending data %p to remote MSU %d", msg->data, dst.id);
     190           0 :                 return -1.i;
     191             :             }
     192           0 :             log(LOG_MSU_ENQUEUES, "Sending data %p to remote msu %d", msg->data, dst.id);
     193             : 
     194             :             // Since the data has been sent to a remote MSU, we can now
     195             :             // free the msu message from this runtime's memory
     196           0 :             destroy_msu_msg_and_contents(msg);
     197           0 :             return 0;
     198             :         default:
     199           0 :             log_error("Unknown MSU locality for msu %d (from %d): %d",
     200             :                       dst.id, sender->id, dst.locality);
     201           0 :             return -1;
     202             :     }
     203             : }
     204             : 
     205           0 : int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type,
     206             :                        struct msu_msg_key *key, size_t data_size, void *data) {
     207             :     struct msu_msg_hdr hdr;
     208           0 :     if (init_msu_msg_hdr(&hdr, key) != 0) {
     209           0 :         log_error("Error initializing message header");
     210           0 :         return -1;
     211             :     }
     212             :     SET_PROFILING(hdr);
     213             :     PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
     214           0 :     return call_msu_type(sender, dst_type, &hdr, data_size, data);
     215             : }
     216             : 
     217           0 : int call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint,
     218             :                       struct msu_type *endpoint_type,
     219             :                       struct msu_msg_hdr *hdr, size_t data_size, void *data) {
     220           0 :     struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
     221             : 
     222             : 
     223           0 :     int rtn = add_provinance(&msg->hdr.provinance, sender);
     224           0 :     if (rtn < 0) {
     225           0 :         log_warn("Could not add provinance to message %p", msg);
     226             :     }
     227             : 
     228           0 :     log(LOG_MSU_ENQUEUES, "Sending data %p to destination endpoint %d",
     229             :         msg->data, endpoint->id);
     230             : 
     231           0 :     switch (endpoint->locality) {
     232             :         case MSU_IS_LOCAL:
     233           0 :             rtn = enqueue_msu_msg(endpoint->queue, msg);
     234           0 :             if (rtn < 0) {
     235           0 :                 log_error("Error enqueuing data %p to local msu %d", msg->data, endpoint->id);
     236           0 :                 free(msg);
     237           0 :                 return -1;
     238             :             }
     239           0 :             log(LOG_MSU_ENQUEUES, "Enqueued data %p to local msu %d", msg->data, endpoint->id);
     240           0 :             return 0;
     241             :         case MSU_IS_REMOTE:
     242           0 :             rtn = enqueue_for_remote_send(msg, endpoint_type, endpoint);
     243           0 :             if (rtn < 0) {
     244           0 :                 log_error("Error enqueueing data %p towards remote msu %d", 
     245             :                           msg->data, endpoint->id);
     246           0 :                 free(msg);
     247           0 :                 return -1;
     248             :             }
     249           0 :             log(LOG_MSU_ENQUEUES, "Enqueued data %p toward remote msu %d",
     250             :                  msg->data, endpoint->id);
     251           0 :             return 0;
     252             :         default:
     253           0 :             log_error("Unknown MSU locality: %d", endpoint->locality);
     254           0 :             return -1;
     255             :     }
     256             : }
     257             : 
     258           0 : int init_call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint,
     259             :                            struct msu_type *endpoint_type,
     260             :                            struct msu_msg_key *key, size_t data_size, void *data) {
     261             :     struct msu_msg_hdr hdr;
     262           0 :     if (init_msu_msg_hdr(&hdr, key) != 0) {
     263           0 :         log_error("Error initializing message header");
     264           0 :         return -1;
     265             :     }
     266             :     SET_PROFILING(hdr);
     267             :     PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
     268           0 :     return call_msu_endpoint(sender, endpoint, endpoint_type, &hdr, data_size, data);
     269             : }
     270             : 
     271           0 : int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint,
     272             :                    struct msu_type *endpoint_type,
     273             :                    struct msu_msg_hdr *hdr, size_t data_size, void *data) {
     274           0 :     hdr->error_flag = -1;
     275           0 :     return call_msu_endpoint(sender, endpoint, endpoint_type, hdr, data_size, data);
     276             : }
     277             : 
     278             : 
     279             : 

Generated by: LCOV version 1.10