Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file worker_thread.c
22 : *
23 : * Threads that hold MSUs
24 : */
25 : #include "worker_thread.h"
26 : #include "msu_type.h"
27 : #include "local_msu.h"
28 : #include "logging.h"
29 : #include "thread_message.h"
30 : #include "msu_message.h"
31 : #include "controller_communication.h"
32 :
33 : #include <stdlib.h>
34 :
/** Number of slots in ::worker_threads; valid worker thread IDs index into that table */
#define MAX_DEDOS_THREAD_ID 32

/** Table of the worker threads known to this file, indexed by thread ID (NULL when unused) */
static struct worker_thread *worker_threads[MAX_DEDOS_THREAD_ID] = { NULL };
40 :
41 : /** Allocates and returns a new worker thread structure */
42 1 : static void *init_worker_thread(struct dedos_thread *thread) {
43 1 : struct worker_thread *worker = calloc(1, sizeof(*worker));
44 1 : if (worker == NULL) {
45 0 : log_error("Error allocating worker thread");
46 0 : return NULL;
47 : }
48 1 : worker->thread = thread;
49 1 : worker->exit_signal = 0;
50 1 : worker_threads[thread->id] = worker;
51 1 : return worker;
52 : }
53 :
54 0 : void stop_all_worker_threads() {
55 : struct dedos_thread *d_threads[MAX_DEDOS_THREAD_ID];
56 0 : int n_threads=0;
57 0 : for (int i=0; i<MAX_DEDOS_THREAD_ID; i++) {
58 0 : if (worker_threads[i] != NULL) {
59 0 : d_threads[n_threads] = worker_threads[i]->thread;
60 0 : n_threads++;
61 0 : dedos_thread_stop(worker_threads[i]->thread);
62 : }
63 : }
64 0 : for (int i=0; i<n_threads; i++) {
65 0 : dedos_thread_join(d_threads[i]);
66 : }
67 0 : }
68 :
69 : /** Destroys all MSUs on a worker thread and frees the associated structure */
70 0 : static void destroy_worker_thread(struct dedos_thread *thread, void *v_worker_thread) {
71 0 : struct worker_thread *wthread = v_worker_thread;
72 0 : for (int i=0; i < wthread->n_msus; i++) {
73 0 : destroy_msu(wthread->msus[i]);
74 : }
75 0 : worker_threads[thread->id] = NULL;
76 0 : free(v_worker_thread);
77 0 : }
78 :
79 1 : struct worker_thread *get_worker_thread(int id) {
80 1 : if (id > MAX_DEDOS_THREAD_ID) {
81 0 : log_error("Error: ID higher than maximum thread ID: %d > %d", id, MAX_DEDOS_THREAD_ID);
82 0 : return NULL;
83 : }
84 1 : return worker_threads[id];
85 : }
86 :
87 : /** Gets the index in worker_thread::msus at which the msu_id resides */
88 5 : static int get_msu_index(struct worker_thread *thread, int msu_id) {
89 12 : for (int i=0; i<thread->n_msus; i++) {
90 10 : if (thread->msus[i]->id == msu_id) {
91 3 : return i;
92 : }
93 : }
94 2 : return -1;
95 : }
96 :
97 : /** Removes the MSU at the given index from the worker_thread::msus */
98 1 : static int remove_idx_from_msu_list(struct worker_thread *thread, int idx) {
99 1 : if (idx >= thread->n_msus) {
100 0 : return -1;
101 : }
102 1 : for (int i=idx; i<thread->n_msus - 1; i++) {
103 0 : thread->msus[i] = thread->msus[i+1];
104 : }
105 1 : thread->msus[thread->n_msus-1] = NULL;
106 1 : thread->n_msus--;
107 1 : return 0;
108 : }
109 :
110 0 : int unregister_msu_with_thread(struct local_msu *msu) {
111 0 : int idx = get_msu_index(msu->thread, msu->id);
112 0 : if (idx == -1) {
113 0 : log_error("MSU %d does not exist on thread %d", msu->id, msu->thread->thread->id);
114 0 : return -1;
115 : }
116 0 : return remove_idx_from_msu_list(msu->thread, idx);
117 : }
118 :
119 1 : int register_msu_with_thread(struct local_msu *msu) {
120 1 : if (msu->thread->n_msus >= MAX_MSU_PER_THREAD) {
121 0 : log_error("Too many MSUs on thread %d", msu->thread->thread->id);
122 0 : return -1;
123 : }
124 1 : msu->thread->msus[msu->thread->n_msus] = msu;
125 1 : msu->thread->n_msus++;
126 1 : log(LOG_MSU_REGISTRATION, "Registered msu %d with thread", msu->id);
127 1 : return 0;
128 : }
129 :
130 : /** Creates a new MSU on this thread based on the provided message */
131 1 : static int create_msu_on_thread(struct worker_thread *thread, struct ctrl_create_msu_msg *msg) {
132 1 : struct msu_type *type = get_msu_type(msg->type_id);
133 1 : if (type == NULL) {
134 0 : log_error("Failed to create MSU %d. Cannot retrieve type", msg->msu_id);
135 0 : return -1;
136 : }
137 1 : struct local_msu *msu = init_msu(msg->msu_id, type, thread, &msg->init_data);
138 1 : if (msu == NULL) {
139 0 : log_error("Error creating MSU %d. Not placing on thread %d",
140 : msg->msu_id, thread->thread->id);
141 0 : return -1;
142 : }
143 1 : return 0;
144 : }
145 :
146 : /** Removes an MSU from this thread based on the provided messages */
147 2 : static int del_msu_from_thread(struct worker_thread *thread, struct ctrl_delete_msu_msg *msg,
148 : int ack_id) {
149 2 : int idx = get_msu_index(thread, msg->msu_id);
150 2 : if (idx == -1) {
151 1 : log_error("MSU %d does not exist on thread %d", msg->msu_id, thread->thread->id);
152 1 : return -1;
153 : }
154 1 : struct local_msu *msu = thread->msus[idx];
155 1 : if (msg->force) {
156 0 : destroy_msu(msu);
157 0 : remove_idx_from_msu_list(thread, idx);
158 : } else {
159 1 : int rtn = try_destroy_msu(msu);
160 1 : if (rtn == 1) {
161 0 : struct ctrl_delete_msu_msg *msg_cpy = malloc(sizeof(*msg_cpy));
162 0 : memcpy(msg_cpy, msg, sizeof(*msg));
163 :
164 0 : struct thread_msg *thread_msg = construct_thread_msg(DELETE_MSU,
165 : sizeof(*msg),
166 : msg_cpy);
167 0 : thread_msg->ack_id = ack_id;
168 0 : rtn = enqueue_thread_msg(thread_msg, &thread->thread->queue);
169 0 : if (rtn < 0) {
170 0 : log_error("Error re-enqueing delete MSU message");
171 : }
172 : } else {
173 1 : remove_idx_from_msu_list(thread, idx);
174 : }
175 : }
176 1 : return 0;
177 : }
178 :
179 : /** Modifies the MSU's routes, either adding or removing a route subscription */
180 0 : static int worker_mod_msu_route(struct worker_thread *thread, struct ctrl_msu_route_msg *msg) {
181 0 : int idx = get_msu_index(thread, msg->msu_id);
182 0 : if (idx < 0) {
183 0 : log_error("MSU %d does not exist on thread %d", msg->msu_id, thread->thread->id);
184 0 : return -1;
185 : }
186 0 : struct local_msu *msu = thread->msus[idx];
187 : int rtn;
188 0 : switch (msg->type) {
189 : case ADD_ROUTE:
190 0 : rtn = add_route_to_set(&msu->routes, msg->route_id);
191 0 : if (rtn < 0) {
192 0 : log_error("Error adding route %d to msu %d route set", msg->route_id, msg->msu_id);
193 0 : return -1;
194 : }
195 0 : log(LOG_ROUTING_CHANGES, "Added route %d to msu %d route set",
196 : msg->route_id, msg->msu_id);
197 0 : return 0;
198 : case DEL_ROUTE:
199 0 : rtn = rm_route_from_set(&msu->routes, msg->route_id);
200 0 : if (rtn < 0) {
201 0 : log_error("Error removing route %d from msu %d route set", msg->route_id, msg->msu_id);
202 0 : return -1;
203 : }
204 0 : log(LOG_ROUTING_CHANGES, "Removed route %d from msu %d route set",
205 : msg->route_id, msg->msu_id);
206 0 : return 0;
207 : default:
208 0 : log_error("Unknown route message type: %d", msg->type);
209 0 : return -1;
210 : }
211 : }
212 :
/**
 * Checks whether the data size of the message equals the size of the target type.
 * On a mismatch, logs a warning and `break`s out of the enclosing switch or loop.
 *
 * Deliberately a bare `if` rather than `do { } while (0)`: the `break` has to
 * apply to the statement enclosing the macro invocation.
 */
#define CHECK_MSG_SIZE(msg, target) \
    if ((msg)->data_size != sizeof(target)) { \
        log_warn("Message data size does not match size " \
                 "of target type " #target ); \
        break; \
    }
220 :
221 : /** Processes a message which has been sent to the worker thread */
222 0 : static int process_worker_thread_msg(struct worker_thread *thread, struct thread_msg *msg) {
223 0 : int rtn = -1;
224 0 : switch (msg->type) {
225 : case CREATE_MSU:
226 0 : CHECK_MSG_SIZE(msg, struct ctrl_create_msu_msg);
227 0 : struct ctrl_create_msu_msg *create_msg = msg->data;
228 0 : rtn = create_msu_on_thread(thread, create_msg);
229 0 : if (rtn < 0) {
230 0 : log_error("Error creating MSU");
231 : }
232 0 : break;
233 : case DELETE_MSU:
234 0 : CHECK_MSG_SIZE(msg, struct ctrl_delete_msu_msg);
235 0 : struct ctrl_delete_msu_msg *del_msg = msg->data;
236 0 : rtn = del_msu_from_thread(thread, del_msg, msg->ack_id);
237 0 : if (rtn < 0) {
238 0 : log_error("Error deleting MSU");
239 : }
240 0 : break;
241 : case MSU_ROUTE:
242 0 : CHECK_MSG_SIZE(msg, struct ctrl_msu_route_msg);
243 0 : struct ctrl_msu_route_msg *route_msg = msg->data;
244 0 : rtn = worker_mod_msu_route(thread, route_msg);
245 0 : if (rtn < 0) {
246 0 : log_error("Error modifiying MSU route");
247 : }
248 0 : break;
249 : case CONNECT_TO_RUNTIME:
250 : case SEND_TO_PEER:
251 0 : log_error("Message (type %d) meant for main thread send to worker thread",
252 : msg->type);
253 0 : break;
254 : default:
255 0 : log_error("Unknown message type %d delivered to worker thread %d",
256 : msg->type, thread->thread->id);
257 0 : break;
258 : }
259 0 : if (msg->ack_id > 0) {
260 0 : send_ack_message(msg->ack_id, rtn == 0);
261 0 : log_warn("SENT ACK %d", msg->ack_id);
262 : }
263 0 : return rtn;
264 : }
265 :
266 : /** Default amount of time to wait before sem_trywait should return */
267 : #define DEFAULT_WAIT_TIMEOUT_S 1
268 :
/**
 * Computes the elapsed time from t1 to t2 (i.e. t2 - t1) in seconds.
 *
 * @param t1 The earlier reference time
 * @param t2 The later reference time
 * @return Seconds from t1 to t2; negative when t2 precedes t1
 */
static double timediff_s(struct timespec *t1, struct timespec *t2) {
    double whole_seconds = (double)(t2->tv_sec - t1->tv_sec);
    double fractional_seconds = (double)(t2->tv_nsec - t1->tv_nsec) * 1e-9;
    return whole_seconds + fractional_seconds;
}
273 :
274 : /** Static structure for holding current time, so it can be returned from ::next_timeout */
275 : static struct timespec cur_time;
276 :
277 : /** Returns the next time at which the worker thread should exit its semaphore wait*/
278 1 : static struct timespec *next_timeout(struct worker_thread *thread) {
279 1 : if (thread->timeouts == NULL) {
280 0 : return NULL;
281 : }
282 1 : struct timespec *time = &thread->timeouts->time;
283 1 : clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
284 1 : double diff_s = timediff_s(time, &cur_time);
285 1 : if (diff_s >= 0) {
286 0 : struct timeout_list *old = thread->timeouts;
287 0 : thread->timeouts = old->next;
288 0 : free(old);
289 0 : return &cur_time;
290 : }
291 1 : if (-diff_s > DEFAULT_WAIT_TIMEOUT_S) {
292 0 : return NULL;
293 : }
294 1 : cur_time = *time;
295 1 : struct timeout_list *old = thread->timeouts;
296 1 : thread->timeouts = old->next;
297 1 : free(old);
298 1 : return &cur_time;
299 : }
300 :
301 6 : int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval) {
302 6 : struct timeout_list *tlist = calloc(1, sizeof(*tlist));
303 6 : clock_gettime(CLOCK_REALTIME, &tlist->time);
304 6 : tlist->time.tv_sec += interval->tv_sec;
305 6 : tlist->time.tv_nsec += interval->tv_nsec;
306 6 : if (tlist->time.tv_nsec > 1e9) {
307 3 : tlist->time.tv_nsec -= 1e9;
308 3 : tlist->time.tv_sec += 1;
309 : }
310 6 : if (thread->timeouts == NULL) {
311 2 : thread->timeouts = tlist;
312 2 : log(LOG_WORKER_THREAD, "Enqueued timeout to head of queue");
313 2 : return 0;
314 : }
315 :
316 4 : double diff = timediff_s(&tlist->time, &thread->timeouts->time);
317 4 : if (diff > 0) {
318 0 : tlist->next = thread->timeouts;
319 0 : thread->timeouts = tlist;
320 0 : log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
321 0 : return 0;
322 : }
323 :
324 4 : struct timeout_list *last_timeout = thread->timeouts;
325 11 : while (last_timeout->next != NULL) {
326 5 : struct timespec *next_timeout = &last_timeout->next->time;
327 5 : diff = timediff_s(&tlist->time, next_timeout);
328 5 : if (diff > 0) {
329 2 : tlist->next = last_timeout->next;
330 2 : last_timeout->next = tlist;
331 2 : log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
332 2 : return 0;
333 : }
334 3 : last_timeout = last_timeout->next;
335 : }
336 2 : last_timeout->next = tlist;
337 2 : tlist->next = NULL;
338 2 : log(LOG_WORKER_THREAD, "Enqueued timeout to queue");
339 2 : return 0;
340 : }
341 :
342 :
343 :
344 : /** The main worker thread loop. Checks for exit signal, processes messages */
345 0 : static int worker_thread_loop(struct dedos_thread *thread, void *v_worker_thread) {
346 0 : log_info("Starting worker thread loop %d (%s)",
347 : thread->id, thread->mode == PINNED_THREAD ? "pinned" : "unpinned");
348 :
349 0 : struct worker_thread *self = v_worker_thread;
350 :
351 0 : while (!dedos_thread_should_exit(thread)) {
352 : // TODO: Get context switches
353 0 : if (thread_wait(thread, next_timeout(self)) != 0) {
354 0 : log_error("Error waiting on thread semaphore");
355 0 : continue;
356 : }
357 0 : for (int i=0; i<self->n_msus; i++) {
358 0 : log(LOG_MSU_DEQUEUES, "Attempting to dequeue from msu %d (thread %d)",
359 : self->msus[i]->id, thread->id);
360 0 : msu_dequeue(self->msus[i]);
361 : }
362 : // FIXME: Protect read of num_msgs through mutex
363 0 : int num_msgs = thread->queue.num_msgs;
364 0 : for (int i=0; i<num_msgs; i++) {
365 0 : struct thread_msg *msg = dequeue_thread_msg(&thread->queue);
366 0 : if (msg == NULL) {
367 0 : log_error("Could not read message though queue is not empty!");
368 0 : continue;
369 : }
370 0 : log(LOG_THREAD_MESSAGES,"Dequeued thread message on thread %d",
371 : thread->id);
372 0 : if (process_worker_thread_msg(self, msg) != 0) {
373 0 : log_error("Error processing worker thread message");
374 : }
375 0 : free(msg);
376 : }
377 : }
378 0 : log_info("Leaving thread %d", thread->id);
379 0 : return 0;
380 : }
381 :
382 0 : int create_worker_thread(unsigned int thread_id,
383 : enum blocking_mode mode) {
384 0 : if (worker_threads[thread_id] != NULL) {
385 0 : log_error("Worker thread %u already exists", thread_id);
386 0 : return -1;
387 : }
388 :
389 0 : struct dedos_thread *thread = malloc(sizeof(*thread));
390 0 : if (thread == NULL) {
391 0 : log_error("Error allocating worker thread");
392 0 : return -1;
393 : }
394 0 : int rtn = start_dedos_thread(worker_thread_loop,
395 : init_worker_thread,
396 : destroy_worker_thread,
397 : mode,
398 : thread_id,
399 : thread);
400 0 : if (rtn < 0) {
401 0 : log_error("Error starting dedos thread %d", thread_id);
402 0 : return -1;
403 : }
404 0 : log(LOG_THREAD_INITS, "Created worker thread %d", thread_id);
405 0 : return 0;
406 : }
407 :
|