LCOV - code coverage report
Current view: top level - runtime - dedos_threads.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 107 134 79.9 %
Date: 2018-01-11 Functions: 11 11 100.0 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file dedos_threads.c
      22             :  *
      23             :  * Control spawned threads with message queue within DeDOS
      24             :  */
      25             : 
      26             : /** Needed for CPU_SET etc. */
      27             : #define _GNU_SOURCE
      28             : 
      29             : #include "dedos_threads.h"
      30             : #include "thread_message.h"
      31             : #include "output_thread.h"
      32             : #include "logging.h"
      33             : #include "rt_stats.h"
      34             : 
      35             : #include <stdbool.h>
      36             : #include <stdlib.h>
      37             : #include <sched.h>
      38             : #include <sys/resource.h>
      39             : 
      40             : /** The maximum identifier that can be used for a dedos_thread */
      41             : #define MAX_DEDOS_THREAD_ID 32
      42             : /** The maximum number of cores that can be present on a node */
      43             : #define MAX_CORES 16
      44             : 
      45             : /** The index at which to store the dedos_thread handling sending messages */
      46             : #define OUTPUT_THREAD_INDEX MAX_DEDOS_THREAD_ID + 1
      47             : 
      48             : /** Static array holding pointers to the created dedos_thread structures */
      49             : static struct dedos_thread *dedos_threads[MAX_DEDOS_THREAD_ID + 2];
      50             : /** Keep track of which cores have been assigned to threads */
      51             : static int pinned_cores[MAX_CORES];
      52             : 
      53             : enum stat_id thread_stat_items[] = {
      54             :     THREAD_UCPUTIME,
      55             :     THREAD_SCPUTIME,
      56             :     THREAD_MAXRSS,
      57             :     THREAD_MINFLT,
      58             :     THREAD_MAJFLT,
      59             :     THREAD_VCSW,
      60             :     THREAD_IVCSW
      61             : };
      62             : 
      63             : #define N_THREAD_STAT_ITEMS sizeof(thread_stat_items) / sizeof(*thread_stat_items)
      64             : 
      65             : /** Initializes the stat items associated with a thread */
      66           6 : static inline void init_thread_stat_items(int id) {
      67          48 :     for (int i=0; i < N_THREAD_STAT_ITEMS; i++) {
      68          42 :         init_stat_item(thread_stat_items[i], id);
      69             :     }
      70           6 : }
      71             : 
      72           3 : struct dedos_thread *get_dedos_thread(int id) {
      73           3 :     if (id == OUTPUT_THREAD_ID) {
      74           0 :         id = MAX_DEDOS_THREAD_ID;
      75           3 :     } else if (id < 0) {
      76           0 :         log_error("Dedos thread ID cannot be negative! Provided: %d", id);
      77           0 :         return NULL;
      78           3 :     } else if (id > MAX_DEDOS_THREAD_ID) {
      79           0 :         log_error("Requested thread ID too high. Maximum is %d", MAX_DEDOS_THREAD_ID);
      80           0 :         return NULL;
      81             :     }
      82           3 :     if (dedos_threads[id] == NULL) {
      83           1 :         log_error("Dedos thread with id %d is not initialized", id);
      84           1 :         return NULL;
      85             :     }
      86           2 :     return dedos_threads[id];
      87             : }
      88             : 
      89             : /**
      90             :  * Initializes a dedos_thread structure to contain the appropriate fields
      91             :  */
      92           6 : static int init_dedos_thread(struct dedos_thread *thread,
      93             :                       enum thread_mode mode,
      94             :                       int id) {
      95           6 :     thread->id = id;
      96           6 :     thread->mode = mode;
      97           6 :     sem_init(&thread->sem, 0, 0);
      98           6 :     pthread_mutex_init(&thread->exit_lock, NULL);
      99           6 :     thread->exit_signal = 0;
     100             : 
     101           6 :     init_thread_stat_items(thread->id);
     102             : 
     103             : 
     104           6 :     if (init_msg_queue(&thread->queue, &thread->sem) != 0) {
     105           0 :         log_error("Error initializing message queue for dedos thread");
     106           0 :         return -1;
     107             :     }
     108           6 :     log(LOG_DEDOS_THREADS, "Initialized thread %d (mode: %s, addr: %p)",
     109             :                id, mode == PINNED_THREAD ? "pinned" : "unpinned", thread);
     110           6 :     if (id == OUTPUT_THREAD_ID) {
     111           0 :         id = OUTPUT_THREAD_INDEX;
     112             :     }
     113           6 :     dedos_threads[id] = thread;
     114           6 :     return 0;
     115             : }
     116             : 
     117             : /** Structure which holds the initialization info for a dedos_thread */
     118             : struct thread_init {
     119             :     dedos_thread_fn thread_fn;
     120             :     dedos_thread_init_fn init_fn;
     121             :     dedos_thread_destroy_fn destroy_fn;
     122             :     struct dedos_thread *self;
     123             :     sem_t sem;
     124             : };
     125             : 
     126             : /** Pins the thread with the pthread id `ptid` to the first unused core */
     127           3 : static int pin_thread(pthread_t ptid) {
     128             :     int cpu_id;
     129           3 :     int num_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     130           3 :     for (cpu_id = 0; cpu_id < num_cpu && pinned_cores[cpu_id] == 1; cpu_id++);
     131           3 :     if (cpu_id == num_cpu) {
     132           1 :         log_warn("No cores available to pin thread");
     133           1 :         return -1;
     134             :     }
     135             : 
     136             :     cpu_set_t cpuset;
     137           2 :     CPU_ZERO(&cpuset);
     138           2 :     CPU_SET(cpu_id, &cpuset);
     139           2 :     int s = pthread_setaffinity_np(ptid, sizeof(cpuset), &cpuset);
     140           2 :     if (s != 0) {
     141           0 :         log_warn("pthread_setaffinity_np returned error %d", s);
     142           0 :         return -1;
     143             :     }
     144           2 :     pinned_cores[cpu_id] = 1;
     145           2 :     log(LOG_DEDOS_THREADS, "Successfully pinned pthread %d", (int)ptid);
     146           2 :     return 0;
     147             : }
     148             : 
     149           2 : void dedos_thread_stop(struct dedos_thread *thread) {
     150           2 :     log_info("Signaling thread %d to exit", thread->id);
     151           2 :     pthread_mutex_lock(&thread->exit_lock);
     152           2 :     thread->exit_signal = 1;
     153           2 :     sem_post(&thread->sem);
     154           2 :     pthread_mutex_unlock(&thread->exit_lock);
     155           2 : }
     156             : 
     157             : 
     158           2 : void dedos_thread_join(struct dedos_thread *thread) {
     159           2 :     pthread_join(thread->pthread, NULL);
     160           2 :     free(thread);
     161           2 : }
     162             : 
     163           4 : int dedos_thread_should_exit(struct dedos_thread *thread) {
     164           4 :     pthread_mutex_lock(&thread->exit_lock);
     165           4 :     int exit_signal = thread->exit_signal;
     166           4 :     pthread_mutex_unlock(&thread->exit_lock);
     167           4 :     return exit_signal;
     168             : }
     169             : 
     170             : /**
     171             :  * The actual function passed to pthread_create() that starts a new thread.
     172             :  * Initializes the appropriate structures, posts to the start semaphore,
     173             :  * then calls the appropriate function.
     174             :  * @param thread_init_v Pointer to the thread_init structure
     175             :  */
     176           5 : static void *dedos_thread_starter(void *thread_init_v) {
     177           5 :     struct thread_init *init = thread_init_v;
     178             : 
     179             :     // Have to get things out of the thread_init, because
     180             :     // it will be destroyed externally once sem_post() is complete
     181           5 :     struct dedos_thread *thread = init->self;
     182           5 :     dedos_thread_fn thread_fn = init->thread_fn;
     183           5 :     dedos_thread_destroy_fn destroy_fn = init->destroy_fn;
     184             : 
     185           5 :     void *init_rtn = NULL;
     186           5 :     if (init->init_fn) {
     187           5 :         init_rtn = init->init_fn(thread);
     188             :     }
     189             : 
     190           5 :     if (thread->mode == PINNED_THREAD) {
     191           3 :         if (pin_thread(thread->pthread) != 0) {
     192           1 :             log_warn("Could not pin thread %d", thread->id);
     193             :         }
     194             :     }
     195             : 
     196           5 :     sem_post(&init->sem);
     197           5 :     log(LOG_DEDOS_THREADS, "Started thread %d (mode: %s, addr: %p)",
     198             :                thread->id, thread-> mode == PINNED_THREAD ? "pinned" : "unpinned", thread);
     199             : 
     200           5 :     int rtn = thread_fn(thread, init_rtn);
     201           5 :     log(LOG_DEDOS_THREADS, "Thread %d ended.", thread->id);
     202             : 
     203           5 :     if (destroy_fn) {
     204           5 :         destroy_fn(thread, init_rtn);
     205             :     }
     206             : 
     207           5 :     return (void*)(intptr_t)rtn;
     208             : }
     209             : 
     210           2 : static inline void gather_thread_metrics(struct dedos_thread *thread) {
     211             :     struct rusage usage;
     212           2 :     int rtn = getrusage(RUSAGE_THREAD, &usage);
     213           2 :     if (rtn < 0) {
     214           0 :         log_error("Error getting thread %d rusage", thread->id);
     215           2 :         return;
     216             :     }
     217           2 :     int id = thread->id;
     218           2 :     record_stat(THREAD_UCPUTIME, id, (double)usage.ru_utime.tv_sec + usage.ru_utime.tv_usec * 1e-6, 1);
     219           2 :     record_stat(THREAD_SCPUTIME, id, (double)usage.ru_stime.tv_sec + usage.ru_stime.tv_usec * 1e-6, 1);
     220           2 :     record_stat(THREAD_MAXRSS, id, usage.ru_maxrss, 0);
     221           2 :     record_stat(THREAD_MINFLT, id, usage.ru_minflt, 0);
     222           2 :     record_stat(THREAD_MAJFLT, id, usage.ru_majflt, 0);
     223           2 :     record_stat(THREAD_VCSW, id, usage.ru_nvcsw, 1);
     224           2 :     record_stat(THREAD_IVCSW, id, usage.ru_nivcsw, 1);
     225             : }
     226             : 
     227             : 
     228             : 
     229             : /** The amount of time that ::thread_wait should wait for if no timeout is provided */
     230             : #define DEFAULT_WAIT_TIMEOUT_S 1
     231             : 
     232             : #define MAX_METRIC_INTERVAL_MS 500
     233             : 
     234           2 : int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout) {
     235             : 
     236           2 :     int rtn = sem_trywait(&thread->sem);
     237             : 
     238           2 :     if (rtn == 0) {
     239             :         struct timespec cur_time;
     240           0 :         clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
     241           0 :         if (cur_time.tv_sec * 1e3 + cur_time.tv_nsec / 1e6 > \
     242           0 :                 thread->last_metric.tv_sec * 1e3 + thread->last_metric.tv_nsec / 1e6 + \
     243             :                 MAX_METRIC_INTERVAL_MS) {
     244           0 :             gather_thread_metrics(thread);
     245           0 :             thread->last_metric = cur_time;
     246             :         }
     247           0 :         return 0;
     248           2 :     } else if (rtn == -1 && errno == EAGAIN) {
     249           2 :         gather_thread_metrics(thread);
     250           2 :         clock_gettime(CLOCK_REALTIME_COARSE, &thread->last_metric);
     251             :     }
     252             : 
     253           2 :     if (abs_timeout == NULL) {
     254             :         struct timespec cur_time;
     255           2 :         clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
     256           2 :         cur_time.tv_sec += DEFAULT_WAIT_TIMEOUT_S;
     257           2 :         rtn = sem_timedwait(&thread->sem, &cur_time);
     258             :     } else {
     259           0 :         rtn = sem_timedwait(&thread->sem, abs_timeout);
     260             :     }
     261           2 :     if (rtn < 0 && errno != ETIMEDOUT) {
     262           0 :         log_perror("Error waiting on thread semaphore");
     263           0 :         exit(-1);
     264             :         return -1;
     265             :     }
     266           2 :     return 0;
     267             : }
     268             : 
     269           5 : int start_dedos_thread(dedos_thread_fn thread_fn,
     270             :                        dedos_thread_init_fn init_fn,
     271             :                        dedos_thread_destroy_fn destroy_fn,
     272             :                        enum blocking_mode mode,
     273             :                        int id,
     274             :                        struct dedos_thread *thread) {
     275           5 :     if (id == OUTPUT_THREAD_ID) {
     276           0 :         id = MAX_DEDOS_THREAD_ID;
     277             :     }
     278           5 :     int rtn = init_dedos_thread(thread, mode, id);
     279           5 :     struct thread_init init = {
     280             :         .thread_fn = thread_fn,
     281             :         .init_fn = init_fn,
     282             :         .destroy_fn = destroy_fn,
     283             :         .self = thread
     284             :     };
     285           5 :     if (sem_init(&init.sem, 0, 0)) {
     286           0 :         log_perror("Error initializing semaphore for dedos thread");
     287           0 :         return -1;
     288             :     }
     289           5 :     log(LOG_DEDOS_THREADS, "Waiting on thread %d to start", id);
     290           5 :     rtn = pthread_create(&thread->pthread, NULL,
     291             :                          dedos_thread_starter, (void*)&init);
     292           5 :     if (rtn < 0) {
     293           0 :         log_error("pthread_create failed with errno: %d", rtn);
     294           0 :         return -1;
     295             :     }
     296           5 :     if (sem_wait(&init.sem) != 0) {
     297           0 :         log_perror("Error waiting on thread start semaphore");
     298           0 :         return -1;
     299             :     }
     300           5 :     log(LOG_DEDOS_THREADS, "Thread %d started successfully", id);
     301           5 :     sem_destroy(&init.sem);
     302           5 :     return 0;
     303             : }

Generated by: LCOV version 1.10