My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
dedos_threads.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
27 #define _GNU_SOURCE
28 
29 #include "dedos_threads.h"
30 #include "thread_message.h"
31 #include "output_thread.h"
32 #include "logging.h"
33 #include "rt_stats.h"
34 
35 #include <stdbool.h>
36 #include <stdlib.h>
37 #include <sched.h>
38 #include <sys/resource.h>
39 
41 #define MAX_DEDOS_THREAD_ID 32
42 
43 #define MAX_CORES 16
44 
46 #define OUTPUT_THREAD_INDEX MAX_DEDOS_THREAD_ID + 1
47 
51 static int pinned_cores[MAX_CORES];
52 
61 };
62 
63 #define N_THREAD_STAT_ITEMS sizeof(thread_stat_items) / sizeof(*thread_stat_items)
64 
66 static inline void init_thread_stat_items(int id) {
67  for (int i=0; i < N_THREAD_STAT_ITEMS; i++) {
69  }
70 }
71 
72 struct dedos_thread *get_dedos_thread(int id) {
73  if (id == OUTPUT_THREAD_ID) {
75  } else if (id < 0) {
76  log_error("Dedos thread ID cannot be negative! Provided: %d", id);
77  return NULL;
78  } else if (id > MAX_DEDOS_THREAD_ID) {
79  log_error("Requested thread ID too high. Maximum is %d", MAX_DEDOS_THREAD_ID);
80  return NULL;
81  }
82  if (dedos_threads[id] == NULL) {
83  log_error("Dedos thread with id %d is not initialized", id);
84  return NULL;
85  }
86  return dedos_threads[id];
87 }
88 
92 static int init_dedos_thread(struct dedos_thread *thread,
93  enum thread_mode mode,
94  int id) {
95  thread->id = id;
96  thread->mode = mode;
97  sem_init(&thread->sem, 0, 0);
98  pthread_mutex_init(&thread->exit_lock, NULL);
99  thread->exit_signal = 0;
100 
101  init_thread_stat_items(thread->id);
102 
103 
104  if (init_msg_queue(&thread->queue, &thread->sem) != 0) {
105  log_error("Error initializing message queue for dedos thread");
106  return -1;
107  }
108  log(LOG_DEDOS_THREADS, "Initialized thread %d (mode: %s, addr: %p)",
109  id, mode == PINNED_THREAD ? "pinned" : "unpinned", thread);
110  if (id == OUTPUT_THREAD_ID) {
111  id = OUTPUT_THREAD_INDEX;
112  }
113  dedos_threads[id] = thread;
114  return 0;
115 }
116 
118 struct thread_init {
122  struct dedos_thread *self;
123  sem_t sem;
124 };
125 
127 static int pin_thread(pthread_t ptid) {
128  int cpu_id;
129  int num_cpu = sysconf(_SC_NPROCESSORS_ONLN);
130  for (cpu_id = 0; cpu_id < num_cpu && pinned_cores[cpu_id] == 1; cpu_id++);
131  if (cpu_id == num_cpu) {
132  log_warn("No cores available to pin thread");
133  return -1;
134  }
135 
136  cpu_set_t cpuset;
137  CPU_ZERO(&cpuset);
138  CPU_SET(cpu_id, &cpuset);
139  int s = pthread_setaffinity_np(ptid, sizeof(cpuset), &cpuset);
140  if (s != 0) {
141  log_warn("pthread_setaffinity_np returned error %d", s);
142  return -1;
143  }
144  pinned_cores[cpu_id] = 1;
145  log(LOG_DEDOS_THREADS, "Successfully pinned pthread %d", (int)ptid);
146  return 0;
147 }
148 
149 void dedos_thread_stop(struct dedos_thread *thread) {
150  log_info("Signaling thread %d to exit", thread->id);
151  pthread_mutex_lock(&thread->exit_lock);
152  thread->exit_signal = 1;
153  sem_post(&thread->sem);
154  pthread_mutex_unlock(&thread->exit_lock);
155 }
156 
157 
158 void dedos_thread_join(struct dedos_thread *thread) {
159  pthread_join(thread->pthread, NULL);
160  free(thread);
161 }
162 
164  pthread_mutex_lock(&thread->exit_lock);
165  int exit_signal = thread->exit_signal;
166  pthread_mutex_unlock(&thread->exit_lock);
167  return exit_signal;
168 }
169 
176 static void *dedos_thread_starter(void *thread_init_v) {
177  struct thread_init *init = thread_init_v;
178 
179  // Have to get things out of the thread_init, because
180  // it will be destroyed externally once sem_post() is complete
181  struct dedos_thread *thread = init->self;
182  dedos_thread_fn thread_fn = init->thread_fn;
183  dedos_thread_destroy_fn destroy_fn = init->destroy_fn;
184 
185  void *init_rtn = NULL;
186  if (init->init_fn) {
187  init_rtn = init->init_fn(thread);
188  }
189 
190  if (thread->mode == PINNED_THREAD) {
191  if (pin_thread(thread->pthread) != 0) {
192  log_warn("Could not pin thread %d", thread->id);
193  }
194  }
195 
196  sem_post(&init->sem);
197  log(LOG_DEDOS_THREADS, "Started thread %d (mode: %s, addr: %p)",
198  thread->id, thread-> mode == PINNED_THREAD ? "pinned" : "unpinned", thread);
199 
200  int rtn = thread_fn(thread, init_rtn);
201  log(LOG_DEDOS_THREADS, "Thread %d ended.", thread->id);
202 
203  if (destroy_fn) {
204  destroy_fn(thread, init_rtn);
205  }
206 
207  return (void*)(intptr_t)rtn;
208 }
209 
210 static inline void gather_thread_metrics(struct dedos_thread *thread) {
211  struct rusage usage;
212  int rtn = getrusage(RUSAGE_THREAD, &usage);
213  if (rtn < 0) {
214  log_error("Error getting thread %d rusage", thread->id);
215  return;
216  }
217  int id = thread->id;
218  record_stat(THREAD_UCPUTIME, id, (double)usage.ru_utime.tv_sec + usage.ru_utime.tv_usec * 1e-6, 1);
219  record_stat(THREAD_SCPUTIME, id, (double)usage.ru_stime.tv_sec + usage.ru_stime.tv_usec * 1e-6, 1);
220  record_stat(THREAD_MAXRSS, id, usage.ru_maxrss, 0);
221  record_stat(THREAD_MINFLT, id, usage.ru_minflt, 0);
222  record_stat(THREAD_MAJFLT, id, usage.ru_majflt, 0);
223  record_stat(THREAD_VCSW, id, usage.ru_nvcsw, 1);
224  record_stat(THREAD_IVCSW, id, usage.ru_nivcsw, 1);
225 }
226 
227 
228 
230 #define DEFAULT_WAIT_TIMEOUT_S 1
231 
232 #define MAX_METRIC_INTERVAL_MS 500
233 
234 int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout) {
235 
236  int rtn = sem_trywait(&thread->sem);
237 
238  if (rtn == 0) {
239  struct timespec cur_time;
240  clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
241  if (cur_time.tv_sec * 1e3 + cur_time.tv_nsec / 1e6 > \
242  thread->last_metric.tv_sec * 1e3 + thread->last_metric.tv_nsec / 1e6 + \
244  gather_thread_metrics(thread);
245  thread->last_metric = cur_time;
246  }
247  return 0;
248  } else if (rtn == -1 && errno == EAGAIN) {
249  gather_thread_metrics(thread);
250  clock_gettime(CLOCK_REALTIME_COARSE, &thread->last_metric);
251  }
252 
253  if (abs_timeout == NULL) {
254  struct timespec cur_time;
255  clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
256  cur_time.tv_sec += DEFAULT_WAIT_TIMEOUT_S;
257  rtn = sem_timedwait(&thread->sem, &cur_time);
258  } else {
259  rtn = sem_timedwait(&thread->sem, abs_timeout);
260  }
261  if (rtn < 0 && errno != ETIMEDOUT) {
262  log_perror("Error waiting on thread semaphore");
263  exit(-1);
264  return -1;
265  }
266  return 0;
267 }
268 
270  dedos_thread_init_fn init_fn,
271  dedos_thread_destroy_fn destroy_fn,
272  enum blocking_mode mode,
273  int id,
274  struct dedos_thread *thread) {
275  if (id == OUTPUT_THREAD_ID) {
276  id = MAX_DEDOS_THREAD_ID;
277  }
278  int rtn = init_dedos_thread(thread, mode, id);
279  struct thread_init init = {
280  .thread_fn = thread_fn,
281  .init_fn = init_fn,
282  .destroy_fn = destroy_fn,
283  .self = thread
284  };
285  if (sem_init(&init.sem, 0, 0)) {
286  log_perror("Error initializing semaphore for dedos thread");
287  return -1;
288  }
289  log(LOG_DEDOS_THREADS, "Waiting on thread %d to start", id);
290  rtn = pthread_create(&thread->pthread, NULL,
291  dedos_thread_starter, (void*)&init);
292  if (rtn < 0) {
293  log_error("pthread_create failed with errno: %d", rtn);
294  return -1;
295  }
296  if (sem_wait(&init.sem) != 0) {
297  log_perror("Error waiting on thread start semaphore");
298  return -1;
299  }
300  log(LOG_DEDOS_THREADS, "Thread %d started successfully", id);
301  sem_destroy(&init.sem);
302  return 0;
303 }
dedos_thread_fn thread_fn
Collecting statistics within the runtime.
static int pin_thread(pthread_t ptid)
Pins the thread with the pthread id ptid to the first unused core.
thread_mode
Identifies if a thread is pinned to a core or able to be scheduled on any core.
Definition: dfg.h:91
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
Definition: logging.h:88
#define MAX_CORES
The maximum number of cores that can be present on a node.
Definition: dedos_threads.c:43
static void gather_thread_metrics(struct dedos_thread *thread)
dedos_thread_init_fn init_fn
int exit_signal
For checking if thread should exit.
Definition: dedos_threads.h:49
struct timespec last_metric
For logging thread metrics.
Definition: dedos_threads.h:51
static void init_thread_stat_items(int id)
Initilizes the stat items associated with a thread.
Definition: dedos_threads.c:66
enum thread_mode mode
[un]pinned
Definition: dedos_threads.h:41
stat_id
The identifiers with which stats can be logged.
Definition: stat_ids.h:32
pthread_mutex_t exit_lock
For checking if thread should exit.
Definition: dedos_threads.h:47
pthread_t pthread
The underlying pthread.
Definition: dedos_threads.h:37
#define log_perror(fmt,...)
Definition: logging.h:102
static struct timespec cur_time
Static structure for holding current time, so it can be returned from next_timeout.
Logging of status messages to the terminal.
int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout)
To be called from the thread, causes it to wait until a message has been received or the timeout has ...
#define OUTPUT_THREAD_INDEX
The index at which to store the dedos_thread handling sending messages.
Definition: dedos_threads.c:46
Control spawned threads with message queue within DeDOS.
static int pinned_cores[16]
Keep track of which cores have been assigned to threads.
Definition: dedos_threads.c:51
void dedos_thread_stop(struct dedos_thread *thread)
Sets the exit signal for a thread, causing the main loop to quit.
#define log_error(fmt,...)
Definition: logging.h:101
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it.
Definition: rt_stats.c:719
int id
A unique identifier for the thread.
Definition: dedos_threads.h:39
blocking_mode
Whether an MSU is blocking or non-blocking.
Definition: dfg.h:161
Structure which holds the initialization info for a dedos_thread.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
static void * dedos_thread_starter(void *thread_init_v)
The actual function passed to pthread_create() that starts a new thread.
struct dedos_thread * get_dedos_thread(int id)
Returns the dedos_thread with the given ID.
Definition: dedos_threads.c:72
void(* dedos_thread_destroy_fn)(struct dedos_thread *thread, void *init_output)
Typedef for the destructor function for a dedos_thread.
Definition: dedos_threads.h:77
dedos_thread_destroy_fn destroy_fn
#define DEFAULT_WAIT_TIMEOUT_S
The amount of time that thread_wait should wait for if no timeout is provided.
int record_stat(enum stat_id stat_id, unsigned int item_id, double stat, bool relog)
Records a statistic in the statlog.
Definition: rt_stats.c:426
#define MAX_METRIC_INTERVAL_MS
void dedos_thread_join(struct dedos_thread *thread)
Joins and destroys the dedos_thread.
int dedos_thread_should_exit(struct dedos_thread *thread)
Returns 1 if the exit signal has been triggered, otherwise 0.
#define MAX_DEDOS_THREAD_ID
The maximum identifier that can be used for a dedos_thread.
Definition: dedos_threads.c:41
#define N_THREAD_STAT_ITEMS
Definition: dedos_threads.c:63
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
sem_t sem
Locks thread until a message is available.
Definition: dedos_threads.h:45
int(* dedos_thread_fn)(struct dedos_thread *thread, void *init_output)
Typedef for the function that should be called on a dedos_thread.
Definition: dedos_threads.h:70
int start_dedos_thread(dedos_thread_fn thread_fn, dedos_thread_init_fn init_fn, dedos_thread_destroy_fn destroy_fn, enum blocking_mode mode, int id, struct dedos_thread *thread)
Initilizes and starts execution of a dedos_thread.
static struct dedos_thread * dedos_threads[32+2]
Static structure to hold created dedos_thread's.
Definition: dedos_threads.c:49
#define OUTPUT_THREAD_ID
A thread_msg marked for delivery to this thread ID will be enqueued to the output thread...
Definition: output_thread.h:35
int init_msg_queue(struct msg_queue *q, sem_t *sem)
Initilializes a mesasge queue to have no messages in it, and sets up the mutex and semaphore...
struct msg_queue queue
Queue for incoming message.
Definition: dedos_threads.h:43
void *(* dedos_thread_init_fn)(struct dedos_thread *thread)
Typedef for an initialization function for a dedos_thread.
Definition: dedos_threads.h:62
#define log_warn(fmt,...)
Definition: logging.h:113
struct dedos_thread * self
enum stat_id thread_stat_items[]
Definition: dedos_threads.c:53
static int init_dedos_thread(struct dedos_thread *thread, enum thread_mode mode, int id)
Initializes a dedos_thread structure to contain the appropriate fields.
Definition: dedos_threads.c:92
Structure representing any thread within DeDOS.
Definition: dedos_threads.h:35