Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file dedos_threads.c
22 : *
23 : * Control spawned threads with message queue within DeDOS
24 : */
25 :
26 : /** Needed for CPU_SET etc. */
27 : #define _GNU_SOURCE
28 :
29 : #include "dedos_threads.h"
30 : #include "thread_message.h"
31 : #include "output_thread.h"
32 : #include "logging.h"
33 : #include "rt_stats.h"
34 :
35 : #include <stdbool.h>
36 : #include <stdlib.h>
37 : #include <sched.h>
38 : #include <sys/resource.h>
39 :
/** The maximum identifier that can be used for a dedos_thread */
#define MAX_DEDOS_THREAD_ID 32
/** The maximum number of cores that can be present on a node */
#define MAX_CORES 16

/**
 * The index at which to store the dedos_thread handling sending messages.
 * Parenthesized so the macro expands safely inside larger expressions.
 */
#define OUTPUT_THREAD_INDEX (MAX_DEDOS_THREAD_ID + 1)

/**
 * Static structure to hold created dedos_thread's.
 * Sized so that every index up to and including OUTPUT_THREAD_INDEX is valid.
 */
static struct dedos_thread *dedos_threads[MAX_DEDOS_THREAD_ID + 2];
/**
 * Keep track of which cores have been assigned to threads.
 * An entry is set to 1 once a thread has been pinned to that core.
 */
static int pinned_cores[MAX_CORES];
52 :
53 : enum stat_id thread_stat_items[] = {
54 : THREAD_UCPUTIME,
55 : THREAD_SCPUTIME,
56 : THREAD_MAXRSS,
57 : THREAD_MINFLT,
58 : THREAD_MAJFLT,
59 : THREAD_VCSW,
60 : THREAD_IVCSW
61 : };
62 :
63 : #define N_THREAD_STAT_ITEMS sizeof(thread_stat_items) / sizeof(*thread_stat_items)
64 :
65 : /** Initilizes the stat items associated with a thread */
66 6 : static inline void init_thread_stat_items(int id) {
67 48 : for (int i=0; i < N_THREAD_STAT_ITEMS; i++) {
68 42 : init_stat_item(thread_stat_items[i], id);
69 : }
70 6 : }
71 :
72 3 : struct dedos_thread *get_dedos_thread(int id) {
73 3 : if (id == OUTPUT_THREAD_ID) {
74 0 : id = MAX_DEDOS_THREAD_ID;
75 3 : } else if (id < 0) {
76 0 : log_error("Dedos thread ID cannot be negative! Provided: %d", id);
77 0 : return NULL;
78 3 : } else if (id > MAX_DEDOS_THREAD_ID) {
79 0 : log_error("Requested thread ID too high. Maximum is %d", MAX_DEDOS_THREAD_ID);
80 0 : return NULL;
81 : }
82 3 : if (dedos_threads[id] == NULL) {
83 1 : log_error("Dedos thread with id %d is not initialized", id);
84 1 : return NULL;
85 : }
86 2 : return dedos_threads[id];
87 : }
88 :
89 : /**
90 : * Initializes a dedos_thread structure to contain the appropriate fields
91 : */
92 6 : static int init_dedos_thread(struct dedos_thread *thread,
93 : enum thread_mode mode,
94 : int id) {
95 6 : thread->id = id;
96 6 : thread->mode = mode;
97 6 : sem_init(&thread->sem, 0, 0);
98 6 : pthread_mutex_init(&thread->exit_lock, NULL);
99 6 : thread->exit_signal = 0;
100 :
101 6 : init_thread_stat_items(thread->id);
102 :
103 :
104 6 : if (init_msg_queue(&thread->queue, &thread->sem) != 0) {
105 0 : log_error("Error initializing message queue for dedos thread");
106 0 : return -1;
107 : }
108 6 : log(LOG_DEDOS_THREADS, "Initialized thread %d (mode: %s, addr: %p)",
109 : id, mode == PINNED_THREAD ? "pinned" : "unpinned", thread);
110 6 : if (id == OUTPUT_THREAD_ID) {
111 0 : id = OUTPUT_THREAD_INDEX;
112 : }
113 6 : dedos_threads[id] = thread;
114 6 : return 0;
115 : }
116 :
117 : /** Structure which holds the initialization info for a dedos_thread */
118 : struct thread_init {
119 : dedos_thread_fn thread_fn;
120 : dedos_thread_init_fn init_fn;
121 : dedos_thread_destroy_fn destroy_fn;
122 : struct dedos_thread *self;
123 : sem_t sem;
124 : };
125 :
126 : /** Pins the thread with the pthread id `ptid` to the first unused core */
127 3 : static int pin_thread(pthread_t ptid) {
128 : int cpu_id;
129 3 : int num_cpu = sysconf(_SC_NPROCESSORS_ONLN);
130 3 : for (cpu_id = 0; cpu_id < num_cpu && pinned_cores[cpu_id] == 1; cpu_id++);
131 3 : if (cpu_id == num_cpu) {
132 1 : log_warn("No cores available to pin thread");
133 1 : return -1;
134 : }
135 :
136 : cpu_set_t cpuset;
137 2 : CPU_ZERO(&cpuset);
138 2 : CPU_SET(cpu_id, &cpuset);
139 2 : int s = pthread_setaffinity_np(ptid, sizeof(cpuset), &cpuset);
140 2 : if (s != 0) {
141 0 : log_warn("pthread_setaffinity_np returned error %d", s);
142 0 : return -1;
143 : }
144 2 : pinned_cores[cpu_id] = 1;
145 2 : log(LOG_DEDOS_THREADS, "Successfully pinned pthread %d", (int)ptid);
146 2 : return 0;
147 : }
148 :
149 2 : void dedos_thread_stop(struct dedos_thread *thread) {
150 2 : log_info("Signaling thread %d to exit", thread->id);
151 2 : pthread_mutex_lock(&thread->exit_lock);
152 2 : thread->exit_signal = 1;
153 2 : sem_post(&thread->sem);
154 2 : pthread_mutex_unlock(&thread->exit_lock);
155 2 : }
156 :
157 :
158 2 : void dedos_thread_join(struct dedos_thread *thread) {
159 2 : pthread_join(thread->pthread, NULL);
160 2 : free(thread);
161 2 : }
162 :
163 4 : int dedos_thread_should_exit(struct dedos_thread *thread) {
164 4 : pthread_mutex_lock(&thread->exit_lock);
165 4 : int exit_signal = thread->exit_signal;
166 4 : pthread_mutex_unlock(&thread->exit_lock);
167 4 : return exit_signal;
168 : }
169 :
170 : /**
171 : * The actual function passed to pthread_create() that starts a new thread.
172 : * Initilizes the appropriate structures, posts to the start semaphore,
173 : * then calls the appropriate function.
174 : * @param thread_init_v Pointer to the thread_init structure
175 : */
176 5 : static void *dedos_thread_starter(void *thread_init_v) {
177 5 : struct thread_init *init = thread_init_v;
178 :
179 : // Have to get things out of the thread_init, because
180 : // it will be destroyed externally once sem_post() is complete
181 5 : struct dedos_thread *thread = init->self;
182 5 : dedos_thread_fn thread_fn = init->thread_fn;
183 5 : dedos_thread_destroy_fn destroy_fn = init->destroy_fn;
184 :
185 5 : void *init_rtn = NULL;
186 5 : if (init->init_fn) {
187 5 : init_rtn = init->init_fn(thread);
188 : }
189 :
190 5 : if (thread->mode == PINNED_THREAD) {
191 3 : if (pin_thread(thread->pthread) != 0) {
192 1 : log_warn("Could not pin thread %d", thread->id);
193 : }
194 : }
195 :
196 5 : sem_post(&init->sem);
197 5 : log(LOG_DEDOS_THREADS, "Started thread %d (mode: %s, addr: %p)",
198 : thread->id, thread-> mode == PINNED_THREAD ? "pinned" : "unpinned", thread);
199 :
200 5 : int rtn = thread_fn(thread, init_rtn);
201 5 : log(LOG_DEDOS_THREADS, "Thread %d ended.", thread->id);
202 :
203 5 : if (destroy_fn) {
204 5 : destroy_fn(thread, init_rtn);
205 : }
206 :
207 5 : return (void*)(intptr_t)rtn;
208 : }
209 :
210 2 : static inline void gather_thread_metrics(struct dedos_thread *thread) {
211 : struct rusage usage;
212 2 : int rtn = getrusage(RUSAGE_THREAD, &usage);
213 2 : if (rtn < 0) {
214 0 : log_error("Error getting thread %d rusage", thread->id);
215 2 : return;
216 : }
217 2 : int id = thread->id;
218 2 : record_stat(THREAD_UCPUTIME, id, (double)usage.ru_utime.tv_sec + usage.ru_utime.tv_usec * 1e-6, 1);
219 2 : record_stat(THREAD_SCPUTIME, id, (double)usage.ru_stime.tv_sec + usage.ru_stime.tv_usec * 1e-6, 1);
220 2 : record_stat(THREAD_MAXRSS, id, usage.ru_maxrss, 0);
221 2 : record_stat(THREAD_MINFLT, id, usage.ru_minflt, 0);
222 2 : record_stat(THREAD_MAJFLT, id, usage.ru_majflt, 0);
223 2 : record_stat(THREAD_VCSW, id, usage.ru_nvcsw, 1);
224 2 : record_stat(THREAD_IVCSW, id, usage.ru_nivcsw, 1);
225 : }
226 :
227 :
228 :
229 : /** The amount of time that ::thread_wait should wait for if no timeout is provided */
230 : #define DEFAULT_WAIT_TIMEOUT_S 1
231 :
232 : #define MAX_METRIC_INTERVAL_MS 500
233 :
234 2 : int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout) {
235 :
236 2 : int rtn = sem_trywait(&thread->sem);
237 :
238 2 : if (rtn == 0) {
239 : struct timespec cur_time;
240 0 : clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
241 0 : if (cur_time.tv_sec * 1e3 + cur_time.tv_nsec / 1e6 > \
242 0 : thread->last_metric.tv_sec * 1e3 + thread->last_metric.tv_nsec / 1e6 + \
243 : MAX_METRIC_INTERVAL_MS) {
244 0 : gather_thread_metrics(thread);
245 0 : thread->last_metric = cur_time;
246 : }
247 0 : return 0;
248 2 : } else if (rtn == -1 && errno == EAGAIN) {
249 2 : gather_thread_metrics(thread);
250 2 : clock_gettime(CLOCK_REALTIME_COARSE, &thread->last_metric);
251 : }
252 :
253 2 : if (abs_timeout == NULL) {
254 : struct timespec cur_time;
255 2 : clock_gettime(CLOCK_REALTIME_COARSE, &cur_time);
256 2 : cur_time.tv_sec += DEFAULT_WAIT_TIMEOUT_S;
257 2 : rtn = sem_timedwait(&thread->sem, &cur_time);
258 : } else {
259 0 : rtn = sem_timedwait(&thread->sem, abs_timeout);
260 : }
261 2 : if (rtn < 0 && errno != ETIMEDOUT) {
262 0 : log_perror("Error waiting on thread semaphore");
263 0 : exit(-1);
264 : return -1;
265 : }
266 2 : return 0;
267 : }
268 :
269 5 : int start_dedos_thread(dedos_thread_fn thread_fn,
270 : dedos_thread_init_fn init_fn,
271 : dedos_thread_destroy_fn destroy_fn,
272 : enum blocking_mode mode,
273 : int id,
274 : struct dedos_thread *thread) {
275 5 : if (id == OUTPUT_THREAD_ID) {
276 0 : id = MAX_DEDOS_THREAD_ID;
277 : }
278 5 : int rtn = init_dedos_thread(thread, mode, id);
279 5 : struct thread_init init = {
280 : .thread_fn = thread_fn,
281 : .init_fn = init_fn,
282 : .destroy_fn = destroy_fn,
283 : .self = thread
284 : };
285 5 : if (sem_init(&init.sem, 0, 0)) {
286 0 : log_perror("Error initializing semaphore for dedos thread");
287 0 : return -1;
288 : }
289 5 : log(LOG_DEDOS_THREADS, "Waiting on thread %d to start", id);
290 5 : rtn = pthread_create(&thread->pthread, NULL,
291 : dedos_thread_starter, (void*)&init);
292 5 : if (rtn < 0) {
293 0 : log_error("pthread_create failed with errno: %d", rtn);
294 0 : return -1;
295 : }
296 5 : if (sem_wait(&init.sem) != 0) {
297 0 : log_perror("Error waiting on thread start semaphore");
298 0 : return -1;
299 : }
300 5 : log(LOG_DEDOS_THREADS, "Thread %d started successfully", id);
301 5 : sem_destroy(&init.sem);
302 5 : return 0;
303 : }
|