LCOV - code coverage report
Current view: top level - runtime - worker_thread.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 86 226 38.1 %
Date: 2018-01-11 Functions: 10 17 58.8 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file worker_thread.c
      22             :  *
      23             :  * Threads that hold MSUs
      24             :  */
      25             : #include "worker_thread.h"
      26             : #include "msu_type.h"
      27             : #include "local_msu.h"
      28             : #include "logging.h"
      29             : #include "thread_message.h"
      30             : #include "msu_message.h"
      31             : #include "controller_communication.h"
      32             : 
      33             : #include <stdlib.h>
      34             : 
      35             : /** The number of worker thread slots; worker thread IDs must be less than this value */
      36             : #define MAX_DEDOS_THREAD_ID 32
      37             : 
      38             : /** Static array to keep track of worker threads, indexed by thread ID (NULL where no worker exists) */
      39             : static struct worker_thread *worker_threads[MAX_DEDOS_THREAD_ID];
      40             : 
      41             : /** Allocates and returns a new worker thread structure */
      42           1 : static void *init_worker_thread(struct dedos_thread *thread) {
      43           1 :     struct worker_thread *worker = calloc(1, sizeof(*worker));
      44           1 :     if (worker == NULL) {
      45           0 :         log_error("Error allocating worker thread");
      46           0 :         return NULL;
      47             :     }
      48           1 :     worker->thread = thread;
      49           1 :     worker->exit_signal = 0;
      50           1 :     worker_threads[thread->id] = worker;
      51           1 :     return worker;
      52             : }
      53             : 
      54           0 : void stop_all_worker_threads() {
      55             :     struct dedos_thread *d_threads[MAX_DEDOS_THREAD_ID];
      56           0 :     int n_threads=0;
      57           0 :     for (int i=0; i<MAX_DEDOS_THREAD_ID; i++) {
      58           0 :         if (worker_threads[i] != NULL) {
      59           0 :             d_threads[n_threads] = worker_threads[i]->thread;
      60           0 :             n_threads++;
      61           0 :             dedos_thread_stop(worker_threads[i]->thread);
      62             :         }
      63             :     }
      64           0 :     for (int i=0; i<n_threads; i++) {
      65           0 :         dedos_thread_join(d_threads[i]);
      66             :     }
      67           0 : }
      68             : 
      69             : /** Destroys all MSUs on a worker thread and frees the associated structure */
      70           0 : static void destroy_worker_thread(struct dedos_thread *thread, void *v_worker_thread) {
      71           0 :     struct worker_thread *wthread = v_worker_thread;
      72           0 :     for (int i=0; i < wthread->n_msus; i++) {
      73           0 :         destroy_msu(wthread->msus[i]);
      74             :     }
      75           0 :     worker_threads[thread->id] = NULL;
      76           0 :     free(v_worker_thread);
      77           0 : }
      78             : 
      79           1 : struct worker_thread *get_worker_thread(int id) {
      80           1 :     if (id > MAX_DEDOS_THREAD_ID) {
      81           0 :         log_error("Error: ID higher than maximum thread ID: %d > %d", id, MAX_DEDOS_THREAD_ID);
      82           0 :         return NULL;
      83             :     }
      84           1 :     return worker_threads[id];
      85             : }
      86             : 
      87             : /** Gets the index in worker_thread::msus at which the msu_id resides */
      88           5 : static int get_msu_index(struct worker_thread *thread, int msu_id) {
      89          12 :     for (int i=0; i<thread->n_msus; i++) {
      90          10 :         if (thread->msus[i]->id == msu_id) {
      91           3 :             return i;
      92             :         }
      93             :     }
      94           2 :     return -1;
      95             : }
      96             : 
      97             : /** Removes the MSU at the given index from the worker_thread::msus */
      98           1 : static int remove_idx_from_msu_list(struct worker_thread *thread, int idx) {
      99           1 :     if (idx >= thread->n_msus) {
     100           0 :         return -1;
     101             :     }
     102           1 :     for (int i=idx; i<thread->n_msus - 1; i++) {
     103           0 :         thread->msus[i] = thread->msus[i+1];
     104             :     }
     105           1 :     thread->msus[thread->n_msus-1] = NULL;
     106           1 :     thread->n_msus--;
     107           1 :     return 0;
     108             : }
     109             : 
     110           0 : int unregister_msu_with_thread(struct local_msu *msu) {
     111           0 :     int idx = get_msu_index(msu->thread, msu->id);
     112           0 :     if (idx == -1) {
     113           0 :         log_error("MSU %d does not exist on thread %d", msu->id, msu->thread->thread->id);
     114           0 :         return -1;
     115             :     }
     116           0 :     return remove_idx_from_msu_list(msu->thread, idx);
     117             : }
     118             : 
     119           1 : int register_msu_with_thread(struct local_msu *msu) {
     120           1 :     if (msu->thread->n_msus >= MAX_MSU_PER_THREAD) {
     121           0 :         log_error("Too many MSUs on thread %d", msu->thread->thread->id);
     122           0 :         return -1;
     123             :     }
     124           1 :     msu->thread->msus[msu->thread->n_msus] = msu;
     125           1 :     msu->thread->n_msus++;
     126           1 :     log(LOG_MSU_REGISTRATION, "Registered msu %d with thread", msu->id);
     127           1 :     return 0;
     128             : }
     129             : 
     130             : /** Creates a new MSU on this thread based on the provided message */
     131           1 : static int create_msu_on_thread(struct worker_thread *thread, struct ctrl_create_msu_msg *msg) {
     132           1 :     struct msu_type *type = get_msu_type(msg->type_id);
     133           1 :     if (type == NULL) {
     134           0 :         log_error("Failed to create MSU %d. Cannot retrieve type", msg->msu_id);
     135           0 :         return -1;
     136             :     }
     137           1 :     struct local_msu *msu = init_msu(msg->msu_id, type, thread, &msg->init_data);
     138           1 :     if (msu == NULL) {
     139           0 :         log_error("Error creating MSU %d. Not placing on thread %d",
     140             :                   msg->msu_id, thread->thread->id);
     141           0 :         return -1;
     142             :     }
     143           1 :     return 0;
     144             : }
     145             : 
     146             : /** Removes an MSU from this thread based on the provided messages */
     147           2 : static int del_msu_from_thread(struct worker_thread *thread, struct ctrl_delete_msu_msg *msg,
     148             :                                int ack_id) {
     149           2 :     int idx = get_msu_index(thread, msg->msu_id);
     150           2 :     if (idx == -1) {
     151           1 :         log_error("MSU %d does not exist on thread %d", msg->msu_id, thread->thread->id);
     152           1 :         return -1;
     153             :     }
     154           1 :     struct local_msu *msu = thread->msus[idx];
     155           1 :     if (msg->force) {
     156           0 :         destroy_msu(msu);
     157           0 :         remove_idx_from_msu_list(thread, idx);
     158             :     } else {
     159           1 :         int rtn = try_destroy_msu(msu);
     160           1 :         if (rtn == 1) {
     161           0 :             struct ctrl_delete_msu_msg *msg_cpy = malloc(sizeof(*msg_cpy));
     162           0 :             memcpy(msg_cpy, msg, sizeof(*msg));
     163             : 
     164           0 :             struct thread_msg *thread_msg = construct_thread_msg(DELETE_MSU,
     165             :                                                                  sizeof(*msg),
     166             :                                                                  msg_cpy);
     167           0 :             thread_msg->ack_id = ack_id;
     168           0 :             rtn = enqueue_thread_msg(thread_msg, &thread->thread->queue);
     169           0 :             if (rtn < 0) {
     170           0 :                 log_error("Error re-enqueing delete MSU message");
     171             :             }
     172             :         } else {
     173           1 :             remove_idx_from_msu_list(thread, idx);
     174             :         }
     175             :     }
     176           1 :     return 0;
     177             : }
     178             : 
     179             : /** Modifies the MSU's routes, either adding or removing a route subscription */
     180           0 : static int worker_mod_msu_route(struct worker_thread *thread, struct ctrl_msu_route_msg *msg) {
     181           0 :     int idx = get_msu_index(thread, msg->msu_id);
     182           0 :     if (idx < 0) {
     183           0 :         log_error("MSU %d does not exist on thread %d", msg->msu_id, thread->thread->id);
     184           0 :         return -1;
     185             :     }
     186           0 :     struct local_msu *msu = thread->msus[idx];
     187             :     int rtn;
     188           0 :     switch (msg->type) {
     189             :         case ADD_ROUTE:
     190           0 :             rtn = add_route_to_set(&msu->routes, msg->route_id);
     191           0 :             if (rtn < 0) {
     192           0 :                 log_error("Error adding route %d to msu %d route set", msg->route_id, msg->msu_id);
     193           0 :                 return -1;
     194             :             }
     195           0 :             log(LOG_ROUTING_CHANGES, "Added route %d to msu %d route set", 
     196             :                 msg->route_id, msg->msu_id);
     197           0 :             return 0;
     198             :         case DEL_ROUTE:
     199           0 :             rtn = rm_route_from_set(&msu->routes, msg->route_id);
     200           0 :             if (rtn < 0) {
     201           0 :                 log_error("Error removing route %d from msu %d route set", msg->route_id, msg->msu_id);
     202           0 :                 return -1;
     203             :             }
     204           0 :             log(LOG_ROUTING_CHANGES, "Removed route %d from msu %d route set", 
     205             :                 msg->route_id, msg->msu_id);
     206           0 :             return 0;
     207             :         default:
     208           0 :             log_error("Unknown route message type: %d", msg->type);
     209           0 :             return -1;
     210             :     }
     211             : }
     212             : 
     213             : /** Checks whether the size of the message is equal to the size of the target struct */
     214             : #define CHECK_MSG_SIZE(msg, target) \
     215             :     if (msg->data_size != sizeof(target)) { \
     216             :         log_warn("Message data size does not match size" \
     217             :                  "of target type " #target ); \
     218             :         break; \
     219             :     }
     220             : 
     221             : /** Processes a message which has been sent to the worker thread  */
     222           0 : static int process_worker_thread_msg(struct worker_thread *thread, struct thread_msg *msg) {
     223           0 :     int rtn = -1;
     224           0 :     switch (msg->type) {
     225             :         case CREATE_MSU:
     226           0 :             CHECK_MSG_SIZE(msg, struct ctrl_create_msu_msg);
     227           0 :             struct ctrl_create_msu_msg *create_msg = msg->data;
     228           0 :             rtn = create_msu_on_thread(thread, create_msg);
     229           0 :             if (rtn < 0) {
     230           0 :                 log_error("Error creating MSU");
     231             :             }
     232           0 :             break;
     233             :         case DELETE_MSU:
     234           0 :             CHECK_MSG_SIZE(msg, struct ctrl_delete_msu_msg);
     235           0 :             struct ctrl_delete_msu_msg *del_msg = msg->data;
     236           0 :             rtn = del_msu_from_thread(thread, del_msg, msg->ack_id);
     237           0 :             if (rtn < 0) {
     238           0 :                 log_error("Error deleting MSU");
     239             :             }
     240           0 :             break;
     241             :         case MSU_ROUTE:
     242           0 :             CHECK_MSG_SIZE(msg, struct ctrl_msu_route_msg);
     243           0 :             struct ctrl_msu_route_msg *route_msg = msg->data;
     244           0 :             rtn = worker_mod_msu_route(thread, route_msg);
     245           0 :             if (rtn < 0) {
     246           0 :                 log_error("Error modifiying MSU route");
     247             :             }
     248           0 :             break;
     249             :         case CONNECT_TO_RUNTIME:
     250             :         case SEND_TO_PEER:
     251           0 :             log_error("Message (type %d) meant for main thread send to worker thread",
     252             :                       msg->type);
     253           0 :             break;
     254             :         default:
     255           0 :             log_error("Unknown message type %d delivered to worker thread %d",
     256             :                       msg->type, thread->thread->id);
     257           0 :             break;
     258             :     }
     259           0 :     if (msg->ack_id > 0) {
     260           0 :         send_ack_message(msg->ack_id, rtn == 0);
     261           0 :         log_warn("SENT ACK %d", msg->ack_id);
     262             :     }
     263           0 :     return rtn;
     264             : }
     265             : 
     266             : /** Default amount of time (in seconds) to wait before the thread's semaphore wait should return */
     267             : #define DEFAULT_WAIT_TIMEOUT_S 1
     268             : 
     269             : /** Returns the difference in time in seconds, t2 - t1 */
     270          10 : static double timediff_s(struct timespec *t1, struct timespec *t2) {
     271          10 :     return (double)(t2->tv_sec - t1->tv_sec) + (double)(t2->tv_nsec - t1->tv_nsec) * 1e-9;
     272             : }
     273             : 
     274             : /** Static structure for holding current time, so it can be returned from ::next_timeout */
     275             : static struct timespec cur_time;
     276             : 
     277             : /** Returns the next time at which the worker thread should exit its semaphore wait*/
     278           1 : static struct timespec *next_timeout(struct worker_thread *thread) {
     279           1 :     if (thread->timeouts == NULL) {
     280           0 :         return NULL;
     281             :     }
     282           1 :     struct timespec *time = &thread->timeouts->time;
     283           1 :     clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
     284           1 :     double diff_s = timediff_s(time, &cur_time);
     285           1 :     if (diff_s >= 0) {
     286           0 :         struct timeout_list *old = thread->timeouts;
     287           0 :         thread->timeouts = old->next;
     288           0 :         free(old);
     289           0 :         return &cur_time;
     290             :     }
     291           1 :     if (-diff_s > DEFAULT_WAIT_TIMEOUT_S) {
     292           0 :         return NULL;
     293             :     }
     294           1 :     cur_time = *time;
     295           1 :     struct timeout_list *old = thread->timeouts;
     296           1 :     thread->timeouts = old->next;
     297           1 :     free(old);
     298           1 :     return &cur_time;
     299             : }
     300             : 
     301           6 : int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval) {
     302           6 :     struct timeout_list *tlist = calloc(1, sizeof(*tlist));
     303           6 :     clock_gettime(CLOCK_REALTIME, &tlist->time);
     304           6 :     tlist->time.tv_sec += interval->tv_sec;
     305           6 :     tlist->time.tv_nsec += interval->tv_nsec;
     306           6 :     if (tlist->time.tv_nsec > 1e9) {
     307           3 :         tlist->time.tv_nsec -= 1e9;
     308           3 :         tlist->time.tv_sec += 1;
     309             :     }
     310           6 :     if (thread->timeouts == NULL) {
     311           2 :         thread->timeouts = tlist;
     312           2 :         log(LOG_WORKER_THREAD, "Enqueued timeout to head of queue");
     313           2 :         return 0;
     314             :     }
     315             : 
     316           4 :     double diff = timediff_s(&tlist->time, &thread->timeouts->time);
     317           4 :     if (diff > 0) {
     318           0 :         tlist->next = thread->timeouts;
     319           0 :         thread->timeouts = tlist;
     320           0 :         log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
     321           0 :         return 0;
     322             :     }
     323             : 
     324           4 :     struct timeout_list *last_timeout = thread->timeouts;
     325          11 :     while (last_timeout->next != NULL) {
     326           5 :        struct timespec *next_timeout = &last_timeout->next->time;
     327           5 :        diff = timediff_s(&tlist->time, next_timeout);
     328           5 :        if (diff > 0) {
     329           2 :            tlist->next = last_timeout->next;
     330           2 :            last_timeout->next = tlist;
     331           2 :             log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
     332           2 :            return 0;
     333             :        }
     334           3 :        last_timeout = last_timeout->next;
     335             :     }
     336           2 :     last_timeout->next = tlist;
     337           2 :     tlist->next = NULL;
     338           2 :     log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
     339           2 :     return 0;
     340             : }
     341             : 
     342             : 
     343             : 
     344             : /** The main worker thread loop. Checks for exit signal, processes messages */
     345           0 : static int worker_thread_loop(struct dedos_thread *thread, void *v_worker_thread) {
     346           0 :     log_info("Starting worker thread loop %d (%s)",
     347             :              thread->id, thread->mode == PINNED_THREAD ? "pinned" : "unpinned");
     348             : 
     349           0 :     struct worker_thread *self = v_worker_thread;
     350             : 
     351           0 :     while (!dedos_thread_should_exit(thread)) {
     352             :         // TODO: Get context switches
     353           0 :         if (thread_wait(thread, next_timeout(self)) != 0) {
     354           0 :             log_error("Error waiting on thread semaphore");
     355           0 :             continue;
     356             :         }
     357           0 :         for (int i=0; i<self->n_msus; i++) {
     358           0 :             log(LOG_MSU_DEQUEUES, "Attempting to dequeue from msu %d (thread %d)",
     359             :                        self->msus[i]->id, thread->id);
     360           0 :             msu_dequeue(self->msus[i]);
     361             :         }
     362             :         // FIXME: Protect read of num_msgs through mutex
     363           0 :         int num_msgs = thread->queue.num_msgs;
     364           0 :         for (int i=0; i<num_msgs; i++) {
     365           0 :             struct thread_msg *msg = dequeue_thread_msg(&thread->queue);
     366           0 :             if (msg == NULL) {
     367           0 :                 log_error("Could not read message though queue is not empty!");
     368           0 :                 continue;
     369             :             }
     370           0 :             log(LOG_THREAD_MESSAGES,"Dequeued thread message on thread %d",
     371             :                        thread->id);
     372           0 :             if (process_worker_thread_msg(self, msg) != 0) {
     373           0 :                 log_error("Error processing worker thread message");
     374             :             }
     375           0 :             free(msg);
     376             :         }
     377             :     }
     378           0 :     log_info("Leaving thread %d", thread->id);
     379           0 :     return 0;
     380             : }
     381             : 
     382           0 : int create_worker_thread(unsigned int thread_id,
     383             :                          enum blocking_mode mode) {
     384           0 :     if (worker_threads[thread_id] != NULL) {
     385           0 :         log_error("Worker thread %u already exists", thread_id);
     386           0 :         return -1;
     387             :     }
     388             : 
     389           0 :     struct dedos_thread *thread = malloc(sizeof(*thread));
     390           0 :     if (thread == NULL) {
     391           0 :         log_error("Error allocating worker thread");
     392           0 :         return -1;
     393             :     }
     394           0 :     int rtn = start_dedos_thread(worker_thread_loop,
     395             :                                  init_worker_thread,
     396             :                                  destroy_worker_thread,
     397             :                                  mode,
     398             :                                  thread_id,
     399             :                                  thread);
     400           0 :     if (rtn < 0) {
     401           0 :         log_error("Error starting dedos thread %d", thread_id);
     402           0 :         return -1;
     403             :     }
     404           0 :     log(LOG_THREAD_INITS, "Created worker thread %d", thread_id);
     405           0 :     return 0;
     406             : }
     407             : 

Generated by: LCOV version 1.10