36 #define MAX_DEDOS_THREAD_ID 32
45 log_error(
"Error allocating worker thread");
50 worker_threads[thread->
id] = worker;
58 if (worker_threads[i] != NULL) {
59 d_threads[n_threads] = worker_threads[i]->
thread;
64 for (
int i=0; i<n_threads; i++) {
72 for (
int i=0; i < wthread->
n_msus; i++) {
75 worker_threads[thread->
id] = NULL;
76 free(v_worker_thread);
84 return worker_threads[id];
89 for (
int i=0; i<thread->
n_msus; i++) {
90 if (thread->
msus[i]->
id == msu_id) {
99 if (idx >= thread->
n_msus) {
102 for (
int i=idx; i<thread->
n_msus - 1; i++) {
103 thread->
msus[i] = thread->
msus[i+1];
126 log(LOG_MSU_REGISTRATION,
"Registered msu %d with thread", msu->
id);
134 log_error(
"Failed to create MSU %d. Cannot retrieve type", msg->
msu_id);
139 log_error(
"Error creating MSU %d. Not placing on thread %d",
162 memcpy(msg_cpy, msg,
sizeof(*msg));
170 log_error(
"Error re-enqueing delete MSU message");
195 log(LOG_ROUTING_CHANGES,
"Added route %d to msu %d route set",
204 log(LOG_ROUTING_CHANGES,
"Removed route %d from msu %d route set",
214 #define CHECK_MSG_SIZE(msg, target) \
215 if (msg->data_size != sizeof(target)) { \
216 log_warn("Message data size does not match size" \
217 "of target type " #target ); \
251 log_error(
"Message (type %d) meant for main thread send to worker thread",
255 log_error(
"Unknown message type %d delivered to worker thread %d",
267 #define DEFAULT_WAIT_TIMEOUT_S 1
270 static double timediff_s(
struct timespec *t1,
struct timespec *t2) {
271 return (
double)(t2->tv_sec - t1->tv_sec) + (
double)(t2->tv_nsec - t1->tv_nsec) * 1e-9;
283 clock_gettime(CLOCK_REALTIME_COARSE, &
cur_time);
303 clock_gettime(CLOCK_REALTIME, &tlist->
time);
304 tlist->
time.tv_sec += interval->tv_sec;
305 tlist->
time.tv_nsec += interval->tv_nsec;
306 if (tlist->
time.tv_nsec > 1e9) {
307 tlist->
time.tv_nsec -= 1e9;
308 tlist->
time.tv_sec += 1;
312 log(LOG_WORKER_THREAD,
"Enqueued timeout to head of queue");
320 log(LOG_WORKER_THREAD,
"Enqueued timeout to queue");
325 while (last_timeout->
next != NULL) {
330 last_timeout->
next = tlist;
331 log(LOG_WORKER_THREAD,
"Enqueued timeout to queue");
334 last_timeout = last_timeout->
next;
336 last_timeout->
next = tlist;
338 log(LOG_WORKER_THREAD,
"Enqueued timeout to queue");
346 log_info(
"Starting worker thread loop %d (%s)",
354 log_error(
"Error waiting on thread semaphore");
357 for (
int i=0; i<
self->n_msus; i++) {
358 log(LOG_MSU_DEQUEUES,
"Attempting to dequeue from msu %d (thread %d)",
359 self->msus[i]->id, thread->
id);
364 for (
int i=0; i<num_msgs; i++) {
367 log_error(
"Could not read message though queue is not empty!");
370 log(LOG_THREAD_MESSAGES,
"Dequeued thread message on thread %d",
373 log_error(
"Error processing worker thread message");
384 if (worker_threads[thread_id] != NULL) {
385 log_error(
"Worker thread %u already exists", thread_id);
390 if (thread == NULL) {
391 log_error(
"Error allocating worker thread");
401 log_error(
"Error starting dedos thread %d", thread_id);
404 log(LOG_THREAD_INITS,
"Created worker thread %d", thread_id);
int msu_dequeue(struct local_msu *msu)
Dequeues a message from a local MSU and calls its receive function.
ctrl_delete_msu_msg (ctrl_runtime_messages.h)
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
static int worker_thread_loop(struct dedos_thread *thread, void *v_worker_thread)
The main worker thread loop.
int register_msu_with_thread(struct local_msu *msu)
Registers an MSU as one that should be run on its assigned thread.
Payload for messages of type CTRL_DELETE_MSU.
Defines a type of MSU, including callback and accessor functions.
int send_ack_message(int id, bool success)
Will send an acknowledgement of success for a specific message.
Representation of a thread that holds MSUs, messages, and waits on a semaphore.
int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval)
Signals that the given thread should break when waiting on its semaphore once interval time has passe...
enum ctrl_msu_route_type type
Sub-type of message.
enum thread_mode mode
[un]pinned
static int process_worker_thread_msg(struct worker_thread *thread, struct thread_msg *msg)
Processes a message which has been sent to the worker thread.
static int create_msu_on_thread(struct worker_thread *thread, struct ctrl_create_msu_msg *msg)
Creates a new MSU on this thread based on the provided message.
static struct worker_thread * worker_threads[32]
Static struct to keep track of worker threads.
static struct timespec cur_time
Static structure for holding current time, so it can be returned from next_timeout.
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Logging of status messages to the terminal.
int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue)
Enqueues a dedos_msg with a thread_msg as the payload to the appropriate queue.
unsigned int id
Unique ID for a local MSU.
int rm_route_from_set(struct route_set *set, int route_id)
Removes a route from a set of routes.
static int del_msu_from_thread(struct worker_thread *thread, struct ctrl_delete_msu_msg *msg, int ack_id)
Removes an MSU from this thread based on the provided messages.
int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout)
To be called from the thread, causes it to wait until a message has been received or the timeout has ...
enum thread_msg_type type
struct timeout_list * timeouts
The times at which the sem_trywait on the local semaphore should be released.
Payload for messages of type CTRL_CREATE_MSU.
struct thread_msg * dequeue_thread_msg(struct msg_queue *queue)
Dequeues a thread_msg from the message queue.
void destroy_msu(struct local_msu *msu)
Calls type-specific destroy function and frees associated memory.
#define MAX_MSU_PER_THREAD
The maximum number of MSUs which can be placed on a single thread.
static double timediff_s(struct timespec *t1, struct timespec *t2)
Returns the difference in time in seconds, t2 - t1.
int msu_id
MSU to which the route is to be added or removed.
#define CHECK_MSG_SIZE(msg, target)
Checks whether the size of the message is equal to the size of the target struct. ...
int type_id
Type ID of the MSU to create.
static int remove_idx_from_msu_list(struct worker_thread *thread, int idx)
Removes the MSU at the given index from the worker_thread::msus.
struct route_set routes
Routing table set, containing all destination MSUs (see routing.h for details)
struct thread_msg * construct_thread_msg(enum thread_msg_type type, ssize_t data_size, void *data)
Allocates and initializes a thread message with the provided options.
An entry in the linked list of timeouts.
void dedos_thread_stop(struct dedos_thread *thread)
Sets the exit signal for a thread, causing the main loop to quit.
struct local_msu * msus[MAX_MSU_PER_THREAD]
The MSUs on the thread.
#define log_error(fmt,...)
Declares the structures and functions applicable to MSUs on the local machine.
A message to be delivered to a dedos_thread.
struct local_msu * init_msu(unsigned int id, struct msu_type *type, struct worker_thread *thread, struct msu_init_data *data)
Allocates and creates a new MSU of the specified type and ID on the given thread. ...
int id
A unique identifier for the thread.
int route_id
ID of route to be added or removed.
blocking_mode
Whether an MSU is blocking or non-blocking.
The structure that represents an MSU located on the local machine.
int try_destroy_msu(struct local_msu *msu)
Destroys the MSU, but only if it has no states currently saved.
int n_msus
The number of msus on the thread.
int add_route_to_set(struct route_set *set, int route_id)
Adds a route to a set of routes.
int msu_id
ID of the MSU to be deleted.
struct timeout_list * next
static int get_msu_index(struct worker_thread *thread, int msu_id)
Gets the index in worker_thread::msus at which the msu_id resides.
Payload for messages of type CTRL_MSU_ROUTES.
void stop_all_worker_threads()
Signals all worker threads to stop.
#define DEFAULT_WAIT_TIMEOUT_S
Default amount of time to wait before sem_trywait should return.
static void * init_worker_thread(struct dedos_thread *thread)
Allocates and returns a new worker thread structure.
struct worker_thread * get_worker_thread(int id)
struct worker_thread * thread
The worker thread on which this MSU is placed.
int unregister_msu_with_thread(struct local_msu *msu)
Removes an MSU from the list of MSUs within its thread.
static void destroy_worker_thread(struct dedos_thread *thread, void *v_worker_thread)
Destroys all MSUs on a worker thread and frees the associated structure.
static int worker_mod_msu_route(struct worker_thread *thread, struct ctrl_msu_route_msg *msg)
Modifies the MSU's routes, either adding or removing a route subscription.
uint32_t num_msgs
Number of messages currently in the queue.
ctrl_create_msu_msg (ctrl_runtime_messages.h)
int msu_id
ID of the MSU to create.
Removes a route from an MSU.
void dedos_thread_join(struct dedos_thread *thread)
Joins and destroys the dedos_thread.
ctrl_msu_route_msg (ctrl_runtime_messages.h)
int ack_id
for sending acknowledgements to controller.
struct dedos_thread * thread
The underlying dedos_thread.
bool force
If true, forces the deletion of an MSU even if it has existing states.
int dedos_thread_should_exit(struct dedos_thread *thread)
Returns 1 if the exit signal has been triggered, otherwise 0.
#define MAX_DEDOS_THREAD_ID
The maximum ID that can be assigned to a worker thread.
#define log(level, fmt,...)
Log at a custom level.
int create_worker_thread(unsigned int thread_id, enum blocking_mode mode)
Starts a new worker thread with the given thread ID and pinned/unpinned status.
int start_dedos_thread(dedos_thread_fn thread_fn, dedos_thread_init_fn init_fn, dedos_thread_destroy_fn destroy_fn, enum blocking_mode mode, int id, struct dedos_thread *thread)
Initializes and starts execution of a dedos_thread.
struct msu_init_data init_data
Initial data to pass to the MSU.
struct msg_queue queue
Queue for incoming message.
static struct timespec * next_timeout(struct worker_thread *thread)
Returns the next time at which the worker thread should exit its semaphore wait.
payload: send_to_runtime_msg (below)
#define log_warn(fmt,...)
payload: ctrl_add_runtime_msg (ctrl_runtime_messages.h)
Structure representing any thread within DeDOS.
Communication with global controller from runtime.