My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
runtime_communication.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
24 #include "runtime_communication.h"
25 #include "logging.h"
26 #include "communication.h"
27 #include "msu_message.h"
28 #include "inter_runtime_messages.h"
29 #include "local_msu.h"
30 #include "runtime_dfg.h"
31 #include "thread_message.h"
32 #include "rt_stats.h"
33 #include "socket_monitor.h"
34 
35 #include <sys/stat.h>
36 #include <sys/types.h>
37 #include <sys/socket.h>
38 #include <netinet/tcp.h>
39 #include <stdlib.h>
40 #include <unistd.h>
41 
/** Static (global) variable for the socket listening for other runtimes */
static int runtime_sock = -1;

/** Holds the file descriptor for a single runtime peer */
struct runtime_peer {
    /** File descriptor of the peer's socket; 0 marks an unused slot */
    int fd;
    // ???: struct dfg_runtime rt
    // ???: uint32_t ip_address
};

/** Maximum number of other runtimes that can connect to this one */
#define MAX_RUNTIME_ID 32

/**
 * Other runtime peer sockets, indexed by runtime ID.
 * Static storage zero-initializes the array, so every slot starts with fd == 0.
 * Valid indices are 0 .. MAX_RUNTIME_ID - 1.
 */
static struct runtime_peer runtime_peers[MAX_RUNTIME_ID];

63 
64 int send_to_peer(unsigned int runtime_id, struct inter_runtime_msg_hdr *hdr, void *payload) {
65  if (runtime_id > MAX_RUNTIME_ID) {
66  log_error("Requested peer %d is greater than max runtime ID %d",
67  runtime_id, MAX_RUNTIME_ID);
68  return -1;
69  }
70  struct runtime_peer *peer = &runtime_peers[runtime_id];
71  if (peer->fd <= 0) {
72  log_error("Requested peer %d not instantiated", runtime_id);
73  return -1;
74  }
75 
76  int rtn = send_to_endpoint(peer->fd, hdr, sizeof(*hdr));
77  if (rtn <= 0) {
78  log_error("Error sending header to runtime %d", runtime_id);
79  return -1;
80  }
81 
82  if (hdr->payload_size <= 0) {
83  return 0;
84  }
85 
86  rtn = send_to_endpoint(peer->fd, payload, hdr->payload_size);
87  if (rtn <= 0) {
88  log_error("Error sending payload to runtime %d", runtime_id);
89  return -1;
90  }
91  log(LOG_RUNTIME_COMMUNICATION, "Send a payload of size %d (type %d) to runtime %d (fd: %d)",
92  (int)hdr->payload_size, hdr->type, runtime_id, peer->fd);
93  return 0;
94 }
95 
96 
97 int add_runtime_peer(unsigned int runtime_id, int fd) {
98  struct stat buf;
99  if (fstat(fd, &buf) != 0) {
100  log_error("Cannot register non-descriptor %d for runtime ID %d", fd, runtime_id);
101  return -1;
102  }
103  if (runtime_id > MAX_RUNTIME_ID) {
104  log_error("Runtime ID %d too high!", runtime_id);
105  return -1;
106  }
107  if (runtime_peers[runtime_id].fd != 0) {
108  log_warn("Replacing runtime peer with id %d", runtime_id);
109  }
111 
112  int val = 1;
113  int rtn = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
114  if (rtn < 0) {
115  log_perror("Error setting TCP_NODELAY");
116  }
117 
118  log_info("Added runtime peer %d (fd: %d)", runtime_id, fd);
119  return 0;
120 }
121 
125 static int send_init_msg(int id) {
126  int local_id = local_runtime_id();
127  if (local_id < 0) {
128  log_error("Could not send local runtime ID to remote runtime %d", id);
129  return -1;
130  }
131 
132  struct inter_runtime_init_msg msg = {
133  .origin_id = local_id
134  };
135 
136  struct inter_runtime_msg_hdr hdr = {
137  .type = INTER_RT_INIT,
138  .target = 0,
139  .payload_size = sizeof(msg)
140  };
141 
142  int rtn = send_to_peer(id, &hdr, &msg);
143  if (rtn < 0) {
144  log_error("Could not send initial connection message to peer runtime %d", id);
145  return -1;
146  }
147  return 0;
148 }
149 
150 int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr){
151  if (runtime_peers[id].fd != 0) {
152  log_warn("Attempting to replace runtime peer with id %d", id);
153  }
154  int fd = init_connected_socket(addr);
155  if (fd < 0) {
156  log_error("Could not connect to runtime %u", id);
157  return -1;
158  }
159  runtime_peers[id].fd = fd;
160 
161  int val = 1;
162  int rtn = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
163  if (rtn < 0) {
164  log_perror("Error setting TCP_NODELAY");
165  }
166 
167  if (send_init_msg(id) != 0) {
168  log_error("Failed to send initialization message to runtime %d (fd: %d)", id, fd);
169  close(fd);
170  return -1;
171  }
173  log_info("Connected to runtime peer %d (fd: %d)", id, fd);
174  return 0;
175 }
176 
177 int init_runtime_socket(int listen_port) {
178  if (runtime_sock > 0) {
179  log_error("Runtime socket already initialized");
180  return -1;
181  }
182  int sock = init_listening_socket(listen_port);
183  if (sock < 0) {
184  log_error("Error initializing runtime socket");
185  return -1;
186  }
187  return sock;
188 }
189 
193 static int read_runtime_message_hdr(int fd, struct inter_runtime_msg_hdr *msg) {
194  if (read_payload(fd, sizeof(*msg), msg) != 0) {
195  log_error("Could not read runtime message header from socket %d", fd);
196  return -1;
197  }
198  return 0;
199 }
200 
201 /*
202  * Processes a fwd_to_msu message which has just been received from another runtime
203  */
204 static int process_fwd_to_msu_message(size_t payload_size, int msu_id, int fd) {
205 
206  struct local_msu *msu = get_local_msu(msu_id);
207  if (msu == NULL) {
208  log_error("Error getting MSU with ID %d, requested from runtime fd %d",
209  msu_id, fd);
210  // Read the payload anyway just so it doesn't mess future things up
211  void *unused = malloc(payload_size);
212  read_payload(fd, payload_size, unused);
213  free(unused);
214  return -1;
215  }
216 
217  log(LOG_RUNTIME_CONNECTION, "Attempting to read MSU message");
218  struct msu_msg *msu_msg = read_msu_msg(msu, fd, payload_size);
219  if (msu_msg == NULL) {
220  log_error("Error reading MSU msg off of fd %d", fd);
221  return -1;
222  }
223 
224  int rtn = enqueue_msu_msg(&msu->queue, msu_msg);
225  if (rtn < 0) {
226  log_error("Error enqueuing inter-msu message to MSU %d from runtime fd %d",
227  msu_id, fd);
229  return -1;
230  }
231  return 0;
232 }
233 
237 static int process_init_rt_message(size_t payload_size, int fd) {
238 
239  if (payload_size != sizeof(struct inter_runtime_init_msg)) {
240  log_warn("Payload size of runtime initialization message does not match init_msg");
241  }
242  struct inter_runtime_init_msg msg;
243  if (read_payload(fd, sizeof(msg), &msg) != 0) {
244  log_error("Error reading inter_runtime_init_message from fd %d", fd);
245  return -1;
246  }
247 
248  int rtn = add_runtime_peer(msg.origin_id, fd);
249  if (rtn < 0) {
250  log_error("Could not add runtime peer %d (fd: %d)", msg.origin_id, fd);
251  return -1;
252  }
253  log(LOG_RUNTIME_COMMUNICATION, "Runtime peer %d (fd: %d) added",
254  msg.origin_id, fd);
255 
256  return 0;
257 }
258 
262 static int process_runtime_message_hdr(struct inter_runtime_msg_hdr *hdr, int fd) {
263  int rtn;
264  switch (hdr->type) {
265  case RT_FWD_TO_MSU:
266  rtn = process_fwd_to_msu_message(hdr->payload_size, hdr->target, fd);
267  if (rtn < 0) {
268  log_error("Error processing forward message from fd %d", fd);
269  return 1;
270  }
271  return 0;
272  case INTER_RT_INIT:
273  rtn = process_init_rt_message(hdr->payload_size, fd);
274  if (rtn < 0) {
275  log_error("Error processing init runtime message from fd %d", fd);
276  return 1;
277  }
278  return 0;
279  default:
280  log_error("Received unknown message type from fd %d: %d", fd, hdr->type);
281  // Returning 1 because a return of -1 will make the epoll loop exit
282  return 1;
283  }
284 }
285 
287  struct inter_runtime_msg_hdr hdr;
288  int rtn = read_runtime_message_hdr(fd, &hdr);
289 
290  if (rtn < 0) {
291  log_error("Error reading runtime message");
292  // Return of -1 will make epoll loop exit
293  return 1;
294  } else {
295  log(LOG_INTER_RUNTIME_COMMUNICATION,
296  "Read message from runtime with fd %d", fd);
297  }
298 
299  rtn = process_runtime_message_hdr(&hdr, fd);
300  if (rtn < 0) {
301  log_error("Error processing inter-runtime message from fd %d", fd);
302  return -1;
303  }
304  return 0;
305 }
int init_connected_socket(struct sockaddr_in *addr)
Initializes a socket that is connected to a given address.
Definition: communication.c:81
Interface for general-purpose socket communication.
Collecting statistics within the runtime.
static int process_fwd_to_msu_message(size_t payload_size, int msu_id, int fd)
Sent to a newly-connected runtime to establish ID.
static struct runtime_peer runtime_peers[32]
Other runtime peer sockets.
static int process_init_rt_message(size_t payload_size, int fd)
Processes an init message which has just been received from another runtime.
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
Definition: logging.h:88
Socket-handling between runtimes.
int monitor_runtime_socket(int new_fd)
Adds a runtime to be monitored by the socket monitor.
Header for messages to runtime from another runtime.
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
Definition: communication.c:37
int send_to_peer(unsigned int runtime_id, struct inter_runtime_msg_hdr *hdr, void *payload)
Sends a message to the peer runtime with the provided id.
struct msu_msg * read_msu_msg(struct local_msu *msu, int fd, size_t size)
Reads an MSU message of the given size from the provided file descriptor.
Definition: msu_message.c:171
Monitors an incoming port for messages from runtime or controller.
int add_runtime_peer(unsigned int runtime_id, int fd)
Adds the file descriptor to the list of current runtime peers.
#define log_perror(fmt,...)
Definition: logging.h:102
Logging of status messages to the terminal.
Interactions with global dfg from individual runtime.
int handle_runtime_communication(int fd)
Reads a message off of the provided file descriptor as if it is coming from a runtime peer...
void destroy_msu_msg_and_contents(struct msu_msg *msg)
Frees both the message and message data.
Definition: msu_message.c:156
Payload type: inter_runtime_init_msg.
#define log_error(fmt,...)
Definition: logging.h:101
Declares the structures and functions applicable to MSUs on the local machine.
struct local_msu * get_local_msu(unsigned int id)
Gets the local MSU with the given ID, or NULL if N/A.
Definition: local_msu.c:134
Payload type: output of serialize_msu_msg.
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
int local_runtime_id()
Definition: runtime_dfg.c:91
unsigned int target
MSU ID or thread ID depending on message type.
struct msg_queue queue
Input queue to MSU.
Definition: local_msu.h:60
static int read_runtime_message_hdr(int fd, struct inter_runtime_msg_hdr *msg)
Reads a header from a peer runtime.
static int process_runtime_message_hdr(struct inter_runtime_msg_hdr *hdr, int fd)
Processes the header which has been received on fd, and processes the header's payload.
Definitions of the message types that can be passed between runtimes.
Holds the file descriptor for a single runtime peer.
int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr)
Initiates a connection to a runtime peer with the given ID at the given address. ...
int init_runtime_socket(int listen_port)
Initializes the socket listening for incoming connections.
unsigned int origin_id
ID of the sending runtime.
static int runtime_sock
Static (global) variable for the socket listening for other runtimes.
int init_listening_socket(int port)
Initializes a socket which is bound to and listening on the given port.
static int runtime_id(int runtime_fd)
#define MAX_RUNTIME_ID
Maximum number of other runtimes that can connect to this one.
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
A message that is to be delivered to an instance of an MSU.
Definition: msu_message.h:101
Messages passed to MSUs.
ssize_t send_to_endpoint(int fd, void *data, size_t data_len)
Writes a buffer of a given size to a file descriptor.
Definition: communication.c:66
static int send_init_msg(int id)
Sends the inter_runtime_init message to the runtime with the given ID.
enum inter_runtime_msg_type type
int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data)
Enqueues a message for immediate delivery.
Definition: msu_message.c:101
#define log_warn(fmt,...)
Definition: logging.h:113