My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
runtime_communication.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
20 #include "runtime_communication.h"
21 #include "communication.h"
22 #include "logging.h"
23 #include "rt_controller_messages.h"
24 #include "stat_msg_handler.h"
25 #include "epollops.h"
26 #include "stats.h"
27 #include "unused_def.h"
28 #include "dfg_writer.h"
29 #include "haproxy.h"
30 
31 #include <signal.h>
32 #include <sys/epoll.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 
37  int fd;
39  int port;
40 };
41 
42 #define MAX_RUNTIME_ID 32
43 
45 
46 int runtime_fd(unsigned int runtime_id) {
47  if (runtime_endpoints[runtime_id].fd > 0) {
49  }
50  return 0;
51 }
52 
53 static int runtime_id(int runtime_fd) {
54  for (int i=0; i < MAX_RUNTIME_ID; i++) {
55  if (runtime_endpoints[i].fd == runtime_fd) {
56  return i;
57  }
58  }
59  return -1;
60 }
61 
62 int send_to_runtime(unsigned int runtime_id, struct ctrl_runtime_msg_hdr *hdr, void *payload) {
63  if (runtime_id > MAX_RUNTIME_ID) {
64  log_error("Requested runtime %d is greater than max runtime ID %d",
65  runtime_id, MAX_RUNTIME_ID);
66  return -1;
67  }
68  struct runtime_endpoint *endpoint = &runtime_endpoints[runtime_id];
69  if (endpoint->fd <= 0) {
70  log_error("Requested runtime %d not instantiated (fd: %d)", runtime_id, endpoint->fd);
71  return -1;
72  }
73 
74  int rtn = send_to_endpoint(endpoint->fd, hdr, sizeof(*hdr));
75  if (rtn <= 0) {
76  log_error("Error sending header to runtime %d", runtime_id);
77  return -1;
78  }
79 
80  if (hdr->payload_size <= 0) {
81  return 0;
82  }
83 
84  rtn = send_to_endpoint(endpoint->fd, payload, hdr->payload_size);
85  if (rtn <= 0) {
86  log_error("Error sending payload to runtime %d", runtime_id);
87  return -1;
88  }
89  log(LOG_RUNTIME_SENDS, "Sent a payload of size %d to runtime %d (fd: %d)",
90  (int)hdr->payload_size, runtime_id, endpoint->fd);
91  return 0;
92 }
93 
94 static int send_add_runtime_msg(unsigned int target_id, int new_rt_id,
95  uint32_t ip, int port) {
96  struct ctrl_add_runtime_msg msg = {
97  .runtime_id = new_rt_id,
98  .ip = ip,
99  .port = port
100  };
101 
102  struct ctrl_runtime_msg_hdr hdr = {
104  .thread_id = 0,
105  .payload_size = sizeof(msg)
106  };
107 
108  int rtn = send_to_runtime(target_id, &hdr, &msg);
109  if (rtn < 0) {
110  log_error("Error sending initialization message for rt %d to rt %d",
111  new_rt_id, target_id);
112  } else {
113  log(LOG_RT_COMMUNICATION, "Send initialization message for rt %d to rt %d",
114  new_rt_id, target_id);
115  }
116  return 0;
117 }
118 
119 static int remove_runtime_endpoint(int fd) {
120  for (int i=0; i<MAX_RUNTIME_ID; i++) {
121  if (runtime_endpoints[i].fd == fd) {
122  close(runtime_endpoints[i].fd);
123  runtime_endpoints[i].fd = 0;
124  return i;
125  }
126  }
127  return -1;
128 }
129 
130 static int add_runtime_endpoint(unsigned int runtime_id, int fd, uint32_t ip, int port) {
131  struct stat buf;
132  if (fstat(fd, &buf) != 0) {
133  log_error("Cannot register non-descriptor %d for runtime ID %d", fd, runtime_id);
134  return -1;
135  }
136  if (runtime_endpoints[runtime_id].fd != 0) {
137  log_warn("Replacing runtime peer with id %d", runtime_id);
138  }
140 
141  int n_sent = 0;
142  for (int i=0; i<MAX_RUNTIME_ID; i++) {
143  if (i != runtime_id && runtime_endpoints[i].fd != 0) {
144  if (send_add_runtime_msg(i, runtime_id, ip, port) != 0) {
145  log_error("Failed to add runtime %d to runtime %d (fd: %d)",
146  runtime_id, i, runtime_endpoints[runtime_id].fd);
147  } else {
148  n_sent++;
149  }
150  }
151  }
152  set_haproxy_weights(0,0);
153  return 0;
154 }
155 
156 static int process_rt_init_message(ssize_t payload_size, int fd) {
157  struct rt_controller_init_msg msg;
158  if (payload_size != sizeof(msg)) {
159  log_error("Payload size does not match size of runtime initialization message");
160  return -1;
161  }
162  int rtn = read_payload(fd, sizeof(msg), &msg);
163  if (rtn < 0) {
164  log_error("Error reading runtime init payload. Cannot process message");
165  return -1;
166  }
167 
168  rtn = add_runtime_endpoint(msg.runtime_id, fd, msg.ip, msg.port);
169  if (rtn < 0) {
170  log_error("Error adding runtime endpoint");
171  return -1;
172  }
173  log_info("Added runtime endpoint %d (fd: %d)", msg.runtime_id, fd);
174  return 0;
175 }
176 
177 static int process_rt_stats_message(size_t payload_size, int fd) {
178 
179  char buffer[payload_size];
180  int rtn = read_payload(fd, sizeof(buffer), &buffer);
181  if (rtn < 0) {
182  log_error("Error reading stats message payload");
183  return -1;
184  }
185  int id = runtime_id(fd);
186  if (id < 0) {
187  log_error("Cannot get runtime ID from file descriptor");
188  return -1;
189  }
190  return handle_serialized_stats_buffer(id, buffer, payload_size);
191 }
192 
193 static int process_rt_message_hdr(struct rt_controller_msg_hdr *hdr, int fd) {
194  int rtn;
195  switch (hdr->type) {
196  case RT_CTL_INIT:
197  rtn = process_rt_init_message(hdr->payload_size, fd);
198  if (rtn < 0) {
199  log_error("Erorr processing init runtime message from fd %d", fd);
200  return -1;
201  }
202  return 0;
203  case RT_STATS:
204  rtn = process_rt_stats_message(hdr->payload_size, fd);
205  if (rtn < 0) {
206  log_error("Error processing rt stats message from fd %d", fd);
207  return -1;
208  }
209  return 0;
210  default:
211  log_error("Received unknown message type from fd %d: %d", fd, hdr->type);
212  return -1;
213  }
214 }
215 
216 #define MAX_OUTPUT_SOCKS 2
217 
218 static int output_listen_sock = -1;
220 static int output_sock_idx = 0;
221 
222 static void add_output_sock(int fd) {
223  if (output_socks[output_sock_idx] > 0) {
224  close(output_socks[output_sock_idx]);
225  }
226  output_socks[output_sock_idx] = fd;
227  output_sock_idx++;
228  output_sock_idx %= MAX_OUTPUT_SOCKS;
229 }
230 
231 
232 static int handle_runtime_communication(int fd, void UNUSED *data) {
233  struct rt_controller_msg_hdr hdr;
234 
235  if (fd == output_listen_sock) {
236  log_info("Adding new output port");
237  int new_fd = accept(fd, NULL, 0);
238  if (new_fd > 0) {
239  add_output_sock(new_fd);
240  log_info("Added new output listener");
241  return 0;
242  } else {
243  log_error("Error adding new listener");
244  return 0;
245  }
246  }
247 
248  int rtn = read_payload(fd, sizeof(hdr), &hdr);
249  if (rtn < 0) {
250  log_error("Error reading runtime message header from fd %d", fd);
251  return 1;
252  } else if (rtn == 1) {
253  int id = remove_runtime_endpoint(fd);
254  if (id < 0) {
255  log_error("Error shutting down socket %d", fd);
256  } else {
257  log_info("Runtime %d has been shut down by peer. Removing", id);
258  }
259  return 0;
260  } else {
261  log(LOG_RT_COMMUNICATION,
262  "received header from runtime");
263  }
264 
265  rtn = process_rt_message_hdr(&hdr, fd);
266  if (rtn < 0) {
267  log_error("Error processing rt message");
268  return 0;
269  }
270  return 0;
271 }
272 
273 int get_output_listener(int port) {
275  if (listen_sock < 0) {
276  log_error("Error initializing listening socket on port %d", port);
277  return -1;
278  }
279 
280  fd_set fdset;
281  FD_ZERO(&fdset);
282  FD_SET(listen_sock, &fdset);
283 
284  int rtn = -1;
285  do {
286  log_info("Waiting for DFG reader to connect on port %d!", port);
287  struct timeval timeout = {.tv_sec = 1};
288  rtn = select(1, &fdset, NULL, NULL, &timeout);
289  } while (rtn < 1);
290 
291  int output_sock = accept(listen_sock, NULL, 0);
292  if (output_sock < 0) {
293  log_error("Error accepting output listener!");
294  return -1;
295  }
296  close(listen_sock);
297  return output_sock;
298 }
299 
300 static int listen_sock = -1;
301 
302 int runtime_communication_loop(int listen_port, char *output_file, int output_port) {
303 
304  signal(SIGPIPE, SIG_IGN);
305 
306  if (listen_sock > 0) {
307  log_error("Communication loop already started");
308  return -1;
309  }
310  listen_sock = init_listening_socket(listen_port);
311  if (listen_sock < 0) {
312  log_error("Error initializing listening socket on port %d", listen_port);
313  return -1;
314  }
315  log_info("Starting listening for runtimes on port %d", listen_port);
316 
317  int epoll_fd = init_epoll(listen_sock);
318 
319  if (epoll_fd < 0) {
320  log_error("Error initializing controller epoll. Closing socket");
321  close(listen_sock);
322  return -1;
323  }
324 
325  if (output_port > 0) {
326  output_listen_sock = init_listening_socket(output_port);
327  if (output_listen_sock < 0) {
328  log_error("Error listening on port %d", output_port);
329  return -1;
330  }
331  log_info("Listening for DFG reader on port %d", output_port);
332  add_to_epoll(epoll_fd, output_listen_sock, EPOLLIN, false);
333  }
334 
335  struct timespec begin;
336  clock_gettime(CLOCK_REALTIME_COARSE, &begin);
337 
338  struct timespec elapsed;
339  int rtn = 0;
340  while (rtn == 0) {
341  rtn = epoll_loop(listen_sock, epoll_fd, 1, 1000, 0,
342  handle_runtime_communication, NULL, NULL);
343  if (rtn < 0) {
344  log_error("Epoll loop exited with error");
345  return -1;
346  }
347  clock_gettime(CLOCK_REALTIME_COARSE, &elapsed);
348  if (((elapsed.tv_sec - begin.tv_sec) * 1000 + (elapsed.tv_nsec - begin.tv_nsec) * 1e-6) > STAT_SAMPLE_PERIOD_MS) {
349  if (output_file != NULL) {
350  dfg_to_file(output_file);
351  }
352  for (int i=0; i < MAX_OUTPUT_SOCKS; i++) {
353  if (output_socks[i] > 0) {
354  if (dfg_to_fd(output_socks[i]) < 0) {
355  close(output_socks[i]);
356  output_socks[i] = 0;
357  }
358  }
359  }
360  clock_gettime(CLOCK_REALTIME_COARSE, &begin);
361  }
362  }
363  log_info("Epoll loop exited");
364  return 0;
365 }
static int process_rt_message_hdr(struct rt_controller_msg_hdr *hdr, int fd)
static int output_listen_sock
int send_to_runtime(unsigned int runtime_id, struct ctrl_runtime_msg_hdr *hdr, void *payload)
Interface for general-purpose socket communication.
static int add_runtime_endpoint(unsigned int runtime_id, int fd, uint32_t ip, int port)
Macro for declaring functions or variables as unused to avoid compiler warnings.
#define log_info(fmt,...)
Definition: logging.h:88
size_t payload_size
Payload size.
All messages sent from controller to runtime are prefixed with this header.
Wrapper functions for epoll to manage event-based communication.
int epoll_fd
The epoll file descriptor monitoring sockets.
static struct runtime_endpoint runtime_endpoints[32]
uint32_t ip
Local IP address with which the runtime listens for other runtimes.
payload: ctrl_add_runtime_msg
int handle_serialized_stats_buffer(int runtime_id, void *buffer, size_t buffer_len)
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
Definition: communication.c:37
uint32_t ip
ip address of the runtime to connect to.
#define STAT_SAMPLE_PERIOD_MS
How often samples are sent from runtime to controller.
Definition: stats.h:132
static int process_rt_stats_message(size_t payload_size, int fd)
#define MAX_RUNTIME_ID
static int output_socks[2]
int runtime_communication_loop(int listen_port, char *output_file, int output_port)
Logging of status messages to the terminal.
size_t payload_size
Payload will be serialized following this struct.
Functions for the sending and receiving of statistics between ctrl and runtime.
static int output_sock_idx
int handle_runtime_communication(int fd)
Reads a message off of the provided file descriptor as if it is coming from a runtime peer...
int add_to_epoll(int epoll_fd, int new_fd, uint32_t events, bool oneshot)
Adds a file descriptor to an epoll instance.
Definition: epollops.c:59
Payload: output of serialize_stat_samples.
static int send_add_runtime_msg(unsigned int target_id, int new_rt_id, uint32_t ip, int port)
Header for all messages from controller to runtime.
int init_epoll(int socket_fd)
Initializes a new instance of an epoll file descriptor and adds a socket to it, listening for input o...
Definition: epollops.c:182
#define log_error(fmt,...)
Definition: logging.h:101
Payload for messages of type CTRL_CONNECT_TO_RUNTIME.
Payload type: rt_controller_init_msg.
static int process_rt_init_message(ssize_t payload_size, int fd)
int get_output_listener(int port)
int dfg_to_fd(int fd)
Definition: dfg_writer.c:429
static void add_output_sock(int fd)
enum rt_controller_msg_type type
Type of payload attached.
void dfg_to_file(char *filename)
Definition: dfg_writer.c:413
int port
Port on which the runtime listens for other runtimes.
Initialization message, sent to controller to identify runtime upon first connection.
int epoll_loop(int socket_fd, int epoll_fd, int batch_size, int timeout, bool oneshot, int(*connection_handler)(int, void *), int(*accept_handler)(int, void *), void *data)
The event-based loop for epoll_wait.
Definition: epollops.c:132
int runtime_fd(unsigned int runtime_id)
#define UNUSED
#define MAX_OUTPUT_SOCKS
void set_haproxy_weights(int rt_id, int offset)
Definition: haproxy.c:102
Definitiions of structures for sending messages from runtimes to controller.
unsigned int runtime_id
Unique identifier for the runtime.
enum ctrl_runtime_msg_type type
Identifies the type of payload that follows.
int init_listening_socket(int port)
Initializes a socket which is bound to and listening on the given port.
static int runtime_id(int runtime_fd)
unsigned int uint32_t
Definition: uthash.h:96
static int remove_runtime_endpoint(int fd)
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
ssize_t send_to_endpoint(int fd, void *data, size_t data_len)
Writes a buffer of a given size to a file descriptor.
Definition: communication.c:66
static int listen_sock
#define log_warn(fmt,...)
Definition: logging.h:113
unsigned int runtime_id
ID of the runtime to connect to.