My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
worker_thread.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
25 #include "worker_thread.h"
26 #include "msu_type.h"
27 #include "local_msu.h"
28 #include "logging.h"
29 #include "thread_message.h"
30 #include "msu_message.h"
32 
33 #include <stdlib.h>
34 
36 #define MAX_DEDOS_THREAD_ID 32
37 
40 
42 static void *init_worker_thread(struct dedos_thread *thread) {
43  struct worker_thread *worker = calloc(1, sizeof(*worker));
44  if (worker == NULL) {
45  log_error("Error allocating worker thread");
46  return NULL;
47  }
48  worker->thread = thread;
49  worker->exit_signal = 0;
50  worker_threads[thread->id] = worker;
51  return worker;
52 }
53 
55  struct dedos_thread *d_threads[MAX_DEDOS_THREAD_ID];
56  int n_threads=0;
57  for (int i=0; i<MAX_DEDOS_THREAD_ID; i++) {
58  if (worker_threads[i] != NULL) {
59  d_threads[n_threads] = worker_threads[i]->thread;
60  n_threads++;
61  dedos_thread_stop(worker_threads[i]->thread);
62  }
63  }
64  for (int i=0; i<n_threads; i++) {
65  dedos_thread_join(d_threads[i]);
66  }
67 }
68 
70 static void destroy_worker_thread(struct dedos_thread *thread, void *v_worker_thread) {
71  struct worker_thread *wthread = v_worker_thread;
72  for (int i=0; i < wthread->n_msus; i++) {
73  destroy_msu(wthread->msus[i]);
74  }
75  worker_threads[thread->id] = NULL;
76  free(v_worker_thread);
77 }
78 
80  if (id > MAX_DEDOS_THREAD_ID) {
81  log_error("Error: ID higher than maximum thread ID: %d > %d", id, MAX_DEDOS_THREAD_ID);
82  return NULL;
83  }
84  return worker_threads[id];
85 }
86 
88 static int get_msu_index(struct worker_thread *thread, int msu_id) {
89  for (int i=0; i<thread->n_msus; i++) {
90  if (thread->msus[i]->id == msu_id) {
91  return i;
92  }
93  }
94  return -1;
95 }
96 
98 static int remove_idx_from_msu_list(struct worker_thread *thread, int idx) {
99  if (idx >= thread->n_msus) {
100  return -1;
101  }
102  for (int i=idx; i<thread->n_msus - 1; i++) {
103  thread->msus[i] = thread->msus[i+1];
104  }
105  thread->msus[thread->n_msus-1] = NULL;
106  thread->n_msus--;
107  return 0;
108 }
109 
111  int idx = get_msu_index(msu->thread, msu->id);
112  if (idx == -1) {
113  log_error("MSU %d does not exist on thread %d", msu->id, msu->thread->thread->id);
114  return -1;
115  }
116  return remove_idx_from_msu_list(msu->thread, idx);
117 }
118 
120  if (msu->thread->n_msus >= MAX_MSU_PER_THREAD) {
121  log_error("Too many MSUs on thread %d", msu->thread->thread->id);
122  return -1;
123  }
124  msu->thread->msus[msu->thread->n_msus] = msu;
125  msu->thread->n_msus++;
126  log(LOG_MSU_REGISTRATION, "Registered msu %d with thread", msu->id);
127  return 0;
128 }
129 
132  struct msu_type *type = get_msu_type(msg->type_id);
133  if (type == NULL) {
134  log_error("Failed to create MSU %d. Cannot retrieve type", msg->msu_id);
135  return -1;
136  }
137  struct local_msu *msu = init_msu(msg->msu_id, type, thread, &msg->init_data);
138  if (msu == NULL) {
139  log_error("Error creating MSU %d. Not placing on thread %d",
140  msg->msu_id, thread->thread->id);
141  return -1;
142  }
143  return 0;
144 }
145 
148  int ack_id) {
149  int idx = get_msu_index(thread, msg->msu_id);
150  if (idx == -1) {
151  log_error("MSU %d does not exist on thread %d", msg->msu_id, thread->thread->id);
152  return -1;
153  }
154  struct local_msu *msu = thread->msus[idx];
155  if (msg->force) {
156  destroy_msu(msu);
157  remove_idx_from_msu_list(thread, idx);
158  } else {
159  int rtn = try_destroy_msu(msu);
160  if (rtn == 1) {
161  struct ctrl_delete_msu_msg *msg_cpy = malloc(sizeof(*msg_cpy));
162  memcpy(msg_cpy, msg, sizeof(*msg));
163 
165  sizeof(*msg),
166  msg_cpy);
167  thread_msg->ack_id = ack_id;
168  rtn = enqueue_thread_msg(thread_msg, &thread->thread->queue);
169  if (rtn < 0) {
170  log_error("Error re-enqueing delete MSU message");
171  }
172  } else {
173  remove_idx_from_msu_list(thread, idx);
174  }
175  }
176  return 0;
177 }
178 
180 static int worker_mod_msu_route(struct worker_thread *thread, struct ctrl_msu_route_msg *msg) {
181  int idx = get_msu_index(thread, msg->msu_id);
182  if (idx < 0) {
183  log_error("MSU %d does not exist on thread %d", msg->msu_id, thread->thread->id);
184  return -1;
185  }
186  struct local_msu *msu = thread->msus[idx];
187  int rtn;
188  switch (msg->type) {
189  case ADD_ROUTE:
190  rtn = add_route_to_set(&msu->routes, msg->route_id);
191  if (rtn < 0) {
192  log_error("Error adding route %d to msu %d route set", msg->route_id, msg->msu_id);
193  return -1;
194  }
195  log(LOG_ROUTING_CHANGES, "Added route %d to msu %d route set",
196  msg->route_id, msg->msu_id);
197  return 0;
198  case DEL_ROUTE:
199  rtn = rm_route_from_set(&msu->routes, msg->route_id);
200  if (rtn < 0) {
201  log_error("Error removing route %d from msu %d route set", msg->route_id, msg->msu_id);
202  return -1;
203  }
204  log(LOG_ROUTING_CHANGES, "Removed route %d from msu %d route set",
205  msg->route_id, msg->msu_id);
206  return 0;
207  default:
208  log_error("Unknown route message type: %d", msg->type);
209  return -1;
210  }
211 }
212 
214 #define CHECK_MSG_SIZE(msg, target) \
215  if (msg->data_size != sizeof(target)) { \
216  log_warn("Message data size does not match size" \
217  "of target type " #target ); \
218  break; \
219  }
220 
222 static int process_worker_thread_msg(struct worker_thread *thread, struct thread_msg *msg) {
223  int rtn = -1;
224  switch (msg->type) {
225  case CREATE_MSU:
226  CHECK_MSG_SIZE(msg, struct ctrl_create_msu_msg);
227  struct ctrl_create_msu_msg *create_msg = msg->data;
228  rtn = create_msu_on_thread(thread, create_msg);
229  if (rtn < 0) {
230  log_error("Error creating MSU");
231  }
232  break;
233  case DELETE_MSU:
234  CHECK_MSG_SIZE(msg, struct ctrl_delete_msu_msg);
235  struct ctrl_delete_msu_msg *del_msg = msg->data;
236  rtn = del_msu_from_thread(thread, del_msg, msg->ack_id);
237  if (rtn < 0) {
238  log_error("Error deleting MSU");
239  }
240  break;
241  case MSU_ROUTE:
242  CHECK_MSG_SIZE(msg, struct ctrl_msu_route_msg);
243  struct ctrl_msu_route_msg *route_msg = msg->data;
244  rtn = worker_mod_msu_route(thread, route_msg);
245  if (rtn < 0) {
246  log_error("Error modifiying MSU route");
247  }
248  break;
249  case CONNECT_TO_RUNTIME:
250  case SEND_TO_PEER:
251  log_error("Message (type %d) meant for main thread send to worker thread",
252  msg->type);
253  break;
254  default:
255  log_error("Unknown message type %d delivered to worker thread %d",
256  msg->type, thread->thread->id);
257  break;
258  }
259  if (msg->ack_id > 0) {
260  send_ack_message(msg->ack_id, rtn == 0);
261  log_warn("SENT ACK %d", msg->ack_id);
262  }
263  return rtn;
264 }
265 
267 #define DEFAULT_WAIT_TIMEOUT_S 1
268 
270 static double timediff_s(struct timespec *t1, struct timespec *t2) {
271  return (double)(t2->tv_sec - t1->tv_sec) + (double)(t2->tv_nsec - t1->tv_nsec) * 1e-9;
272 }
273 
275 static struct timespec cur_time;
276 
278 static struct timespec *next_timeout(struct worker_thread *thread) {
279  if (thread->timeouts == NULL) {
280  return NULL;
281  }
282  struct timespec *time = &thread->timeouts->time;
283  clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
284  double diff_s = timediff_s(time, &cur_time);
285  if (diff_s >= 0) {
286  struct timeout_list *old = thread->timeouts;
287  thread->timeouts = old->next;
288  free(old);
289  return &cur_time;
290  }
291  if (-diff_s > DEFAULT_WAIT_TIMEOUT_S) {
292  return NULL;
293  }
294  cur_time = *time;
295  struct timeout_list *old = thread->timeouts;
296  thread->timeouts = old->next;
297  free(old);
298  return &cur_time;
299 }
300 
301 int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval) {
302  struct timeout_list *tlist = calloc(1, sizeof(*tlist));
303  clock_gettime(CLOCK_REALTIME, &tlist->time);
304  tlist->time.tv_sec += interval->tv_sec;
305  tlist->time.tv_nsec += interval->tv_nsec;
306  if (tlist->time.tv_nsec > 1e9) {
307  tlist->time.tv_nsec -= 1e9;
308  tlist->time.tv_sec += 1;
309  }
310  if (thread->timeouts == NULL) {
311  thread->timeouts = tlist;
312  log(LOG_WORKER_THREAD, "Enqueued timeout to head of queue");
313  return 0;
314  }
315 
316  double diff = timediff_s(&tlist->time, &thread->timeouts->time);
317  if (diff > 0) {
318  tlist->next = thread->timeouts;
319  thread->timeouts = tlist;
320  log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
321  return 0;
322  }
323 
324  struct timeout_list *last_timeout = thread->timeouts;
325  while (last_timeout->next != NULL) {
326  struct timespec *next_timeout = &last_timeout->next->time;
327  diff = timediff_s(&tlist->time, next_timeout);
328  if (diff > 0) {
329  tlist->next = last_timeout->next;
330  last_timeout->next = tlist;
331  log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
332  return 0;
333  }
334  last_timeout = last_timeout->next;
335  }
336  last_timeout->next = tlist;
337  tlist->next = NULL;
338  log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
339  return 0;
340 }
341 
342 
343 
345 static int worker_thread_loop(struct dedos_thread *thread, void *v_worker_thread) {
346  log_info("Starting worker thread loop %d (%s)",
347  thread->id, thread->mode == PINNED_THREAD ? "pinned" : "unpinned");
348 
349  struct worker_thread *self = v_worker_thread;
350 
351  while (!dedos_thread_should_exit(thread)) {
352  // TODO: Get context switches
353  if (thread_wait(thread, next_timeout(self)) != 0) {
354  log_error("Error waiting on thread semaphore");
355  continue;
356  }
357  for (int i=0; i<self->n_msus; i++) {
358  log(LOG_MSU_DEQUEUES, "Attempting to dequeue from msu %d (thread %d)",
359  self->msus[i]->id, thread->id);
360  msu_dequeue(self->msus[i]);
361  }
362  // FIXME: Protect read of num_msgs through mutex
363  int num_msgs = thread->queue.num_msgs;
364  for (int i=0; i<num_msgs; i++) {
365  struct thread_msg *msg = dequeue_thread_msg(&thread->queue);
366  if (msg == NULL) {
367  log_error("Could not read message though queue is not empty!");
368  continue;
369  }
370  log(LOG_THREAD_MESSAGES,"Dequeued thread message on thread %d",
371  thread->id);
372  if (process_worker_thread_msg(self, msg) != 0) {
373  log_error("Error processing worker thread message");
374  }
375  free(msg);
376  }
377  }
378  log_info("Leaving thread %d", thread->id);
379  return 0;
380 }
381 
382 int create_worker_thread(unsigned int thread_id,
383  enum blocking_mode mode) {
384  if (worker_threads[thread_id] != NULL) {
385  log_error("Worker thread %u already exists", thread_id);
386  return -1;
387  }
388 
389  struct dedos_thread *thread = malloc(sizeof(*thread));
390  if (thread == NULL) {
391  log_error("Error allocating worker thread");
392  return -1;
393  }
397  mode,
398  thread_id,
399  thread);
400  if (rtn < 0) {
401  log_error("Error starting dedos thread %d", thread_id);
402  return -1;
403  }
404  log(LOG_THREAD_INITS, "Created worker thread %d", thread_id);
405  return 0;
406 }
407 
int msu_dequeue(struct local_msu *msu)
Dequeus a message from a local MSU and calls its receive function.
Definition: local_msu.c:323
ctrl_delete_msu_msg (ctrl_runtime_messages.h)
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
Definition: logging.h:88
static int worker_thread_loop(struct dedos_thread *thread, void *v_worker_thread)
The main worker thread loop.
int register_msu_with_thread(struct local_msu *msu)
Registers an MSU as one that should be run on its assigned thread.
Payload for messages of type CTRL_DELETE_MSU.
Adds a route to an MSU.
Defines a type of MSU, including callback and accessor functions.
int send_ack_message(int id, bool success)
WILL Send an acknoweledgement of success for a specific message.
Representation of a thread that holds MSUs, messages, and waits on a semaphore.
Definition: worker_thread.h:40
int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval)
Signals that the given thread should break when waiting on its semaphore once interval time has passe...
enum ctrl_msu_route_type type
Sub-type of message.
enum thread_mode mode
[un]pinned
Definition: dedos_threads.h:41
static int process_worker_thread_msg(struct worker_thread *thread, struct thread_msg *msg)
Processes a message which has been sent to the worker thread.
static int create_msu_on_thread(struct worker_thread *thread, struct ctrl_create_msu_msg *msg)
Creates a new MSU on this thread based on the provided message.
static struct worker_thread * worker_threads[32]
Static struct to keep track of worker threads.
Definition: worker_thread.c:39
static struct timespec cur_time
Static structure for holding current time, so it can be returned from next_timeout.
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Definition: msu_type.c:80
Logging of status messages to the terminal.
int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue)
Enqueues a dedos_msg with a thread_msg as the payload to the appropriate queue.
unsigned int id
Unique ID for a local MSU.
Definition: local_msu.h:54
Threads that hold MSUs.
int rm_route_from_set(struct route_set *set, int route_id)
Removes a route from a set of routes.
Definition: routing.c:465
static int del_msu_from_thread(struct worker_thread *thread, struct ctrl_delete_msu_msg *msg, int ack_id)
Removes an MSU from this thread based on the provided messages.
int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout)
To be called from the thread, causes it to wait until a message has been received or the timeout has ...
enum thread_msg_type type
struct timeout_list * timeouts
The times at which the sem_trywait on on the local semaphore should be released.
Definition: worker_thread.h:51
Payload for messages of type CTRL_CREATE_MSU.
struct thread_msg * dequeue_thread_msg(struct msg_queue *queue)
Dequeues a thread_msg from the message queue.
void destroy_msu(struct local_msu *msu)
Calls type-specific destroy function and frees associated memory.
Definition: local_msu.c:257
#define MAX_MSU_PER_THREAD
The maximum number of MSUs which can be placed on a single thread.
Definition: dfg.h:61
static double timediff_s(struct timespec *t1, struct timespec *t2)
Returns the difference in time in seconds, t2 - t1.
int msu_id
MSU to which the route is to be added or removed.
#define CHECK_MSG_SIZE(msg, target)
Checks whether the size of the message is equal to the size of the target struct. ...
int type_id
Type ID of the MSU to create.
static int remove_idx_from_msu_list(struct worker_thread *thread, int idx)
Removes the MSU at the given index from the worker_thread::msus.
Definition: worker_thread.c:98
struct route_set routes
Routing table set, containing all destination MSUs (see routing.h for details)
Definition: local_msu.h:51
struct thread_msg * construct_thread_msg(enum thread_msg_type type, ssize_t data_size, void *data)
Allocates and initializes a thread message with the provided options.
An entry in the linked list of timeouts.
Definition: worker_thread.h:34
void dedos_thread_stop(struct dedos_thread *thread)
Sets the exit signal for a thread, causing the main loop to quit.
struct local_msu * msus[MAX_MSU_PER_THREAD]
The MSUs on the thread.
Definition: worker_thread.h:49
#define log_error(fmt,...)
Definition: logging.h:101
Declares the structures and functions applicable to MSUs on the local machine.
A message to be delivered to a dedos_thread.
struct local_msu * init_msu(unsigned int id, struct msu_type *type, struct worker_thread *thread, struct msu_init_data *data)
Allocates and creates a new MSU of the specified type and ID on the given thread. ...
Definition: local_msu.c:198
int id
A unique identifier for the thread.
Definition: dedos_threads.h:39
int route_id
ID of route to be added or removed.
blocking_mode
Whether an MSU is blocking or non-blocking.
Definition: dfg.h:161
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
int try_destroy_msu(struct local_msu *msu)
Destroys the MSU, but only if it has no states currently saved.
Definition: local_msu.c:249
int n_msus
The number of msus on the thread.
Definition: worker_thread.h:44
int add_route_to_set(struct route_set *set, int route_id)
Adds a route to a set of routes.
Definition: routing.c:450
int msu_id
ID of the MSU to be deleted.
struct timeout_list * next
Definition: worker_thread.h:36
static int get_msu_index(struct worker_thread *thread, int msu_id)
Gets the index in worker_thread::msus at which the msu_id resides.
Definition: worker_thread.c:88
Payload for messages of type CTRL_MSU_ROUTES.
void stop_all_worker_threads()
Signals all worker threads to stop.
Definition: worker_thread.c:54
#define DEFAULT_WAIT_TIMEOUT_S
Default amount of time to wait before sem_trywait should return.
static void * init_worker_thread(struct dedos_thread *thread)
Allocates and returns a new worker thread structure.
Definition: worker_thread.c:42
struct worker_thread * get_worker_thread(int id)
Definition: worker_thread.c:79
struct worker_thread * thread
The worker thread on which this MSU is placed.
Definition: local_msu.h:69
int unregister_msu_with_thread(struct local_msu *msu)
Removes an MSU from the list of MSUs within its thread.
static void destroy_worker_thread(struct dedos_thread *thread, void *v_worker_thread)
Destroys all MSUs on a worker thread and frees the associated structure.
Definition: worker_thread.c:70
Defines a type of MSU.
Definition: msu_type.h:46
static int worker_mod_msu_route(struct worker_thread *thread, struct ctrl_msu_route_msg *msg)
Modifies the MSU's routes, either adding or removing a route subscription.
uint32_t num_msgs
Number of messages currently in the queue.
Definition: message_queue.h:57
ctrl_create_msu_msg (ctrl_runtime_messages.h)
int msu_id
ID of the MSU to create.
Removes a route from an MSU.
void dedos_thread_join(struct dedos_thread *thread)
Joins and destroys the dedos_thread.
ctrl_msu_route_msg (ctrl_runtime_messages.h)
int ack_id
for sending acknowledgements to controller.
struct dedos_thread * thread
The underlying dedos_thread.
Definition: worker_thread.h:42
bool force
If true, forces the deletion of an MSU even if it has existing states.
int dedos_thread_should_exit(struct dedos_thread *thread)
Returns 1 if the exit signal has been triggered, otherwise 0.
#define MAX_DEDOS_THREAD_ID
The maximum ID that can be assigned to a worker thread.
Definition: worker_thread.c:36
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
int create_worker_thread(unsigned int thread_id, enum blocking_mode mode)
Starts a new worker thread with the given thread ID and pinned/unpinned status.
Messages passed to MSUs.
int start_dedos_thread(dedos_thread_fn thread_fn, dedos_thread_init_fn init_fn, dedos_thread_destroy_fn destroy_fn, enum blocking_mode mode, int id, struct dedos_thread *thread)
Initilizes and starts execution of a dedos_thread.
struct msu_init_data init_data
Initial data to pass to the MSU.
struct timespec time
Definition: worker_thread.h:35
struct msg_queue queue
Queue for incoming message.
Definition: dedos_threads.h:43
static struct timespec * next_timeout(struct worker_thread *thread)
Returns the next time at which the worker thread should exit its semaphore wait.
payload: send_to_runtime_msg (below)
#define log_warn(fmt,...)
Definition: logging.h:113
payload: ctrl_add_runtime_msg (ctrl_runtime_messages.h)
Structure representing any thread within DeDOS.
Definition: dedos_threads.h:35
Communication with global controller from runtime.