38 #include <sys/resource.h>
41 #define MAX_DEDOS_THREAD_ID 32
46 #define OUTPUT_THREAD_INDEX MAX_DEDOS_THREAD_ID + 1
63 #define N_THREAD_STAT_ITEMS sizeof(thread_stat_items) / sizeof(*thread_stat_items)
76 log_error(
"Dedos thread ID cannot be negative! Provided: %d",
id);
82 if (dedos_threads[
id] == NULL) {
83 log_error(
"Dedos thread with id %d is not initialized",
id);
86 return dedos_threads[
id];
97 sem_init(&thread->
sem, 0, 0);
98 pthread_mutex_init(&thread->
exit_lock, NULL);
105 log_error(
"Error initializing message queue for dedos thread");
108 log(LOG_DEDOS_THREADS,
"Initialized thread %d (mode: %s, addr: %p)",
113 dedos_threads[
id] = thread;
129 int num_cpu = sysconf(_SC_NPROCESSORS_ONLN);
130 for (cpu_id = 0; cpu_id < num_cpu &&
pinned_cores[cpu_id] == 1; cpu_id++);
131 if (cpu_id == num_cpu) {
132 log_warn(
"No cores available to pin thread");
138 CPU_SET(cpu_id, &cpuset);
139 int s = pthread_setaffinity_np(ptid,
sizeof(cpuset), &cpuset);
141 log_warn(
"pthread_setaffinity_np returned error %d", s);
144 pinned_cores[cpu_id] = 1;
145 log(LOG_DEDOS_THREADS,
"Successfully pinned pthread %d", (
int)ptid);
150 log_info(
"Signaling thread %d to exit", thread->
id);
153 sem_post(&thread->
sem);
154 pthread_mutex_unlock(&thread->
exit_lock);
159 pthread_join(thread->
pthread, NULL);
166 pthread_mutex_unlock(&thread->
exit_lock);
185 void *init_rtn = NULL;
187 init_rtn = init->
init_fn(thread);
192 log_warn(
"Could not pin thread %d", thread->
id);
196 sem_post(&init->
sem);
197 log(LOG_DEDOS_THREADS,
"Started thread %d (mode: %s, addr: %p)",
200 int rtn = thread_fn(thread, init_rtn);
201 log(LOG_DEDOS_THREADS,
"Thread %d ended.", thread->
id);
204 destroy_fn(thread, init_rtn);
207 return (
void*)(intptr_t)rtn;
212 int rtn = getrusage(RUSAGE_THREAD, &usage);
214 log_error(
"Error getting thread %d rusage", thread->
id);
230 #define DEFAULT_WAIT_TIMEOUT_S 1
232 #define MAX_METRIC_INTERVAL_MS 500
236 int rtn = sem_trywait(&thread->
sem);
239 struct timespec cur_time;
240 clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
241 if (cur_time.tv_sec * 1e3 + cur_time.tv_nsec / 1e6 > \
248 }
else if (rtn == -1 && errno == EAGAIN) {
250 clock_gettime(CLOCK_REALTIME_COARSE, &thread->
last_metric);
253 if (abs_timeout == NULL) {
254 struct timespec cur_time;
255 clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
257 rtn = sem_timedwait(&thread->
sem, &cur_time);
259 rtn = sem_timedwait(&thread->
sem, abs_timeout);
261 if (rtn < 0 && errno != ETIMEDOUT) {
262 log_perror(
"Error waiting on thread semaphore");
285 if (sem_init(&init.
sem, 0, 0)) {
286 log_perror(
"Error initializing semaphore for dedos thread");
289 log(LOG_DEDOS_THREADS,
"Waiting on thread %d to start",
id);
290 rtn = pthread_create(&thread->
pthread, NULL,
293 log_error(
"pthread_create failed with errno: %d", rtn);
296 if (sem_wait(&init.
sem) != 0) {
297 log_perror(
"Error waiting on thread start semaphore");
300 log(LOG_DEDOS_THREADS,
"Thread %d started successfully",
id);
301 sem_destroy(&init.
sem);
dedos_thread_fn thread_fn
Collecting statistics within the runtime.
static int pin_thread(pthread_t ptid)
Pins the thread with the pthread id ptid to the first unused core.
thread_mode
Identifies if a thread is pinned to a core or able to be scheduled on any core.
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
#define MAX_CORES
The maximum number of cores that can be present on a node.
static void gather_thread_metrics(struct dedos_thread *thread)
dedos_thread_init_fn init_fn
int exit_signal
For checking if thread should exit.
struct timespec last_metric
For logging thread metrics.
static void init_thread_stat_items(int id)
Initilizes the stat items associated with a thread.
enum thread_mode mode
[un]pinned
stat_id
The identifiers with which stats can be logged.
pthread_mutex_t exit_lock
For checking if thread should exit.
pthread_t pthread
The underlying pthread.
#define log_perror(fmt,...)
static struct timespec cur_time
Static structure for holding current time, so it can be returned from next_timeout.
Logging of status messages to the terminal.
int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout)
To be called from the thread, causes it to wait until a message has been received or the timeout has ...
#define OUTPUT_THREAD_INDEX
The index at which to store the dedos_thread handling sending messages.
Control spawned threads with message queue within DeDOS.
static int pinned_cores[16]
Keep track of which cores have been assigned to threads.
void dedos_thread_stop(struct dedos_thread *thread)
Sets the exit signal for a thread, causing the main loop to quit.
#define log_error(fmt,...)
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it.
int id
A unique identifier for the thread.
blocking_mode
Whether an MSU is blocking or non-blocking.
Structure which holds the initialization info for a dedos_thread.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
static void * dedos_thread_starter(void *thread_init_v)
The actual function passed to pthread_create() that starts a new thread.
struct dedos_thread * get_dedos_thread(int id)
Returns the dedos_thread with the given ID.
void(* dedos_thread_destroy_fn)(struct dedos_thread *thread, void *init_output)
Typedef for the destructor function for a dedos_thread.
dedos_thread_destroy_fn destroy_fn
#define DEFAULT_WAIT_TIMEOUT_S
The amount of time that thread_wait should wait for if no timeout is provided.
int record_stat(enum stat_id stat_id, unsigned int item_id, double stat, bool relog)
Records a statistic in the statlog.
#define MAX_METRIC_INTERVAL_MS
void dedos_thread_join(struct dedos_thread *thread)
Joins and destroys the dedos_thread.
int dedos_thread_should_exit(struct dedos_thread *thread)
Returns 1 if the exit signal has been triggered, otherwise 0.
#define MAX_DEDOS_THREAD_ID
The maximum identifier that can be used for a dedos_thread.
#define N_THREAD_STAT_ITEMS
#define log(level, fmt,...)
Log at a custom level.
sem_t sem
Locks thread until a message is available.
int(* dedos_thread_fn)(struct dedos_thread *thread, void *init_output)
Typedef for the function that should be called on a dedos_thread.
int start_dedos_thread(dedos_thread_fn thread_fn, dedos_thread_init_fn init_fn, dedos_thread_destroy_fn destroy_fn, enum blocking_mode mode, int id, struct dedos_thread *thread)
Initilizes and starts execution of a dedos_thread.
static struct dedos_thread * dedos_threads[32+2]
Static structure to hold created dedos_thread's.
#define OUTPUT_THREAD_ID
A thread_msg marked for delivery to this thread ID will be enqueued to the output thread...
int init_msg_queue(struct msg_queue *q, sem_t *sem)
Initilializes a mesasge queue to have no messages in it, and sets up the mutex and semaphore...
struct msg_queue queue
Queue for incoming message.
void *(* dedos_thread_init_fn)(struct dedos_thread *thread)
Typedef for an initialization function for a dedos_thread.
#define log_warn(fmt,...)
struct dedos_thread * self
enum stat_id thread_stat_items[]
static int init_dedos_thread(struct dedos_thread *thread, enum thread_mode mode, int id)
Initializes a dedos_thread structure to contain the appropriate fields.
Structure representing any thread within DeDOS.