Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file local_msu.c
22 : * Defines the structures and functions used by MSUs on the local machine
23 : */
24 :
25 : #include "local_msu.h"
26 : #include "routing_strategies.h"
27 : #include "logging.h"
28 : #include "rt_stats.h"
29 : #include "msu_message.h"
30 : #include "inter_runtime_messages.h"
31 : #include "thread_message.h"
32 : #include "msu_state.h"
33 : #include "msu_calls.h"
34 :
35 : #include <stdlib.h>
36 : #define __USE_GNU // For some reason, necessary for RUSAGE_THREAD
37 : #include <sys/resource.h>
38 : #include <sys/time.h>
/**
 * MOVEME: MAX_MSU_ID
 * Defines the upper bound on the IDs that can be assigned to an MSU.
 * Necessary because MSUs are indexed by ID in the local registry, which
 * is an array of exactly MAX_MSU_ID slots; a valid ID is therefore
 * strictly less than this value.
 */
#define MAX_MSU_ID 1024
45 :
// TODO: This lock might not be useful, as I believe this registry will
// TODO: only ever be accessed from the socket handler thread

/**
 * Lock to protect access to local msu registry.
 * NOTE(review): no initializer or pthread_rwlock_init() call is visible in
 * this file -- confirm the lock is initialized before its first use.
 */
pthread_rwlock_t msu_registry_lock;
/**
 * Mapping of MSU ID to the specific instance of the local MSU.
 * Indexed directly by ID; a slot holding NULL means no MSU is registered
 * under that ID.
 */
struct local_msu *local_msu_registry[MAX_MSU_ID];
53 :
54 : /**
55 : * Allocates the memory associated with an MSU structure
56 : * @return Pointer to allocated MSU
57 : */
58 11 : static struct local_msu *msu_alloc() {
59 11 : struct local_msu *msu = calloc(1, sizeof(*msu));
60 11 : if (msu == NULL) {
61 0 : log_error("Failed to allocate MSU");
62 0 : return NULL;
63 : }
64 11 : return msu;
65 : }
66 :
67 : /**
68 : * Frees the memory associated with an MSU structure,
69 : * including any routes, messages in its queue, or states
70 : */
71 2 : static void msu_free(struct local_msu *msu) {
72 2 : struct msu_msg *msg = dequeue_msu_msg(&msu->queue);
73 4 : while (msg != NULL) {
74 0 : if (msg->data_size > 0) {
75 0 : free(msg->data);
76 : }
77 0 : free(msg);
78 0 : msg = dequeue_msu_msg(&msu->queue);
79 : }
80 2 : msu_free_all_state(msu);
81 2 : free(msu->routes.routes);
82 2 : free(msu);
83 2 : }
84 :
85 : /**
86 : * Removes an MSU from the local MSU registry
87 : * @param id ID of the MSU to remove
88 : * @return 0 on success, -1 on error
89 : */
90 4 : static int rm_from_local_registry(int id) {
91 4 : if (pthread_rwlock_wrlock(&msu_registry_lock) != 0) {
92 0 : log_perror("Error opening write lock on msu registry");
93 0 : return -1;
94 : }
95 4 : int rtn = 0;
96 4 : if (local_msu_registry[id] == NULL) {
97 1 : log_warn("MSU with id %d does not exist and cannot be deleted", id);
98 1 : rtn = -1;
99 : } else {
100 3 : local_msu_registry[id] = NULL;
101 : }
102 4 : if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
103 0 : log_perror("Error unlocking msu registry");
104 0 : return -1;
105 : }
106 4 : return rtn;
107 : }
108 :
109 : /**
110 : * Adds an MSU to the local registry so it can be referred to elsewhere by ID
111 : * @param MSU the local MSU to add to the registry
112 : * @return 0 on success, -1 on error
113 : */
114 13 : static int add_to_local_registry(struct local_msu *msu) {
115 13 : if (pthread_rwlock_wrlock(&msu_registry_lock) != 0) {
116 0 : log_perror("Error opening write lock on msu registry");
117 0 : return -1;
118 : }
119 13 : int rtn = 0;
120 13 : if (local_msu_registry[msu->id] != NULL) {
121 1 : log_error("MSU with id %d already exists and cannot be added to registry", msu->id);
122 1 : rtn = -1;
123 : } else {
124 12 : local_msu_registry[msu->id] = msu;
125 : }
126 13 : if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
127 0 : log_perror("Error unlocking msu registry");
128 0 : return -1;
129 : }
130 13 : return rtn;
131 : }
132 :
133 :
134 5 : struct local_msu *get_local_msu(unsigned int id) {
135 5 : if (id >= MAX_MSU_ID) {
136 0 : log_error("MSU id %u too high!", id);
137 0 : return NULL;
138 : }
139 5 : if (pthread_rwlock_rdlock(&msu_registry_lock) != 0) {
140 0 : log_perror("Error opening read lock on MSU registry");
141 0 : return NULL;
142 : }
143 5 : struct local_msu *msu = local_msu_registry[id];
144 5 : if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
145 0 : log_perror("Error unlocking msu registry");
146 0 : return NULL;
147 : }
148 5 : if (msu == NULL) {
149 1 : log_error("Could not get local msu with id %d. Not registered.", id);
150 : }
151 5 : return msu;
152 : }
153 :
154 : /** The stat IDs that are associated with an MSU, to be registered on MSU creation */
155 : static enum stat_id MSU_STAT_IDS[] = {
156 : MSU_QUEUE_LEN,
157 : MSU_ITEMS_PROCESSED,
158 : MSU_EXEC_TIME,
159 : MSU_IDLE_TIME,
160 : MSU_MEM_ALLOC,
161 : MSU_NUM_STATES,
162 : MSU_ERROR_CNT,
163 : MSU_UCPUTIME,
164 : MSU_SCPUTIME,
165 : MSU_MINFLT,
166 : MSU_MAJFLT,
167 : MSU_VCSW,
168 : MSU_IVCSW
169 : };
170 :
171 : #define NUM_MSU_STAT_IDS sizeof(MSU_STAT_IDS) / sizeof(enum stat_id)
172 :
173 : /**
174 : * Initializes the stat IDS that are relevant to an MSU
175 : * @param msu_id ID of the msu to register
176 : */
177 11 : static void init_msu_stats(int msu_id) {
178 154 : for (int i=0; i<NUM_MSU_STAT_IDS; i++) {
179 143 : if (init_stat_item(MSU_STAT_IDS[i], msu_id) != 0) {
180 0 : log_warn("Could not initialize stat item %d for msu %d", MSU_STAT_IDS[i], msu_id);
181 : }
182 : }
183 11 : }
184 :
185 : /**
186 : * Unregisters the stat IDS that are relevant to an MSU
187 : * @param msu_id ID of the MSU to register
188 : */
189 2 : static void unregister_msu_stats(int msu_id) {
190 28 : for (int i=0; i < NUM_MSU_STAT_IDS; i++) {
191 26 : if (remove_stat_item(MSU_STAT_IDS[i], msu_id) != 0) {
192 0 : log_warn("Could not remove stat item %d for msu %d",
193 : MSU_STAT_IDS[i], msu_id);
194 : }
195 : }
196 2 : }
197 :
198 11 : struct local_msu *init_msu(unsigned int id,
199 : struct msu_type *type,
200 : struct worker_thread *thread,
201 : struct msu_init_data *data) {
202 11 : struct local_msu *msu = msu_alloc();
203 11 : init_msu_stats(id);
204 11 : msu->thread = thread;
205 11 : msu->id = id;
206 11 : msu->type = type;
207 11 : msu->scheduling_weight = 0;
208 :
209 11 : if (init_msg_queue(&msu->queue, &thread->thread->sem) != 0) {
210 0 : msu_free(msu);
211 0 : log_error("Error initializing msu queue");
212 0 : return NULL;
213 : }
214 :
215 : // Must be done before running init function, or the msu cannot enqueue to itself
216 11 : int rtn = register_msu_with_thread(msu);
217 11 : if (rtn < 0) {
218 0 : log_error("Error registering MSU With thread");
219 0 : msu_free(msu);
220 0 : return NULL;
221 : }
222 :
223 : // TODO: Unregister if creation fails
224 11 : log_info("Initializing msu (ID: %d, type: %s, data: '%s')", id, type->name,
225 : data->init_data);
226 :
227 : // Run the MSU's type-specific init function if it has one
228 11 : if (type->init) {
229 7 : if (type->init(msu, data) != 0) {
230 0 : log_error("Error running MSU %d (type: %s) type-specific initialization function",
231 : id, type->name);
232 0 : unregister_msu_with_thread(msu);
233 0 : msu_free(msu);
234 0 : return NULL;
235 : }
236 : }
237 :
238 11 : rtn = add_to_local_registry(msu);
239 11 : if (rtn < 0) {
240 0 : log_error("Error adding MSU to local registry");
241 0 : msu_free(msu);
242 0 : unregister_msu_with_thread(msu);
243 0 : return NULL;
244 : }
245 :
246 11 : return msu;
247 : }
248 :
/**
 * Destroys the MSU only if it no longer holds any state.
 * @param msu MSU to attempt to destroy
 * @return 0 if the MSU was destroyed, 1 if it still has states and was kept
 */
int try_destroy_msu(struct local_msu *msu) {
    int has_states = msu_num_states(msu) > 0;
    if (!has_states) {
        destroy_msu(msu);
        return 0;
    }
    return 1;
}
256 :
257 2 : void destroy_msu(struct local_msu *msu) {
258 2 : int id = msu->id;
259 2 : char *type = msu->type->name;
260 2 : unregister_msu_stats(msu->id);
261 2 : if (msu->type->destroy) {
262 1 : msu->type->destroy(msu);
263 : }
264 2 : msu_free(msu);
265 2 : rm_from_local_registry(id);
266 2 : log_info("Removed msu (ID: %d, Type: %s)", id, type);
267 2 : }
268 :
269 : /**
270 : * Calls type-specific MSU receive function and records execution time
271 : * @param msu MSU to receive data
272 : * @param data Data to be sent to MSU
273 : * @return 0 on success, -1 on error
274 : */
275 2 : static int msu_receive(struct local_msu *msu, struct msu_msg *msg) {
276 :
277 2 : record_end_time(MSU_IDLE_TIME, msu->id);
278 2 : record_start_time(MSU_EXEC_TIME, msu->id);
279 2 : int rtn = msu->type->receive(msu, msg);
280 2 : record_end_time(MSU_EXEC_TIME, msu->id);
281 2 : record_start_time(MSU_IDLE_TIME, msu->id);
282 :
283 2 : if (rtn != 0) {
284 0 : log_error("Error executing MSU %d (%s) receive function",
285 : msu->id, msu->type->name);
286 0 : return -1;
287 : }
288 2 : return 0;
289 : }
290 :
/**
 * Takes a resource-usage snapshot of the calling thread, to be compared
 * against a second snapshot once the MSU has executed.
 * @param before Output location for the snapshot
 * @return The return value of getrusage (negative on failure)
 */
static inline int gather_metrics_before(struct rusage *before) {
    const int status = getrusage(RUSAGE_THREAD, before);
    if (status >= 0) {
        return status;
    }
    log_error("Error getting MSU rusage");
    return status;
}
298 :
299 : #define RECORD_DIFF(dstat, rstat, id) \
300 : increment_stat(dstat, id, after.rstat - before->rstat)
301 :
302 : #define RECORD_TIMEDIFF(dstat, rstat, id) \
303 : increment_stat(dstat, id, ((double)after.rstat.tv_sec + after.rstat.tv_usec * 1e-6) - \
304 : ((double)before->rstat.tv_sec + before->rstat.tv_usec * 1e-6))
305 :
306 1 : static inline void record_metrics(struct rusage *before, int msu_id) {
307 : struct rusage after;
308 1 : int rtn = getrusage(RUSAGE_THREAD, &after);
309 1 : if (rtn < 0) {
310 0 : log_error("Error getting MSU rusage");
311 1 : return;
312 : }
313 :
314 1 : RECORD_TIMEDIFF(MSU_UCPUTIME, ru_utime, msu_id);
315 1 : RECORD_TIMEDIFF(MSU_SCPUTIME, ru_stime, msu_id);
316 1 : RECORD_DIFF(MSU_MINFLT, ru_minflt, msu_id);
317 1 : RECORD_DIFF(MSU_MAJFLT, ru_majflt, msu_id);
318 1 : RECORD_DIFF(MSU_VCSW, ru_nvcsw, msu_id);
319 1 : RECORD_DIFF(MSU_IVCSW, ru_nivcsw, msu_id);
320 : }
321 :
322 :
323 2 : int msu_dequeue(struct local_msu *msu) {
324 2 : struct msu_msg *msg = dequeue_msu_msg(&msu->queue);
325 2 : if (msg) {
326 1 : if (msg->hdr.error_flag) {
327 0 : if (msu->type->receive_error == NULL) {
328 0 : log_error("MSU %d received error with no handler", msu->id);
329 0 : return -1;
330 : }
331 0 : int rtn = msu->type->receive_error(msu, msg);
332 0 : if (rtn < 0) {
333 0 : log_error("Error executing MSU error receive function");
334 0 : return -1;
335 : }
336 : } else {
337 1 : log(LOG_MSU_DEQUEUES, "Dequeued MSU message %p for msu %d", msg, msu->id);
338 : struct rusage before;
339 1 : record_stat(MSU_QUEUE_LEN, msu->id, msu->queue.num_msgs, false);
340 1 : int gather_err = gather_metrics_before(&before);
341 1 : int rtn = msu_receive(msu, msg);
342 1 : if (gather_err == 0) {
343 1 : record_metrics(&before, msu->id);
344 : }
345 1 : increment_stat(MSU_ITEMS_PROCESSED, msu->id, 1);
346 1 : free(msg);
347 1 : return rtn;
348 : }
349 : }
350 1 : return 1;
351 : }
352 :
353 1 : int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast) {
354 :
355 1 : increment_stat(MSU_ERROR_CNT, msu->id, 1);
356 :
357 1 : if (!broadcast) {
358 0 : return 0;
359 : }
360 :
361 3 : for (int i=0; i < hdr->provinance.path_len; i++) {
362 2 : struct msu_provinance_item *upstream = &hdr->provinance.path[i];
363 2 : struct msu_type *up_type = get_msu_type(upstream->type_id);
364 2 : if (up_type == NULL) {
365 0 : log_error("Cannot get type %d", upstream->type_id);
366 1 : continue;
367 : }
368 2 : if (up_type->receive_error == NULL) {
369 1 : continue;
370 : }
371 : struct msu_endpoint receiver;
372 1 : int rtn = init_msu_endpoint(upstream->msu_id, upstream->runtime_id, &receiver);
373 1 : if (rtn < 0) {
374 0 : log_error("Error initializing msu endpoint");
375 0 : continue;
376 : }
377 1 : rtn = call_msu_error(msu, &receiver, up_type, hdr, 0, NULL);
378 1 : if (rtn < 0) {
379 0 : log_error("Error calling MSU endpoint for error report");
380 0 : continue;
381 : }
382 : }
383 1 : return 0;
384 : }
|