LCOV - code coverage report
Current view: top level - runtime - msu_message.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 84 150 56.0 %
Date: 2018-01-11 Functions: 8 14 57.1 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file msu_message.c
      22             :  *
      23             :  * Messages passed to an MSU
      24             :  */
      25             : #include "msu_message.h"
      26             : #include "logging.h"
      27             : #include "communication.h"
      28             : #include "local_msu.h"
      29             : #include "uthash.h"
      30             : #include "runtime_dfg.h"
      31             : #include "profiler.h"
      32             : 
      33             : #include <stdlib.h>
      34             : 
      35           0 : int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key) {
      36           0 :     memset(hdr, '\0', sizeof(*hdr));
      37           0 :     hdr->key = *key;
      38           0 :     return 0;
      39             : }
      40             : 
      41           0 : unsigned int msu_msg_sender_type(struct msg_provinance *prov) {
      42           0 :     return prov->sender.type_id;
      43             : }
      44             : 
      45           2 : struct msu_provinance_item *get_provinance_item(struct msg_provinance *p, struct msu_type *type) {
      46           3 :     for (int i=0; i < p->path_len && i < MAX_PATH_LEN; i++) {
      47           3 :         if (p->path[i].type_id == type->id) {
      48           2 :             return &p->path[i];
      49             :         }
      50             :     }
      51           0 :     return NULL;
      52             : }
      53             : 
      54           0 : int set_msg_key(int32_t id, struct msu_msg_key *key) {
      55           0 :     memcpy(&key->key, &id, sizeof(id));
      56           0 :     key->key_len = sizeof(id);
      57           0 :     key->id = id;
      58           0 :     key->group_id = -1;
      59           0 :     return 0;
      60             : }
      61             : 
      62           0 : int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key) {
      63           0 :     HASH_VALUE(seed, seed_size, key->id);
      64           0 :     if (seed_size > sizeof(key->key)) {
      65           0 :         log_warn("Key length too large for composite key!");
      66           0 :         seed_size = sizeof(key->key);
      67             :     }
      68           0 :     memcpy(&key->key, seed, seed_size);
      69           0 :     key->key_len = seed_size;
      70           0 :     key->group_id = -1;
      71           0 :     return 0;
      72             : }
      73             : 
      74           2 : int add_provinance(struct msg_provinance *prov, struct local_msu *sender) {
      75           2 :     prov->sender.type_id = sender->type->id;
      76           2 :     prov->sender.msu_id = sender->id;
      77           2 :     prov->sender.runtime_id = local_runtime_id();
      78           2 :     if (prov->path_len == 0) {
      79           1 :         prov->origin = prov->sender;
      80             :     }
      81             : 
      82             :     int i;
      83           3 :     for (i = 0; i < MAX_PATH_LEN && i < prov->path_len; i++) {
      84           1 :         if (prov->path[i].type_id == sender->type->id) {
      85           0 :             break;
      86             :         }
      87             :     }
      88           2 :     if (i >= MAX_PATH_LEN) {
      89           0 :         log_warn("Cannot record provinance in path: Path too short");
      90           0 :         return 1;
      91             :     }
      92             : 
      93           2 :     prov->path[i] = prov->sender;
      94           2 :     if (i == prov->path_len) {
      95           2 :         prov->path_len++;
      96             :     }
      97           2 :     log(LOG_ADD_PROVINANCE, "Path len of prov %p is now %d", prov, prov->path_len);
      98           2 :     return 0;
      99             : }
     100             : 
     /**
      * Places an MSU message on a queue using an all-zero scheduling interval.
      *
      * This is a convenience wrapper around schedule_msu_msg().
      * NOTE(review): a zero interval presumably means "deliver immediately" --
      * confirm against schedule_msg().
      *
      * @param q    Queue to receive the message
      * @param data Message to enqueue
      * @return Whatever schedule_msu_msg() returns (0 on success, -1 on error)
      */
     int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data) {
         /* `{0}` rather than `{}`: the empty initializer is not valid ISO C
          * before C23, while `{0}` zero-initializes every member portably. */
         struct timespec no_delay = {0};
         return schedule_msu_msg(q, data, &no_delay);
     }
     105             : 
     106           3 : int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval) {
     107           3 :     struct dedos_msg *msg = malloc(sizeof(*msg));
     108           3 :     if (msg == NULL) {
     109           0 :         log_perror("Error allocating dedos message");
     110           0 :         return -1;
     111             :     }
     112           3 :     msg->type = MSU_MSG;
     113           3 :     msg->data_size = sizeof(*data);
     114           3 :     msg->data = data;
     115             : 
     116             :     PROFILE_EVENT(data->hdr, PROF_ENQUEUE);
     117           3 :     if (schedule_msg(q, msg, interval) != 0) {
     118           0 :         log_error("Error MSU message to MSU");
     119           0 :         return -1;
     120             :     }
     121           3 :     return 0;
     122             : }
     123             : 
     124           4 : struct msu_msg *dequeue_msu_msg(struct msg_queue *q) {
     125           4 :     struct dedos_msg *msg = dequeue_msg(q);
     126           4 :     if (msg == NULL) {
     127           1 :         return NULL;
     128             :     }
     129             : 
     130           3 :     if (msg->type != MSU_MSG) {
     131           0 :         log_error("Received non-MSU message on msu queue!");
     132           0 :         return NULL;
     133             :     }
     134             : 
     135           3 :     if (msg->data_size != sizeof(struct msu_msg)) {
     136           0 :         log_warn("Data size of dequeued MSU message (%d) does not meet expected (%d)",
     137             :                  (int)msg->data_size, (int)sizeof(struct msu_msg));
     138           0 :         return NULL;
     139             :     }
     140             : 
     141           3 :     struct msu_msg *msu_msg = msg->data;
     142           3 :     free(msg);
     143             :     PROFILE_EVENT(msu_msg->hdr, PROF_DEQUEUE);
     144           3 :     return msu_msg;
     145             : }
     146             : 
     147           2 : int read_msu_msg_hdr(int fd, struct msu_msg_hdr *hdr) {
     148           2 :     if (read_payload(fd, sizeof(*hdr), hdr) != 0) {
     149           0 :         log_error("Error reading msu msg header from fd %d", fd);
     150           0 :         return -1;
     151             :     }
     152             : 
     153           2 :     return 0;
     154             : }
     155             : 
     156           0 : void destroy_msu_msg_and_contents(struct msu_msg *msg) {
     157           0 :     free(msg->data);
     158           0 :     free(msg);
     159           0 : }
     160             : 
     161           0 : struct msu_msg *create_msu_msg(struct msu_msg_hdr *hdr, 
     162             :                                      size_t data_size, 
     163             :                                      void *data) {
     164           0 :     struct msu_msg *msg = malloc(sizeof(*msg));
     165           0 :     msg->hdr = *hdr;
     166           0 :     msg->data_size = data_size;
     167           0 :     msg->data = data;
     168           0 :     return msg;
     169             : }
     170             : 
     171           2 : struct msu_msg *read_msu_msg(struct local_msu *msu, int fd, size_t size) {
     172           2 :     if (size < sizeof(struct msu_msg_hdr)) {
     173           0 :         log_error("Size of incoming message is not big enough to fit header");
     174           0 :         return NULL;
     175             :     }
     176           2 :     log(LOG_MSU_MSG_READ, "Reading header from %d", fd);
     177           2 :     struct msu_msg *msg = malloc(sizeof(*msg));
     178           2 :     if (msg == NULL) {
     179           0 :         log_perror("Error allocating MSU message");
     180           0 :         return NULL;
     181             :     }
     182           2 :     if (read_msu_msg_hdr(fd, &msg->hdr) != 0) {
     183           0 :         log_error("Error reading msu message header");
     184           0 :         free(msg);
     185           0 :         return NULL;
     186             :     }
     187             :     PROFILE_EVENT(msg->hdr, PROF_REMOTE_RECV);
     188           2 :     size_t data_size = size - sizeof(msg->hdr);
     189           2 :     void *data = NULL;
     190           2 :     if (data_size > 0) {
     191           2 :         data = malloc(data_size);
     192           2 :         if (data == NULL) {
     193           0 :             log_perror("Error allocating msu msg of size %d", (int)data_size);
     194           0 :             free(msg);
     195           0 :             return NULL;
     196             :         }
     197           2 :         log(LOG_MSU_MSG_READ, "Reading payload of size %d from %d", (int)data_size, fd);
     198           2 :         if (read_payload(fd, data_size, data) != 0) {
     199           0 :             log_perror("Error reading msu msg payload of size %d", (int)data_size);
     200           0 :             free(msg);
     201           0 :             free(data);
     202           0 :             return NULL;
     203             :         }
     204           2 :         log(LOG_MSU_MSG_DESERIALIZE, "Deserialized MSU message of size %d", (int)data_size);
     205             :     }
     206           2 :     msg->data_size = data_size;
     207           2 :     if (msu->type->deserialize) {
     208           1 :         msg->data = msu->type->deserialize(msu, data_size, data, &msg->data_size);
     209           1 :         free(data);
     210             :     } else {
     211           1 :         msg->data = data;
     212             :     }
     213           2 :     return msg;
     214             : }
     215             : 
     216           2 : void *serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out) {
     217             : 
     218           4 :     serialization_fn serializer = (msg->hdr.error_flag == 0 ? 
     219           2 :                                    dst_type->serialize : dst_type->serialize_error);
     220             : 
     221           2 :     if (serializer != NULL) {
     222           1 :         void *payload = NULL;
     223           1 :         ssize_t payload_size = serializer(dst_type, msg, &payload);
     224           1 :         log(LOG_SERIALIZE,
     225             :                    "Serialized message into payload of size %d using %s serialization",
     226             :                    (int)payload_size, dst_type->name);
     227           1 :         if (payload_size < 0) {
     228           0 :             log_error("Error running destination type %s's serialize function",
     229             :                       dst_type->name);
     230           0 :             return NULL;
     231             :         }
     232           1 :         size_t serialized_size = sizeof(struct msu_msg_hdr) + payload_size;
     233           1 :         void *output = malloc(serialized_size);
     234           1 :         if (output == NULL) {
     235           0 :             log_error("Could not allocate serialized MSU msg");
     236           0 :             free(payload);
     237           0 :             return NULL;
     238             :         }
     239           1 :         memcpy(output, &msg->hdr, sizeof(msg->hdr));
     240           1 :         memcpy(output + sizeof(msg->hdr), payload, payload_size);
     241             : 
     242           1 :         free(payload);
     243             : 
     244           1 :         *size_out = serialized_size;
     245           1 :         return output;
     246             :     } else {
     247           1 :         size_t serialized_size = sizeof(struct msu_msg_hdr) + msg->data_size;
     248           1 :         void *output = malloc(serialized_size);
     249             : 
     250           1 :         if (output == NULL) {
     251           0 :             log_error("Could not allocate serialized MSU msg");
     252           0 :             return NULL;
     253             :         }
     254             : 
     255           1 :         memcpy(output, &msg->hdr, sizeof(msg->hdr));
     256           1 :         memcpy(output + sizeof(msg->hdr), msg->data, msg->data_size);
     257             : 
     258           1 :         *size_out = serialized_size;
     259           1 :         return output;
     260             :     }
     261             : }

Generated by: LCOV version 1.10