My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
controller_communication.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
26 #include "communication.h"
27 #include "logging.h"
28 #include "socket_monitor.h"
29 #include "dedos_threads.h"
30 #include "thread_message.h"
31 #include "runtime_dfg.h"
32 #include "rt_stats.h"
33 #include "worker_thread.h"
34 #include "output_thread.h"
35 
36 #include <stdlib.h>
37 #include <arpa/inet.h>
38 #include <string.h>
39 
44 static int controller_sock = -1;
45 
46 
47 int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload) {
48 
49  if (controller_sock < 0) {
50  log_error("Controller socket not initialized");
51  return -1;
52  }
53 
54  int rtn = send_to_endpoint(controller_sock, hdr, sizeof(*hdr));
55  if (rtn <= 0) {
56  log_error("Error sending header to controller");
57  return -1;
58  }
59  if (hdr->payload_size <= 0) {
60  return 0;
61  }
62 
63  rtn = send_to_endpoint(controller_sock, payload, hdr->payload_size);
64  if (rtn <= 0) {
65  log_error("Error sending payload to controller");
66  return -1;
67  }
68 
69  log(LOG_CONTROLLER_COMMUNICATION, "Sent payload of size %d type %d to controller",
70  (int)hdr->payload_size, hdr->type);
71  return 0;
72 }
73 
78 static int send_ctl_init_msg() {
79  int local_id = local_runtime_id();
80  if (local_id < 0) {
81  log_error("Could not get local runtime ID to send to controller");
82  return -1;
83  }
85  int port = local_runtime_port();
86 
87  struct rt_controller_init_msg msg = {
88  .runtime_id = local_id,
89  .ip = ip,
90  .port = port
91  };
92 
93  struct rt_controller_msg_hdr hdr = {
94  .type = RT_CTL_INIT,
95  .payload_size = sizeof(msg)
96  };
97 
98  return send_to_controller(&hdr, &msg);
99 }
100 
105 static int connect_to_controller(struct sockaddr_in *addr) {
106 
107  if (controller_sock != -1) {
108  log_error("Controller socket already initialized");
109  return -1;
110  }
111 
113 
114  if (controller_sock < 0) {
115  log_error("Error connecting to global controller!");
116  return -1;
117  }
118 
119  char ip[INET_ADDRSTRLEN];
120  inet_ntop(AF_INET, &addr->sin_addr, ip, INET_ADDRSTRLEN);
121  int port = ntohs(addr->sin_port);
122 
123  log_info("Connected to global controller at %s:%d", ip, port);
124 
125  int rtn = send_ctl_init_msg();
126  if (rtn < 0) {
127  log_error("Error sending initialization message to controller");
128  return -1;
129  }
130  return controller_sock;
131 }
132 
137 #define CHECK_MSG_SIZE(msg, target) \
138  if (msg->payload_size != sizeof(target)) { \
139  log_warn("Message data size (%d) does not match size" \
140  "of target type (%d)" #target, (int)msg->payload_size , \
141  (int)sizeof(target)); \
142  return -1; \
143  } \
144  return 0;
145 
149 static int verify_msg_size(struct ctrl_runtime_msg_hdr *msg) {
150  switch (msg->type) {
153  case CTRL_CREATE_THREAD:
155  case CTRL_DELETE_THREAD:
157  case CTRL_MODIFY_ROUTE:
158  CHECK_MSG_SIZE(msg, struct ctrl_route_msg);
159  case CTRL_CREATE_MSU:
160  CHECK_MSG_SIZE(msg, struct ctrl_create_msu_msg);
161  case CTRL_DELETE_MSU:
162  CHECK_MSG_SIZE(msg, struct ctrl_delete_msu_msg);
163  case CTRL_MSU_ROUTES:
164  CHECK_MSG_SIZE(msg, struct ctrl_msu_route_msg);
165  default:
166  log_error("Received unknown message type: %d", msg->type);
167  return -1;
168  }
169 }
170 
175  struct sockaddr_in addr = {};
176  addr.sin_family = AF_INET;
177  addr.sin_addr.s_addr = msg->ip;
178  addr.sin_port = htons(msg->port);
179 
180  int rtn = connect_to_runtime_peer(msg->runtime_id, &addr);
181  if (rtn < 0) {
182  log_error("Could not add runtime peer");
183  return -1;
184  }
185  return 0;
186 }
187 
192  int id = msg->thread_id;
193  int rtn = create_worker_thread(id, msg->mode);
194  if (rtn < 0) {
195  log_error("Error creating worker thread %d", id);
196  return -1;
197  }
198  log(LOG_THREAD_CREATION, "Created worker thread %d", id);
199  return 0;
200 }
201 
205 static int process_ctrl_route_msg(struct ctrl_route_msg *msg) {
206  int rtn;
207  log(LOG_CONTROLLER_COMMUNICATION, "Got control route message of type %d", msg->type);
208  switch (msg->type) {
209  case CREATE_ROUTE:
210  rtn = init_route(msg->route_id, msg->type_id);
211  if (rtn < 0) {
212  log_error("Error creating new route of id %d, type %d",
213  msg->route_id, msg->type_id);
214  return 1;
215  }
216  return 0;
217  case ADD_ENDPOINT:;
218  struct msu_endpoint endpoint;
219  int rtn = init_msu_endpoint(msg->msu_id, msg->runtime_id, &endpoint);
220  if (rtn < 0) {
221  log_error("Cannot initilize runtime endpoint for adding "
222  "endpoint %d to route %d", msg->msu_id, msg->route_id);
223  return 1;
224  }
225  rtn = add_route_endpoint(msg->route_id, endpoint, msg->key);
226  if (rtn < 0) {
227  log_error("Error adding endpoint %d to route %d with key %d",
228  msg->msu_id, msg->route_id, msg->key);
229  return 1;
230  }
231  return 0;
232  case DEL_ENDPOINT:
233  rtn = remove_route_endpoint(msg->route_id, msg->msu_id);
234  if (rtn < 0) {
235  log_error("Error removing endpoint %d from route %d",
236  msg->msu_id, msg->route_id);
237  return 1;
238  }
239  return 0;
240  case MOD_ENDPOINT:
241  rtn = modify_route_endpoint(msg->route_id, msg->msu_id, msg->key);
242  if (rtn < 0) {
243  log_error("Error modifying endpoint %d on route %d to have key %d",
244  msg->msu_id, msg->route_id, msg->key);
245  return 1;
246  }
247  return 0;
248  default:
249  log_error("Unknown route control message type received: %d", msg->type);
250  return -1;
251  }
252 }
253 
258  switch (type) {
259  case CTRL_CREATE_MSU:
260  return CREATE_MSU;
261  case CTRL_DELETE_MSU:
262  return DELETE_MSU;
263  case CTRL_MSU_ROUTES:
264  return MSU_ROUTE;
265  default:
266  log_error("Unknown thread message type %d", type);
267  return UNKNOWN_THREAD_MSG;
268  }
269 }
270 
278 static struct thread_msg *thread_msg_from_ctrl_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd) {
279  if (verify_msg_size(hdr) != 0) {
280  log_warn("May not process message. Incorrect payload size for type.");
281  }
282 
283  void *msg_data = malloc(hdr->payload_size);
284  int rtn = read_payload(fd, hdr->payload_size, msg_data);
285  if (rtn < 0) {
286  log_error("Error reading control payload. Cannot process message");
287  free(msg_data);
288  return NULL;
289  }
290  log(LOG_CONTROLLER_COMMUNICATION, "Read control payload %p of size %d",
291  msg_data, (int)hdr->payload_size);
293  struct thread_msg *thread_msg = construct_thread_msg(type, hdr->payload_size, msg_data);
294  thread_msg->ack_id = hdr->id;
295  return thread_msg;
296 }
297 
304 static int pass_ctrl_msg_to_thread(struct ctrl_runtime_msg_hdr *hdr, int fd) {
306  if (thread_msg == NULL) {
307  log_error("Error constructing thread msg from control hdr");
308  return -1;
309  }
310  struct dedos_thread *thread = get_dedos_thread(hdr->thread_id);
311  if (thread == NULL) {
312  log_error("Error getting dedos thread %d to deliver control message",
313  hdr->thread_id);
314  destroy_thread_msg(thread_msg);
315  return -1;
316  }
317 
318  int rtn = enqueue_thread_msg(thread_msg, &thread->queue);
319  if (rtn < 0) {
320  log_error("Error enquing control message on thread %d", hdr->thread_id);
321  return -1;
322  }
323  return 0;
324 }
325 
332 static int process_ctrl_message(struct ctrl_runtime_msg_hdr *hdr, int fd) {
333  if (verify_msg_size(hdr) != 0) {
334  log_warn("May not process message. Incorrect payload size for type");
335  }
336 
337  char msg_data[hdr->payload_size];
338  int rtn = read_payload(fd, hdr->payload_size, (void*)msg_data);
339 
340  if (rtn < 0) {
341  log_error("Error reading control payload. Cannot process message");
342  return -1;
343  }
344  log(LOG_CONTROLLER_COMMUNICATION, "Read control payload %p of size %d",
345  msg_data, (int)hdr->payload_size);
346 
347  switch (hdr->type) {
349  rtn = process_connect_to_runtime((struct ctrl_add_runtime_msg*) msg_data);
350  if (rtn < 0) {
351  log_error("Error processing connect to runtime message");
352  return -1;
353  }
354  break;
355  case CTRL_CREATE_THREAD:
356  rtn = process_create_thread_msg((struct ctrl_create_thread_msg*) msg_data);
357  if (rtn < 0) {
358  log_error("Error processing create thread message");
359  return -1;
360  }
361  break;
362  case CTRL_DELETE_THREAD:
363  log_critical("TODO!");
364  break;
365  case CTRL_MODIFY_ROUTE:
366  rtn = process_ctrl_route_msg((struct ctrl_route_msg*) msg_data);
367  if (rtn < 0) {
368  log_error("Error processing control route message");
369  return -1;
370  }
371  break;
372  default:
373  log_error("Unknown message type delivered to input thread: %d", hdr->type);
374  return -1;
375  }
376  return 0;
377 }
378 
379 //TODO: send_ack_message()
380 int send_ack_message(int id, bool success) {
381  // Not implemented yet
382  return 0;
383 }
384 
386 static int process_ctrl_message_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd) {
387 
388  int rtn;
389  switch (hdr->type) {
390  case CTRL_CREATE_MSU:
391  case CTRL_DELETE_MSU:
392  case CTRL_MSU_ROUTES:
393  rtn = pass_ctrl_msg_to_thread(hdr, fd);
394  if (rtn < 0) {
395  log_error("Error passing control message to thread");
396  return -1;
397  }
398  break;
400  case CTRL_CREATE_THREAD:
401  case CTRL_DELETE_THREAD:
402  case CTRL_MODIFY_ROUTE:
403  rtn = process_ctrl_message(hdr, fd);
404  if (rtn < 0) {
405  log_error("Error processing control message");
406  send_ack_message(hdr->id, false);
407  return -1;
408  }
409  send_ack_message(hdr->id, true);
410  break;
411  default:
412  log_error("Unknown header type %d in receiving thread", hdr->type);
413  return -1;
414  }
415 
416  return 0;
417 }
418 
420  struct ctrl_runtime_msg_hdr hdr;
421  int rtn = read_payload(fd, sizeof(hdr), &hdr);
422  if (rtn< 0) {
423  log_error("Error reading control message");
424  return -1;
425  } else if (rtn == 1) {
426  log_critical("Disconnected from global controller");
427  close(fd);
428  return 1;
429  } else {
430  log(LOG_CONTROLLER_COMMUNICATION,
431  "Read header (type %d) from controller", hdr.type);
432  }
433 
434  rtn = process_ctrl_message_hdr(&hdr, fd);
435  if (rtn < 0) {
436  log_error("Error processing control message");
437  return -1;
438  }
439 
440  return 0;
441 }
442 
443 bool is_controller_fd(int fd) {
444  return fd == controller_sock;
445 }
446 
447 
448 int init_controller_socket(struct sockaddr_in *addr) {
449  int sock = connect_to_controller(addr);
450  if (sock < 0) {
451  log_error("Error connecting to global controller");
452  return -1;
453  }
454  if (monitor_controller_socket(sock) != 0) {
455  log_error("Attempted to initialize controller socket "
456  "before initializing runtime epoll");
457  return -1;
458  }
459  return sock;
460 }
461 
463  if (controller_sock < 0) {
464  log(LOG_STAT_SEND, "Skipping sending statistics: controller not initialized");
465  return -1;
466  }
467  int rtn = 0;
468  int total_items = 0;
469  struct timespec now;
470  clock_gettime(CLOCK_REALTIME_COARSE, &now);
471  for (int i=0; i<N_REPORTED_STAT_TYPES; i++) {
473  int n_items;
474  struct stat_sample *samples = get_stat_samples(stat_id, &now, &n_items);
475  total_items += n_items;
476  if (samples == NULL) {
477  log(LOG_STAT_SEND, "Error getting stat sample for send to controller");
478  continue;
479  }
480  size_t serial_size = serialized_stat_sample_size(samples, n_items);
481 
482  char buffer[serial_size];
483  size_t ser_rtn = serialize_stat_samples(samples, n_items, buffer, serial_size);
484  if (ser_rtn < 0) {
485  log_error("Error serializing stat sample");
486  rtn = -1;
487  }
488 
489  struct rt_controller_msg_hdr hdr = {
490  .type = RT_STATS,
491  .payload_size = ser_rtn
492  };
493 
494  int rtn = send_to_controller(&hdr, buffer);
495  if (rtn < 0) {
496  log_error("Error sending statistics to controller");
497  rtn = -1;
498  }
499  }
500  log(LOG_STAT_SEND, "Sending %d statistics to controller", total_items);
501  return rtn;
502 }
int init_connected_socket(struct sockaddr_in *addr)
Initializes a socket that is connected to a given address.
Definition: communication.c:81
int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint)
Initializes an endpoint structure to point to the relevant msu.
Definition: routing.c:483
payload: ctrl_create_thread_msg
Interface for general-purpose socket communication.
int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key)
Adds an endpoint to the route with the given ID.
Definition: routing.c:409
size_t serialized_stat_sample_size(struct stat_sample *sample, int n_samples)
Determines the size needed to hold the serialized version of sample.
Definition: stats.c:96
Collecting statistics within the runtime.
int init_route(int route_id, int type_id)
Initializes a new route with the given route_id and type_id.
Definition: routing.c:276
Kept unknown at 0 to catch mis-labeled messages.
ctrl_delete_msu_msg (ctrl_runtime_messages.h)
static int process_ctrl_message(struct ctrl_runtime_msg_hdr *hdr, int fd)
Processes a received control message that is due for delivery to this thread.
uint32_t local_runtime_ip()
Definition: runtime_dfg.c:107
enum stat_id id
Definition: stats.h:63
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
Definition: logging.h:88
payload: ctrl_msu_route_msg
size_t payload_size
Payload size.
All messages sent from controller to runtime are prefixed with this header.
Payload for messages of type CTRL_DELETE_MSU.
#define log_critical(fmt,...)
Definition: logging.h:124
uint32_t ip
Local IP address with which the runtime listens for other runtimes.
thread_msg_type
All messages that can be received by output thread or workers.
payload: ctrl_add_runtime_msg
static int verify_msg_size(struct ctrl_runtime_msg_hdr *msg)
Checks whether the size of a message matches the size of its target struct.
ssize_t serialize_stat_samples(struct stat_sample *samples, int n_samples, void *buffer, size_t buff_len)
Serializes from the provided samples into the buffer
Definition: stats.c:122
A single stat sample for a single item.
Definition: stats.h:53
int send_ack_message(int id, bool success)
WILL Send an acknoweledgement of success for a specific message.
ctrl_runtime_msg_type
The various top-level types of messages which can be sent from the controller to runtimes.
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
Definition: communication.c:37
uint32_t ip
ip address of the runtime to connect to.
enum blocking_mode mode
The mode of the thread.
Monitors an incoming port for messages from runtime or controller.
stat_id
The identifiers with which stats can be logged.
Definition: stat_ids.h:32
Payload for messages of type CTRL_MODIFY_ROUTE.
int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload)
Sends a message to the global controller.
int route_id
Route to which the message applies.
payload: ctrl_create_msu_msg
Logging of status messages to the terminal.
static struct stat_type_label reported_stat_types[]
Static structure so the reported stat types can be referenced as an array.
Definition: stats.h:105
int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue)
Enqueues a dedos_msg with a thread_msg as the payload to the appropriate queue.
Threads that hold MSUs.
static int controller_sock
Static (global) variable to hold the socket connecting to the global controller.
void destroy_thread_msg(struct thread_msg *msg)
Frees a thread message.
size_t payload_size
Payload will be serialized following this struct.
int init_controller_socket(struct sockaddr_in *addr)
Initilizes a connection with the global controller located at the provided address.
Interactions with global dfg from individual runtime.
enum thread_msg_type type
Payload for messages of type CTRL_CREATE_MSU.
Payload: output of serialize_stat_samples.
Control spawned threads with message queue within DeDOS.
Header for all messages from controller to runtime.
struct thread_msg * construct_thread_msg(enum thread_msg_type type, ssize_t data_size, void *data)
Allocates and initializes a thread message with the provided options.
#define log_error(fmt,...)
Definition: logging.h:101
int thread_id
The ID to give to the created thread.
A message to be delivered to a dedos_thread.
Payload for messages of type CTRL_CONNECT_TO_RUNTIME.
Payload type: rt_controller_init_msg.
int id
Id for confirmation message (not implemented)
Payload for messages of type CTRL_CREATE_THREAD.
int monitor_controller_socket(int new_fd)
Adds the global controller to be monitored by the socket monitor.
static int process_ctrl_message_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd)
Processes any received control message.
int local_runtime_id()
Definition: runtime_dfg.c:91
int send_stats_to_controller()
Samples the relevant statistics and sends them to the controller.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
#define N_REPORTED_STAT_TYPES
Number of reported stat types.
Definition: stats.h:109
static int process_ctrl_route_msg(struct ctrl_route_msg *msg)
Processes a received ctrl_route_msg.
payload: ctrl_route_msg
Payload for messages of type CTRL_MSU_ROUTES.
enum rt_controller_msg_type type
Type of payload attached.
struct dedos_thread * get_dedos_thread(int id)
Returns the dedos_thread with the given ID.
Definition: dedos_threads.c:72
static int send_ctl_init_msg()
Sends the initilization message containing runtime ID, ip and port to global controller.
Initialization message, sent to controller to identify runtime upon first connection.
int remove_route_endpoint(int route_id, int msu_id)
Removes destination from route with given ID.
Definition: routing.c:419
static enum thread_msg_type get_thread_msg_type(enum ctrl_runtime_msg_type type)
Gets the corresponding thread_msg_type for a ctrl_runtime_msg_type.
Adds an endpoint to a route.
int modify_route_endpoint(int route_id, int msu_id, uint32_t new_key)
Modifies key associated with MSU in route.
Definition: routing.c:434
ctrl_create_msu_msg (ctrl_runtime_messages.h)
payload: ctrl_delete_thread_msg
int msu_id
ID of MSU to add/delete/modify.
bool is_controller_fd(int fd)
Checks if fd is file descriptor for controller.
int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr)
Innitiates a connection to a runtime peer with the given ID at the given address. ...
int thread_id
ID of the Thread to which the message is to be delivered.
#define CHECK_MSG_SIZE(msg, target)
Macro to check whether the size of a message matches the size of the struct it's supposed to be...
unsigned int runtime_id
Unique identifier for the runtime.
Deletes an endpoint from a route.
static struct thread_msg * thread_msg_from_ctrl_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd)
Constructs a thread message from a ctrl_runtime_msg_hdr, reading any additional information it needs ...
int type_id
MSU Type of the route.
ctrl_msu_route_msg (ctrl_runtime_messages.h)
static int process_create_thread_msg(struct ctrl_create_thread_msg *msg)
Processes a received ctrl_create_thread_msg.
int ack_id
for sending acknowledgements to controller.
static int pass_ctrl_msg_to_thread(struct ctrl_runtime_msg_hdr *hdr, int fd)
Constructs a thread_msg from a control-runtime message and passes it to the relevant thread...
enum ctrl_runtime_msg_type type
Identifies the type of payload that follows.
enum ctrl_route_msg_type type
sub-type of message
Modifies the key corresponding to a route endpoint.
int local_runtime_port()
Definition: runtime_dfg.c:99
Creates a new route.
struct stat_sample * get_stat_samples(enum stat_id stat_id, struct timespec *sample_time, int *n_samples_out)
Samples the statistic with the provided stat_id.
Definition: rt_stats.c:634
unsigned int uint32_t
Definition: uthash.h:96
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
int create_worker_thread(unsigned int thread_id, enum blocking_mode mode)
Starts a new worker thread with the given thread ID and pinned/unpinned status.
int handle_controller_communication(int fd)
Reads and processes a controller message off of the provided file descriptor.
ssize_t send_to_endpoint(int fd, void *data, size_t data_len)
Writes a buffer of a given size to a file descriptor.
Definition: communication.c:66
struct msg_queue queue
Queue for incoming message.
Definition: dedos_threads.h:43
int port
port of the runtime to connect to
An endpoint to which an msu_msg can be delivered.
Definition: routing.h:32
#define log_warn(fmt,...)
Definition: logging.h:113
static int connect_to_controller(struct sockaddr_in *addr)
Initializes a connection to the global controller.
payload: ctrl_delete_msu_msg
static int process_connect_to_runtime(struct ctrl_add_runtime_msg *msg)
Processes a received ctrl_add_runtime_msg.
Structure representing any thread within DeDOS.
Definition: dedos_threads.h:35
unsigned int runtime_id
ID of the runtime to connect to.
Communication with global controller from runtime.