32 #include <sys/epoll.h>
42 #define MAX_RUNTIME_ID 32
64 log_error(
"Requested runtime %d is greater than max runtime ID %d",
69 if (endpoint->
fd <= 0) {
70 log_error(
"Requested runtime %d not instantiated (fd: %d)", runtime_id, endpoint->
fd);
76 log_error(
"Error sending header to runtime %d", runtime_id);
86 log_error(
"Error sending payload to runtime %d", runtime_id);
89 log(LOG_RUNTIME_SENDS,
"Sent a payload of size %d to runtime %d (fd: %d)",
105 .payload_size =
sizeof(msg)
110 log_error(
"Error sending initialization message for rt %d to rt %d",
111 new_rt_id, target_id);
113 log(LOG_RT_COMMUNICATION,
"Send initialization message for rt %d to rt %d",
114 new_rt_id, target_id);
132 if (fstat(fd, &buf) != 0) {
133 log_error(
"Cannot register non-descriptor %d for runtime ID %d", fd, runtime_id);
137 log_warn(
"Replacing runtime peer with id %d", runtime_id);
145 log_error(
"Failed to add runtime %d to runtime %d (fd: %d)",
158 if (payload_size !=
sizeof(msg)) {
159 log_error(
"Payload size does not match size of runtime initialization message");
164 log_error(
"Error reading runtime init payload. Cannot process message");
170 log_error(
"Error adding runtime endpoint");
179 char buffer[payload_size];
182 log_error(
"Error reading stats message payload");
187 log_error(
"Cannot get runtime ID from file descriptor");
199 log_error(
"Erorr processing init runtime message from fd %d", fd);
206 log_error(
"Error processing rt stats message from fd %d", fd);
211 log_error(
"Received unknown message type from fd %d: %d", fd, hdr->
type);
216 #define MAX_OUTPUT_SOCKS 2
223 if (output_socks[output_sock_idx] > 0) {
224 close(output_socks[output_sock_idx]);
235 if (fd == output_listen_sock) {
237 int new_fd = accept(fd, NULL, 0);
240 log_info(
"Added new output listener");
250 log_error(
"Error reading runtime message header from fd %d", fd);
252 }
else if (rtn == 1) {
255 log_error(
"Error shutting down socket %d", fd);
257 log_info(
"Runtime %d has been shut down by peer. Removing",
id);
261 log(LOG_RT_COMMUNICATION,
262 "received header from runtime");
267 log_error(
"Error processing rt message");
275 if (listen_sock < 0) {
276 log_error(
"Error initializing listening socket on port %d", port);
282 FD_SET(listen_sock, &fdset);
286 log_info(
"Waiting for DFG reader to connect on port %d!", port);
287 struct timeval timeout = {.tv_sec = 1};
288 rtn = select(1, &fdset, NULL, NULL, &timeout);
291 int output_sock = accept(listen_sock, NULL, 0);
292 if (output_sock < 0) {
293 log_error(
"Error accepting output listener!");
304 signal(SIGPIPE, SIG_IGN);
306 if (listen_sock > 0) {
307 log_error(
"Communication loop already started");
311 if (listen_sock < 0) {
312 log_error(
"Error initializing listening socket on port %d", listen_port);
315 log_info(
"Starting listening for runtimes on port %d", listen_port);
320 log_error(
"Error initializing controller epoll. Closing socket");
325 if (output_port > 0) {
327 if (output_listen_sock < 0) {
328 log_error(
"Error listening on port %d", output_port);
331 log_info(
"Listening for DFG reader on port %d", output_port);
332 add_to_epoll(epoll_fd, output_listen_sock, EPOLLIN,
false);
335 struct timespec begin;
336 clock_gettime(CLOCK_REALTIME_COARSE, &begin);
338 struct timespec elapsed;
341 rtn =
epoll_loop(listen_sock, epoll_fd, 1, 1000, 0,
344 log_error(
"Epoll loop exited with error");
347 clock_gettime(CLOCK_REALTIME_COARSE, &elapsed);
348 if (((elapsed.tv_sec - begin.tv_sec) * 1000 + (elapsed.tv_nsec - begin.tv_nsec) * 1e-6) >
STAT_SAMPLE_PERIOD_MS) {
349 if (output_file != NULL) {
353 if (output_socks[i] > 0) {
355 close(output_socks[i]);
360 clock_gettime(CLOCK_REALTIME_COARSE, &begin);
static int process_rt_message_hdr(struct rt_controller_msg_hdr *hdr, int fd)
static int output_listen_sock
int send_to_runtime(unsigned int runtime_id, struct ctrl_runtime_msg_hdr *hdr, void *payload)
Interface for general-purpose socket communication.
static int add_runtime_endpoint(unsigned int runtime_id, int fd, uint32_t ip, int port)
Macro for declaring functions or variables as unused to avoid compiler warnings.
#define log_info(fmt,...)
size_t payload_size
Payload size.
All messages sent from controller to runtime are prefixed with this header.
Wrapper functions for epoll to manage event-based communication.
int epoll_fd
The epoll file descriptor monitoring sockets.
static struct runtime_endpoint runtime_endpoints[32]
uint32_t ip
Local IP address with which the runtime listens for other runtimes.
payload: ctrl_add_runtime_msg
int handle_serialized_stats_buffer(int runtime_id, void *buffer, size_t buffer_len)
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
uint32_t ip
ip address of the runtime to connect to.
#define STAT_SAMPLE_PERIOD_MS
How often samples are sent from runtime to controller.
static int process_rt_stats_message(size_t payload_size, int fd)
static int output_socks[2]
int runtime_communication_loop(int listen_port, char *output_file, int output_port)
Logging of status messages to the terminal.
size_t payload_size
Payload will be serialized following this struct.
Functions for the sending and receiving of statistics between ctrl and runtime.
static int output_sock_idx
int handle_runtime_communication(int fd)
Reads a message off of the provided file descriptor as if it is coming from a runtime peer...
int add_to_epoll(int epoll_fd, int new_fd, uint32_t events, bool oneshot)
Adds a file descriptor to an epoll instance.
Payload: output of serialize_stat_samples.
static int send_add_runtime_msg(unsigned int target_id, int new_rt_id, uint32_t ip, int port)
Header for all messages from controller to runtime.
int init_epoll(int socket_fd)
Initializes a new instance of an epoll file descriptor and adds a socket to it, listening for input o...
#define log_error(fmt,...)
Payload for messages of type CTRL_CONNECT_TO_RUNTIME.
Payload type: rt_controller_init_msg.
static int process_rt_init_message(ssize_t payload_size, int fd)
int get_output_listener(int port)
static void add_output_sock(int fd)
enum rt_controller_msg_type type
Type of payload attached.
void dfg_to_file(char *filename)
int port
Port on which the runtime listens for other runtimes.
Initialization message, sent to controller to identify runtime upon first connection.
int epoll_loop(int socket_fd, int epoll_fd, int batch_size, int timeout, bool oneshot, int(*connection_handler)(int, void *), int(*accept_handler)(int, void *), void *data)
The event-based loop for epoll_wait.
int runtime_fd(unsigned int runtime_id)
void set_haproxy_weights(int rt_id, int offset)
Definitiions of structures for sending messages from runtimes to controller.
unsigned int runtime_id
Unique identifier for the runtime.
enum ctrl_runtime_msg_type type
Identifies the type of payload that follows.
int init_listening_socket(int port)
Initializes a socket which is bound to and listening on the given port.
static int runtime_id(int runtime_fd)
static int remove_runtime_endpoint(int fd)
#define log(level, fmt,...)
Log at a custom level.
ssize_t send_to_endpoint(int fd, void *data, size_t data_len)
Writes a buffer of a given size to a file descriptor.
#define log_warn(fmt,...)
unsigned int runtime_id
ID of the runtime to connect to.