LCOV - code coverage report
Current view: top level - runtime - local_msu.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 122 180 67.8 %
Date: 2018-01-11 Functions: 14 15 93.3 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file local_msu.c
      22             :  * Defines the structures and functions used by MSUs on the local machine
      23             :  */
      24             : 
      25             : #include "local_msu.h"
      26             : #include "routing_strategies.h"
      27             : #include "logging.h"
      28             : #include "rt_stats.h"
      29             : #include "msu_message.h"
      30             : #include "inter_runtime_messages.h"
      31             : #include "thread_message.h"
      32             : #include "msu_state.h"
      33             : #include "msu_calls.h"
      34             : 
      35             : #include <stdlib.h>
      36             : #define __USE_GNU // For some reason, necessary for RUSAGE_THREAD
      37             : #include <sys/resource.h>
      38             : #include <sys/time.h>
      39             : /**
      40             :  * MOVEME: MAX_MSU_ID
      41             :  * Defines the maximum ID that can be assigned to an MSU.
      42             :  * Necessary because MSUs are indexed by ID in the local registry.
      43             :  */
      44             : #define MAX_MSU_ID 1024
      45             : 
      46             : // TODO: This lock might not be useful, as I believe this registry will
      47             : // TODO: only ever be accessed from the socket handler thread
      48             : 
      49             : /** Lock to protect access to local msu registry */
      50             : pthread_rwlock_t msu_registry_lock;
      51             : /** Mapping of MSU ID to the specific instance of the local MSU */
      52             : struct local_msu *local_msu_registry[MAX_MSU_ID];
      53             : 
      54             : /**
      55             :  * Allocates the memory associated with an MSU structure
      56             :  * @return Pointer to allocated MSU
      57             :  */
      58          11 : static struct local_msu *msu_alloc() {
      59          11 :     struct local_msu *msu = calloc(1, sizeof(*msu));
      60          11 :     if (msu == NULL) {
      61           0 :         log_error("Failed to allocate MSU");
      62           0 :         return NULL;
      63             :     }
      64          11 :     return msu;
      65             : }
      66             : 
      67             : /**
      68             :  * Frees the memory associated with an MSU structure,
      69             :  * including any routes, messages in its queue, or states
      70             :  */
      71           2 : static void msu_free(struct local_msu *msu) {
      72           2 :     struct msu_msg *msg = dequeue_msu_msg(&msu->queue);
      73           4 :     while (msg != NULL) {
      74           0 :         if (msg->data_size > 0) {
      75           0 :             free(msg->data);
      76             :         }
      77           0 :         free(msg);
      78           0 :         msg = dequeue_msu_msg(&msu->queue);
      79             :     }
      80           2 :     msu_free_all_state(msu);
      81           2 :     free(msu->routes.routes);
      82           2 :     free(msu);
      83           2 : }
      84             : 
      85             : /**
      86             :  * Removes an MSU from the local MSU registry
      87             :  * @param id ID of the MSU to remove
      88             :  * @return 0 on success, -1 on error
      89             :  */
      90           4 : static int rm_from_local_registry(int id) {
      91           4 :     if (pthread_rwlock_wrlock(&msu_registry_lock) != 0) {
      92           0 :         log_perror("Error opening write lock on msu registry");
      93           0 :         return -1;
      94             :     }
      95           4 :     int rtn = 0;
      96           4 :     if (local_msu_registry[id] == NULL) {
      97           1 :         log_warn("MSU with id %d does not exist and cannot be deleted", id);
      98           1 :         rtn = -1;
      99             :     } else {
     100           3 :         local_msu_registry[id] = NULL;
     101             :     }
     102           4 :     if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
     103           0 :         log_perror("Error unlocking msu registry");
     104           0 :         return -1;
     105             :     }
     106           4 :     return rtn;
     107             : }
     108             : 
     109             : /**
     110             :  * Adds an MSU to the local registry so it can be referred to elsewhere by ID
     111             :  * @param msu the local MSU to add to the registry
     112             :  * @return 0 on success, -1 on error
     113             :  */
     114          13 : static int add_to_local_registry(struct local_msu *msu) {
     115          13 :     if (pthread_rwlock_wrlock(&msu_registry_lock) != 0) {
     116           0 :         log_perror("Error opening write lock on msu registry");
     117           0 :         return -1;
     118             :     }
     119          13 :     int rtn = 0;
     120          13 :     if (local_msu_registry[msu->id] != NULL) {
     121           1 :         log_error("MSU with id %d already exists and cannot be added to registry", msu->id);
     122           1 :         rtn = -1;
     123             :     } else {
     124          12 :         local_msu_registry[msu->id] = msu;
     125             :     }
     126          13 :     if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
     127           0 :         log_perror("Error unlocking msu registry");
     128           0 :         return -1;
     129             :     }
     130          13 :     return rtn;
     131             : }
     132             : 
     133             : 
     134           5 : struct local_msu *get_local_msu(unsigned int id) {
     135           5 :     if (id >= MAX_MSU_ID) {
     136           0 :         log_error("MSU id %u too high!", id);
     137           0 :         return NULL;
     138             :     }
     139           5 :     if (pthread_rwlock_rdlock(&msu_registry_lock) != 0) {
     140           0 :         log_perror("Error opening read lock on MSU registry");
     141           0 :         return NULL;
     142             :     }
     143           5 :     struct local_msu *msu = local_msu_registry[id];
     144           5 :     if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
     145           0 :         log_perror("Error unlocking msu registry");
     146           0 :         return NULL;
     147             :     }
     148           5 :     if (msu == NULL) {
     149           1 :         log_error("Could not get local msu with id %d. Not registered.", id);
     150             :     }
     151           5 :     return msu;
     152             : }
     153             : 
     154             : /** The stat IDs that are associated with an MSU, to be registered on MSU creation */
     155             : static enum stat_id MSU_STAT_IDS[] = {
     156             :     MSU_QUEUE_LEN,
     157             :     MSU_ITEMS_PROCESSED,
     158             :     MSU_EXEC_TIME,
     159             :     MSU_IDLE_TIME,
     160             :     MSU_MEM_ALLOC,
     161             :     MSU_NUM_STATES,
     162             :     MSU_ERROR_CNT,
     163             :     MSU_UCPUTIME,
     164             :     MSU_SCPUTIME,
     165             :     MSU_MINFLT,
     166             :     MSU_MAJFLT,
     167             :     MSU_VCSW,
     168             :     MSU_IVCSW
     169             : };
     170             : 
     171             : #define NUM_MSU_STAT_IDS sizeof(MSU_STAT_IDS) / sizeof(enum stat_id)
     172             : 
     173             : /**
     174             :  * Initializes the stat IDS that are relevant to an MSU
     175             :  * @param msu_id ID of the msu to register
     176             :   */
     177          11 : static void init_msu_stats(int msu_id) {
     178         154 :     for (int i=0; i<NUM_MSU_STAT_IDS; i++) {
     179         143 :         if (init_stat_item(MSU_STAT_IDS[i], msu_id) != 0) {
     180           0 :             log_warn("Could not initialize stat item %d for msu %d", MSU_STAT_IDS[i], msu_id);
     181             :         }
     182             :     }
     183          11 : }
     184             : 
     185             : /**
     186             :  * Unregisters the stat IDS that are relevant to an MSU
     187             :  * @param msu_id ID of the MSU to unregister
     188             :  */
     189           2 : static void unregister_msu_stats(int msu_id) {
     190          28 :     for (int i=0; i < NUM_MSU_STAT_IDS; i++) {
     191          26 :         if (remove_stat_item(MSU_STAT_IDS[i], msu_id) != 0) {
     192           0 :             log_warn("Could not remove stat item %d for msu %d",
     193             :                      MSU_STAT_IDS[i], msu_id);
     194             :         }
     195             :     }
     196           2 : }
     197             : 
     198          11 : struct local_msu *init_msu(unsigned int id,
     199             :                            struct msu_type *type,
     200             :                            struct worker_thread *thread,
     201             :                            struct msu_init_data *data) {
     202          11 :     struct local_msu *msu = msu_alloc();
     203          11 :     init_msu_stats(id);
     204          11 :     msu->thread = thread;
     205          11 :     msu->id = id;
     206          11 :     msu->type = type;
     207          11 :     msu->scheduling_weight = 0;
     208             : 
     209          11 :     if (init_msg_queue(&msu->queue, &thread->thread->sem) != 0) {
     210           0 :         msu_free(msu);
     211           0 :         log_error("Error initializing msu queue");
     212           0 :         return NULL;
     213             :     }
     214             : 
     215             :     // Must be done before running init function, or the msu cannot enqueue to itself
     216          11 :     int rtn = register_msu_with_thread(msu);
     217          11 :     if (rtn < 0) {
     218           0 :         log_error("Error registering MSU With thread");
     219           0 :         msu_free(msu);
     220           0 :         return NULL;
     221             :     }
     222             : 
     223             :     // TODO: Unregister if creation fails
     224          11 :     log_info("Initializing msu (ID: %d, type: %s, data: '%s')", id, type->name,
     225             :              data->init_data);
     226             : 
     227             :     // Run the MSU's type-specific init function if it has one
     228          11 :     if (type->init) {
     229           7 :         if (type->init(msu, data) != 0) {
     230           0 :             log_error("Error running MSU %d (type: %s) type-specific initialization function",
     231             :                       id, type->name);
     232           0 :             unregister_msu_with_thread(msu);
     233           0 :             msu_free(msu);
     234           0 :             return NULL;
     235             :         }
     236             :     }
     237             : 
     238          11 :     rtn = add_to_local_registry(msu);
     239          11 :     if (rtn < 0) {
     240           0 :         log_error("Error adding MSU to local registry");
     241           0 :         msu_free(msu);
     242           0 :         unregister_msu_with_thread(msu);
     243           0 :         return NULL;
     244             :     }
     245             : 
     246          11 :     return msu;
     247             : }
     248             : 
     249           0 : int try_destroy_msu(struct local_msu *msu) {
     250           0 :     if (msu_num_states(msu) > 0) {
     251           0 :         return 1;
     252             :     }
     253           0 :     destroy_msu(msu);
     254           0 :     return 0;
     255             : }
     256             : 
     257           2 : void destroy_msu(struct local_msu *msu) {
     258           2 :     int id = msu->id;
     259           2 :     char *type = msu->type->name;
     260           2 :     unregister_msu_stats(msu->id);
     261           2 :     if (msu->type->destroy) {
     262           1 :         msu->type->destroy(msu);
     263             :     }
     264           2 :     msu_free(msu);
     265           2 :     rm_from_local_registry(id);
     266           2 :     log_info("Removed msu (ID: %d, Type: %s)", id, type);
     267           2 : }
     268             : 
     269             : /**
     270             :  * Calls type-specific MSU receive function and records execution time
     271             :  * @param msu MSU to receive data
     272             :  * @param msg Message to be delivered to the MSU
     273             :  * @return 0 on success, -1 on error
     274             :  */
     275           2 : static int msu_receive(struct local_msu *msu, struct msu_msg *msg) {
     276             : 
     277           2 :     record_end_time(MSU_IDLE_TIME, msu->id);
     278           2 :     record_start_time(MSU_EXEC_TIME, msu->id);
     279           2 :     int rtn = msu->type->receive(msu, msg);
     280           2 :     record_end_time(MSU_EXEC_TIME, msu->id);
     281           2 :     record_start_time(MSU_IDLE_TIME, msu->id);
     282             : 
     283           2 :     if (rtn != 0) {
     284           0 :         log_error("Error executing MSU %d (%s) receive function",
     285             :                   msu->id, msu->type->name);
     286           0 :         return -1;
     287             :     }
     288           2 :     return 0;
     289             : }
     290             : 
     291           1 : static inline int gather_metrics_before(struct rusage *before) {
     292           1 :     int rtn = getrusage(RUSAGE_THREAD, before);
     293           1 :     if (rtn < 0) {
     294           0 :         log_error("Error getting MSU rusage");
     295             :     }
     296           1 :     return rtn;
     297             : }
     298             : 
     299             : #define RECORD_DIFF(dstat, rstat, id) \
     300             :     increment_stat(dstat, id, after.rstat - before->rstat)
     301             : 
     302             : #define RECORD_TIMEDIFF(dstat, rstat, id) \
     303             :     increment_stat(dstat, id, ((double)after.rstat.tv_sec + after.rstat.tv_usec * 1e-6) - \
     304             :                               ((double)before->rstat.tv_sec + before->rstat.tv_usec * 1e-6))
     305             : 
     306           1 : static inline void record_metrics(struct rusage *before, int msu_id) {
     307             :     struct rusage after;
     308           1 :     int rtn = getrusage(RUSAGE_THREAD, &after);
     309           1 :     if (rtn < 0) {
     310           0 :         log_error("Error getting MSU rusage");
     311           1 :         return;
     312             :     }
     313             : 
     314           1 :     RECORD_TIMEDIFF(MSU_UCPUTIME, ru_utime, msu_id);
     315           1 :     RECORD_TIMEDIFF(MSU_SCPUTIME, ru_stime, msu_id);
     316           1 :     RECORD_DIFF(MSU_MINFLT, ru_minflt, msu_id);
     317           1 :     RECORD_DIFF(MSU_MAJFLT, ru_majflt, msu_id);
     318           1 :     RECORD_DIFF(MSU_VCSW, ru_nvcsw, msu_id);
     319           1 :     RECORD_DIFF(MSU_IVCSW, ru_nivcsw, msu_id);
     320             : }
     321             : 
     322             : 
     323           2 : int msu_dequeue(struct local_msu *msu) {
     324           2 :     struct msu_msg *msg = dequeue_msu_msg(&msu->queue);
     325           2 :     if (msg) {
     326           1 :         if (msg->hdr.error_flag) {
     327           0 :             if (msu->type->receive_error == NULL) {
     328           0 :                 log_error("MSU %d received error with no handler", msu->id);
     329           0 :                 return -1;
     330             :             }
     331           0 :             int rtn = msu->type->receive_error(msu, msg);
     332           0 :             if (rtn < 0) {
     333           0 :                 log_error("Error executing MSU error receive function");
     334           0 :                 return -1;
     335             :             }
     336             :         } else {
     337           1 :             log(LOG_MSU_DEQUEUES, "Dequeued MSU message %p for msu %d", msg, msu->id);
     338             :             struct rusage before;
     339           1 :             record_stat(MSU_QUEUE_LEN, msu->id, msu->queue.num_msgs, false);
     340           1 :             int gather_err = gather_metrics_before(&before);
     341           1 :             int rtn = msu_receive(msu, msg);
     342           1 :             if (gather_err == 0) {
     343           1 :                 record_metrics(&before, msu->id);
     344             :             }
     345           1 :             increment_stat(MSU_ITEMS_PROCESSED, msu->id, 1);
     346           1 :             free(msg);
     347           1 :             return rtn;
     348             :         }
     349             :     }
     350           1 :     return 1;
     351             : }
     352             : 
     353           1 : int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast) {
     354             : 
     355           1 :     increment_stat(MSU_ERROR_CNT, msu->id, 1);
     356             : 
     357           1 :     if (!broadcast) {
     358           0 :         return 0;
     359             :     }
     360             : 
     361           3 :     for (int i=0; i < hdr->provinance.path_len; i++) {
     362           2 :         struct msu_provinance_item *upstream = &hdr->provinance.path[i];
     363           2 :         struct msu_type *up_type = get_msu_type(upstream->type_id);
     364           2 :         if (up_type == NULL) {
     365           0 :             log_error("Cannot get type %d", upstream->type_id);
     366           1 :             continue;
     367             :         }
     368           2 :         if (up_type->receive_error == NULL) {
     369           1 :             continue;
     370             :         }
     371             :         struct msu_endpoint receiver;
     372           1 :         int rtn = init_msu_endpoint(upstream->msu_id, upstream->runtime_id, &receiver);
     373           1 :         if (rtn < 0) {
     374           0 :             log_error("Error initializing msu endpoint");
     375           0 :             continue;
     376             :         }
     377           1 :         rtn = call_msu_error(msu, &receiver, up_type, hdr, 0, NULL);
     378           1 :         if (rtn < 0) {
     379           0 :             log_error("Error calling MSU endpoint for error report");
     380           0 :             continue;
     381             :         }
     382             :     }
     383           1 :     return 0;
     384             : }

Generated by: LCOV version 1.10