Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file: controller_communication.c
22 : * Communication with global controller from runtime
23 : */
24 :
#include "controller_communication.h"
#include "communication.h"
#include "logging.h"
#include "socket_monitor.h"
#include "dedos_threads.h"
#include "thread_message.h"
#include "runtime_dfg.h"
#include "rt_stats.h"
#include "worker_thread.h"
#include "output_thread.h"

#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
39 :
/**
 * Static (file-scope) variable holding the file descriptor of the socket
 * connected to the global controller.
 * It starts out as -1; send_to_controller() and send_stats_to_controller()
 * refuse to send while the value is negative.
 */
static int controller_sock = -1;
45 :
46 :
47 2 : int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload) {
48 :
49 2 : if (controller_sock < 0) {
50 0 : log_error("Controller socket not initialized");
51 0 : return -1;
52 : }
53 :
54 2 : int rtn = send_to_endpoint(controller_sock, hdr, sizeof(*hdr));
55 2 : if (rtn <= 0) {
56 0 : log_error("Error sending header to controller");
57 0 : return -1;
58 : }
59 2 : if (hdr->payload_size <= 0) {
60 0 : return 0;
61 : }
62 :
63 2 : rtn = send_to_endpoint(controller_sock, payload, hdr->payload_size);
64 2 : if (rtn <= 0) {
65 0 : log_error("Error sending payload to controller");
66 0 : return -1;
67 : }
68 :
69 2 : log(LOG_CONTROLLER_COMMUNICATION, "Sent payload of size %d type %d to controller",
70 : (int)hdr->payload_size, hdr->type);
71 2 : return 0;
72 : }
73 :
74 : /**
75 : * Sends the initilization message containing runtime ID, ip and port to
76 : * global controller
77 : */
78 2 : static int send_ctl_init_msg() {
79 2 : int local_id = local_runtime_id();
80 2 : if (local_id < 0) {
81 0 : log_error("Could not get local runtime ID to send to controller");
82 0 : return -1;
83 : }
84 2 : uint32_t ip = local_runtime_ip();
85 2 : int port = local_runtime_port();
86 :
87 2 : struct rt_controller_init_msg msg = {
88 : .runtime_id = local_id,
89 : .ip = ip,
90 : .port = port
91 : };
92 :
93 2 : struct rt_controller_msg_hdr hdr = {
94 : .type = RT_CTL_INIT,
95 : .payload_size = sizeof(msg)
96 : };
97 :
98 2 : return send_to_controller(&hdr, &msg);
99 : }
100 :
101 : /**
102 : * Initializes a connection to the global controller
103 : * @returns 0 on success, -1 on error
104 : */
105 1 : static int connect_to_controller(struct sockaddr_in *addr) {
106 :
107 1 : if (controller_sock != -1) {
108 0 : log_error("Controller socket already initialized");
109 0 : return -1;
110 : }
111 :
112 1 : controller_sock = init_connected_socket(addr);
113 :
114 1 : if (controller_sock < 0) {
115 0 : log_error("Error connecting to global controller!");
116 0 : return -1;
117 : }
118 :
119 : char ip[INET_ADDRSTRLEN];
120 1 : inet_ntop(AF_INET, &addr->sin_addr, ip, INET_ADDRSTRLEN);
121 1 : int port = ntohs(addr->sin_port);
122 :
123 1 : log_info("Connected to global controller at %s:%d", ip, port);
124 :
125 1 : int rtn = send_ctl_init_msg();
126 1 : if (rtn < 0) {
127 0 : log_error("Error sending initialization message to controller");
128 0 : return -1;
129 : }
130 1 : return controller_sock;
131 : }
132 :
/**
 * Macro to check whether the payload size of a message matches the size
 * of the struct it's supposed to carry.
 *
 * NOTE: this macro returns from the enclosing function: -1 (after logging
 * a warning) when the sizes differ, 0 when they match.
 *
 * @param msg Pointer to a message header with a `payload_size` member
 * @param target Type name whose size the payload is expected to have
 */
#define CHECK_MSG_SIZE(msg, target) \
    do { \
        if ((msg)->payload_size != sizeof(target)) { \
            log_warn("Message data size (%d) does not match size " \
                     "of target type " #target " (%d)", \
                     (int)(msg)->payload_size, (int)sizeof(target)); \
            return -1; \
        } \
        return 0; \
    } while (0)
145 :
/**
 * Checks whether the payload size announced in a control message header
 * matches the size of the struct associated with the header's type.
 * @param msg Header of the received control message
 * @return 0 if the size matches, -1 if it does not or the type is unknown
 */
static int verify_msg_size(struct ctrl_runtime_msg_hdr *msg) {
    // Every CHECK_MSG_SIZE expands to code that returns from this function,
    // so no case falls through into the next one.
    switch (msg->type) {
        case CTRL_CONNECT_TO_RUNTIME:
            CHECK_MSG_SIZE(msg, struct ctrl_add_runtime_msg);
        case CTRL_CREATE_THREAD:
            CHECK_MSG_SIZE(msg, struct ctrl_create_thread_msg);
        case CTRL_DELETE_THREAD:
            // NOTE(review): checked against the *create* thread struct;
            // confirm whether a dedicated delete-thread struct should be used.
            CHECK_MSG_SIZE(msg, struct ctrl_create_thread_msg);
        case CTRL_MODIFY_ROUTE:
            CHECK_MSG_SIZE(msg, struct ctrl_route_msg);
        case CTRL_CREATE_MSU:
            CHECK_MSG_SIZE(msg, struct ctrl_create_msu_msg);
        case CTRL_DELETE_MSU:
            CHECK_MSG_SIZE(msg, struct ctrl_delete_msu_msg);
        case CTRL_MSU_ROUTES:
            CHECK_MSG_SIZE(msg, struct ctrl_msu_route_msg);
        default:
            log_error("Received unknown message type: %d", msg->type);
            return -1;
    }
}
170 :
171 : /**
172 : * Processes a received ctrl_add_runtime_msg
173 : */
174 1 : static int process_connect_to_runtime(struct ctrl_add_runtime_msg *msg) {
175 1 : struct sockaddr_in addr = {};
176 1 : addr.sin_family = AF_INET;
177 1 : addr.sin_addr.s_addr = msg->ip;
178 1 : addr.sin_port = htons(msg->port);
179 :
180 1 : int rtn = connect_to_runtime_peer(msg->runtime_id, &addr);
181 1 : if (rtn < 0) {
182 0 : log_error("Could not add runtime peer");
183 0 : return -1;
184 : }
185 1 : return 0;
186 : }
187 :
188 : /**
189 : * Processes a received ctrl_create_thread_msg
190 : */
191 2 : static int process_create_thread_msg(struct ctrl_create_thread_msg *msg) {
192 2 : int id = msg->thread_id;
193 2 : int rtn = create_worker_thread(id, msg->mode);
194 2 : if (rtn < 0) {
195 1 : log_error("Error creating worker thread %d", id);
196 1 : return -1;
197 : }
198 1 : log(LOG_THREAD_CREATION, "Created worker thread %d", id);
199 1 : return 0;
200 : }
201 :
202 : /**
203 : * Processes a received ctrl_route_msg
204 : */
205 4 : static int process_ctrl_route_msg(struct ctrl_route_msg *msg) {
206 : int rtn;
207 4 : log(LOG_CONTROLLER_COMMUNICATION, "Got control route message of type %d", msg->type);
208 4 : switch (msg->type) {
209 : case CREATE_ROUTE:
210 2 : rtn = init_route(msg->route_id, msg->type_id);
211 2 : if (rtn < 0) {
212 1 : log_error("Error creating new route of id %d, type %d",
213 : msg->route_id, msg->type_id);
214 1 : return 1;
215 : }
216 1 : return 0;
217 : case ADD_ENDPOINT:;
218 : struct msu_endpoint endpoint;
219 2 : int rtn = init_msu_endpoint(msg->msu_id, msg->runtime_id, &endpoint);
220 2 : if (rtn < 0) {
221 1 : log_error("Cannot initilize runtime endpoint for adding "
222 : "endpoint %d to route %d", msg->msu_id, msg->route_id);
223 1 : return 1;
224 : }
225 1 : rtn = add_route_endpoint(msg->route_id, endpoint, msg->key);
226 1 : if (rtn < 0) {
227 0 : log_error("Error adding endpoint %d to route %d with key %d",
228 : msg->msu_id, msg->route_id, msg->key);
229 0 : return 1;
230 : }
231 1 : return 0;
232 : case DEL_ENDPOINT:
233 0 : rtn = remove_route_endpoint(msg->route_id, msg->msu_id);
234 0 : if (rtn < 0) {
235 0 : log_error("Error removing endpoint %d from route %d",
236 : msg->msu_id, msg->route_id);
237 0 : return 1;
238 : }
239 0 : return 0;
240 : case MOD_ENDPOINT:
241 0 : rtn = modify_route_endpoint(msg->route_id, msg->msu_id, msg->key);
242 0 : if (rtn < 0) {
243 0 : log_error("Error modifying endpoint %d on route %d to have key %d",
244 : msg->msu_id, msg->route_id, msg->key);
245 0 : return 1;
246 : }
247 0 : return 0;
248 : default:
249 0 : log_error("Unknown route control message type received: %d", msg->type);
250 0 : return -1;
251 : }
252 : }
253 :
254 : /**
255 : * Gets the corresponding thread_msg_type for a ctrl_runtime_msg_type
256 : */
257 2 : static enum thread_msg_type get_thread_msg_type(enum ctrl_runtime_msg_type type) {
258 2 : switch (type) {
259 : case CTRL_CREATE_MSU:
260 2 : return CREATE_MSU;
261 : case CTRL_DELETE_MSU:
262 0 : return DELETE_MSU;
263 : case CTRL_MSU_ROUTES:
264 0 : return MSU_ROUTE;
265 : default:
266 0 : log_error("Unknown thread message type %d", type);
267 0 : return UNKNOWN_THREAD_MSG;
268 : }
269 : }
270 :
271 : /**
272 : * Constructs a thread message from a ctrl_runtime_msg_hdr, reading any additional
273 : * information it needs off of the associated socket
274 : * @param hdr Header describing the information available to read
275 : * @param fd The file descriptor off of which to read the remainder of the control message
276 : * @return Created thread_msg on success, NULL on error
277 : */
278 2 : static struct thread_msg *thread_msg_from_ctrl_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd) {
279 2 : if (verify_msg_size(hdr) != 0) {
280 0 : log_warn("May not process message. Incorrect payload size for type.");
281 : }
282 :
283 2 : void *msg_data = malloc(hdr->payload_size);
284 2 : int rtn = read_payload(fd, hdr->payload_size, msg_data);
285 2 : if (rtn < 0) {
286 0 : log_error("Error reading control payload. Cannot process message");
287 0 : free(msg_data);
288 0 : return NULL;
289 : }
290 2 : log(LOG_CONTROLLER_COMMUNICATION, "Read control payload %p of size %d",
291 : msg_data, (int)hdr->payload_size);
292 2 : enum thread_msg_type type = get_thread_msg_type(hdr->type);
293 2 : struct thread_msg *thread_msg = construct_thread_msg(type, hdr->payload_size, msg_data);
294 2 : thread_msg->ack_id = hdr->id;
295 2 : return thread_msg;
296 : }
297 :
298 : /**
299 : * Constructs a thread_msg from a control-runtime message and passes it to the relevant thread.
300 : * @param hdr Header describing info available to read
301 : * @param fd File descriptor to read message from
302 : * @return 0 on success, -1 on error
303 : */
304 1 : static int pass_ctrl_msg_to_thread(struct ctrl_runtime_msg_hdr *hdr, int fd) {
305 1 : struct thread_msg *thread_msg = thread_msg_from_ctrl_hdr(hdr, fd);
306 1 : if (thread_msg == NULL) {
307 0 : log_error("Error constructing thread msg from control hdr");
308 0 : return -1;
309 : }
310 1 : struct dedos_thread *thread = get_dedos_thread(hdr->thread_id);
311 1 : if (thread == NULL) {
312 0 : log_error("Error getting dedos thread %d to deliver control message",
313 : hdr->thread_id);
314 0 : destroy_thread_msg(thread_msg);
315 0 : return -1;
316 : }
317 :
318 1 : int rtn = enqueue_thread_msg(thread_msg, &thread->queue);
319 1 : if (rtn < 0) {
320 0 : log_error("Error enquing control message on thread %d", hdr->thread_id);
321 0 : return -1;
322 : }
323 1 : return 0;
324 : }
325 :
326 : /**
327 : * Processes a received control message that is due for delivery to this thread.
328 : * @param hdr The header for the control message
329 : * @param fd File descriptor off of which to read the control message
330 : * @return 0 on success, -1 on error
331 : */
332 0 : static int process_ctrl_message(struct ctrl_runtime_msg_hdr *hdr, int fd) {
333 0 : if (verify_msg_size(hdr) != 0) {
334 0 : log_warn("May not process message. Incorrect payload size for type");
335 : }
336 :
337 0 : char msg_data[hdr->payload_size];
338 0 : int rtn = read_payload(fd, hdr->payload_size, (void*)msg_data);
339 :
340 0 : if (rtn < 0) {
341 0 : log_error("Error reading control payload. Cannot process message");
342 0 : return -1;
343 : }
344 0 : log(LOG_CONTROLLER_COMMUNICATION, "Read control payload %p of size %d",
345 : msg_data, (int)hdr->payload_size);
346 :
347 0 : switch (hdr->type) {
348 : case CTRL_CONNECT_TO_RUNTIME:
349 0 : rtn = process_connect_to_runtime((struct ctrl_add_runtime_msg*) msg_data);
350 0 : if (rtn < 0) {
351 0 : log_error("Error processing connect to runtime message");
352 0 : return -1;
353 : }
354 0 : break;
355 : case CTRL_CREATE_THREAD:
356 0 : rtn = process_create_thread_msg((struct ctrl_create_thread_msg*) msg_data);
357 0 : if (rtn < 0) {
358 0 : log_error("Error processing create thread message");
359 0 : return -1;
360 : }
361 0 : break;
362 : case CTRL_DELETE_THREAD:
363 0 : log_critical("TODO!");
364 0 : break;
365 : case CTRL_MODIFY_ROUTE:
366 0 : rtn = process_ctrl_route_msg((struct ctrl_route_msg*) msg_data);
367 0 : if (rtn < 0) {
368 0 : log_error("Error processing control route message");
369 0 : return -1;
370 : }
371 0 : break;
372 : default:
373 0 : log_error("Unknown message type delivered to input thread: %d", hdr->type);
374 0 : return -1;
375 : }
376 0 : return 0;
377 : }
378 :
/**
 * Placeholder for acknowledging a control message.
 * TODO: not implemented yet - both arguments are ignored.
 * @param id ID of the control message being acknowledged
 * @param success Whether the control message was processed successfully
 * @return always 0
 */
int send_ack_message(int id, bool success) {
    (void)id;
    (void)success;
    return 0;
}
384 :
385 : /** Processes any received control message. */
386 0 : static int process_ctrl_message_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd) {
387 :
388 : int rtn;
389 0 : switch (hdr->type) {
390 : case CTRL_CREATE_MSU:
391 : case CTRL_DELETE_MSU:
392 : case CTRL_MSU_ROUTES:
393 0 : rtn = pass_ctrl_msg_to_thread(hdr, fd);
394 0 : if (rtn < 0) {
395 0 : log_error("Error passing control message to thread");
396 0 : return -1;
397 : }
398 0 : break;
399 : case CTRL_CONNECT_TO_RUNTIME:
400 : case CTRL_CREATE_THREAD:
401 : case CTRL_DELETE_THREAD:
402 : case CTRL_MODIFY_ROUTE:
403 0 : rtn = process_ctrl_message(hdr, fd);
404 0 : if (rtn < 0) {
405 0 : log_error("Error processing control message");
406 0 : send_ack_message(hdr->id, false);
407 0 : return -1;
408 : }
409 0 : send_ack_message(hdr->id, true);
410 0 : break;
411 : default:
412 0 : log_error("Unknown header type %d in receiving thread", hdr->type);
413 0 : return -1;
414 : }
415 :
416 0 : return 0;
417 : }
418 :
419 0 : int handle_controller_communication(int fd) {
420 : struct ctrl_runtime_msg_hdr hdr;
421 0 : int rtn = read_payload(fd, sizeof(hdr), &hdr);
422 0 : if (rtn< 0) {
423 0 : log_error("Error reading control message");
424 0 : return -1;
425 0 : } else if (rtn == 1) {
426 0 : log_critical("Disconnected from global controller");
427 0 : close(fd);
428 0 : return 1;
429 : } else {
430 0 : log(LOG_CONTROLLER_COMMUNICATION,
431 : "Read header (type %d) from controller", hdr.type);
432 : }
433 :
434 0 : rtn = process_ctrl_message_hdr(&hdr, fd);
435 0 : if (rtn < 0) {
436 0 : log_error("Error processing control message");
437 0 : return -1;
438 : }
439 :
440 0 : return 0;
441 : }
442 :
443 0 : bool is_controller_fd(int fd) {
444 0 : return fd == controller_sock;
445 : }
446 :
447 :
/**
 * Connects to the global controller and registers the resulting socket
 * with the socket monitor.
 * @param addr Address of the global controller
 * @return the controller socket's file descriptor on success, -1 on error
 */
int init_controller_socket(struct sockaddr_in *addr) {
    int ctrl_fd = connect_to_controller(addr);
    if (ctrl_fd < 0) {
        log_error("Error connecting to global controller");
        return -1;
    }

    if (monitor_controller_socket(ctrl_fd) == 0) {
        return ctrl_fd;
    }

    log_error("Attempted to initialize controller socket "
              "before initializing runtime epoll");
    return -1;
}
461 :
462 0 : int send_stats_to_controller() {
463 0 : if (controller_sock < 0) {
464 0 : log(LOG_STAT_SEND, "Skipping sending statistics: controller not initialized");
465 0 : return -1;
466 : }
467 0 : int rtn = 0;
468 0 : int total_items = 0;
469 : struct timespec now;
470 0 : clock_gettime(CLOCK_REALTIME_COARSE, &now);
471 0 : for (int i=0; i<N_REPORTED_STAT_TYPES; i++) {
472 0 : enum stat_id stat_id = reported_stat_types[i].id;
473 : int n_items;
474 0 : struct stat_sample *samples = get_stat_samples(stat_id, &now, &n_items);
475 0 : total_items += n_items;
476 0 : if (samples == NULL) {
477 0 : log(LOG_STAT_SEND, "Error getting stat sample for send to controller");
478 0 : continue;
479 : }
480 0 : size_t serial_size = serialized_stat_sample_size(samples, n_items);
481 :
482 0 : char buffer[serial_size];
483 0 : size_t ser_rtn = serialize_stat_samples(samples, n_items, buffer, serial_size);
484 : if (ser_rtn < 0) {
485 : log_error("Error serializing stat sample");
486 : rtn = -1;
487 : }
488 :
489 0 : struct rt_controller_msg_hdr hdr = {
490 : .type = RT_STATS,
491 : .payload_size = ser_rtn
492 : };
493 :
494 0 : int rtn = send_to_controller(&hdr, buffer);
495 0 : if (rtn < 0) {
496 0 : log_error("Error sending statistics to controller");
497 0 : rtn = -1;
498 : }
499 : }
500 0 : log(LOG_STAT_SEND, "Sending %d statistics to controller", total_items);
501 0 : return rtn;
502 : }
|