Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file: runtime/runtime_communication.c
22 : * All communication to and from other runtimes
23 : */
24 : #include "runtime_communication.h"
25 : #include "logging.h"
26 : #include "communication.h"
27 : #include "msu_message.h"
28 : #include "inter_runtime_messages.h"
29 : #include "local_msu.h"
30 : #include "runtime_dfg.h"
31 : #include "thread_message.h"
32 : #include "rt_stats.h"
33 : #include "socket_monitor.h"
34 :
35 : #include <sys/stat.h>
36 : #include <sys/types.h>
37 : #include <sys/socket.h>
38 : #include <netinet/tcp.h>
39 : #include <stdlib.h>
40 : #include <unistd.h>
41 :
42 : /**
43 : * Static (global) variable for the socket listening
44 : * for other runtimes
45 : */
46 : static int runtime_sock = -1;
47 :
48 : /** Holds the file descriptor for a single runtime peer */
49 : struct runtime_peer {
50 : int fd;
51 : // ???: struct dfg_runtime rt
52 : // ???: uint32_t ip_address
53 : };
54 :
55 : /** Maximum number of other runtimes that can connect to this one */
56 : #define MAX_RUNTIME_ID 32
57 :
58 : /**
59 : * Other runtime peer sockets.
60 : * Structs will be initialized to 0 due to static initialization.
61 : */
62 : static struct runtime_peer runtime_peers[MAX_RUNTIME_ID];
63 :
64 7 : int send_to_peer(unsigned int runtime_id, struct inter_runtime_msg_hdr *hdr, void *payload) {
65 7 : if (runtime_id > MAX_RUNTIME_ID) {
66 0 : log_error("Requested peer %d is greater than max runtime ID %d",
67 : runtime_id, MAX_RUNTIME_ID);
68 0 : return -1;
69 : }
70 7 : struct runtime_peer *peer = &runtime_peers[runtime_id];
71 7 : if (peer->fd <= 0) {
72 1 : log_error("Requested peer %d not instantiated", runtime_id);
73 1 : return -1;
74 : }
75 :
76 6 : int rtn = send_to_endpoint(peer->fd, hdr, sizeof(*hdr));
77 6 : if (rtn <= 0) {
78 1 : log_error("Error sending header to runtime %d", runtime_id);
79 1 : return -1;
80 : }
81 :
82 5 : if (hdr->payload_size <= 0) {
83 0 : return 0;
84 : }
85 :
86 5 : rtn = send_to_endpoint(peer->fd, payload, hdr->payload_size);
87 5 : if (rtn <= 0) {
88 0 : log_error("Error sending payload to runtime %d", runtime_id);
89 0 : return -1;
90 : }
91 5 : log(LOG_RUNTIME_COMMUNICATION, "Send a payload of size %d (type %d) to runtime %d (fd: %d)",
92 : (int)hdr->payload_size, hdr->type, runtime_id, peer->fd);
93 5 : return 0;
94 : }
95 :
96 :
97 4 : int add_runtime_peer(unsigned int runtime_id, int fd) {
98 : struct stat buf;
99 4 : if (fstat(fd, &buf) != 0) {
100 1 : log_error("Cannot register non-descriptor %d for runtime ID %d", fd, runtime_id);
101 1 : return -1;
102 : }
103 3 : if (runtime_id > MAX_RUNTIME_ID) {
104 0 : log_error("Runtime ID %d too high!", runtime_id);
105 0 : return -1;
106 : }
107 3 : if (runtime_peers[runtime_id].fd != 0) {
108 0 : log_warn("Replacing runtime peer with id %d", runtime_id);
109 : }
110 3 : runtime_peers[runtime_id].fd = fd;
111 :
112 3 : int val = 1;
113 3 : int rtn = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
114 3 : if (rtn < 0) {
115 2 : log_perror("Error setting TCP_NODELAY");
116 : }
117 :
118 3 : log_info("Added runtime peer %d (fd: %d)", runtime_id, fd);
119 3 : return 0;
120 : }
121 :
122 : /**
123 : * Sends the inter_runtime_init message to the runtime with the given ID
124 : */
125 3 : static int send_init_msg(int id) {
126 3 : int local_id = local_runtime_id();
127 3 : if (local_id < 0) {
128 0 : log_error("Could not send local runtime ID to remote runtime %d", id);
129 0 : return -1;
130 : }
131 :
132 3 : struct inter_runtime_init_msg msg = {
133 : .origin_id = local_id
134 : };
135 :
136 3 : struct inter_runtime_msg_hdr hdr = {
137 : .type = INTER_RT_INIT,
138 : .target = 0,
139 : .payload_size = sizeof(msg)
140 : };
141 :
142 3 : int rtn = send_to_peer(id, &hdr, &msg);
143 3 : if (rtn < 0) {
144 0 : log_error("Could not send initial connection message to peer runtime %d", id);
145 0 : return -1;
146 : }
147 3 : return 0;
148 : }
149 :
150 3 : int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr){
151 3 : if (runtime_peers[id].fd != 0) {
152 0 : log_warn("Attempting to replace runtime peer with id %d", id);
153 : }
154 3 : int fd = init_connected_socket(addr);
155 3 : if (fd < 0) {
156 1 : log_error("Could not connect to runtime %u", id);
157 1 : return -1;
158 : }
159 2 : runtime_peers[id].fd = fd;
160 :
161 2 : int val = 1;
162 2 : int rtn = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
163 2 : if (rtn < 0) {
164 0 : log_perror("Error setting TCP_NODELAY");
165 : }
166 :
167 2 : if (send_init_msg(id) != 0) {
168 0 : log_error("Failed to send initialization message to runtime %d (fd: %d)", id, fd);
169 0 : close(fd);
170 0 : return -1;
171 : }
172 2 : monitor_runtime_socket(fd);
173 2 : log_info("Connected to runtime peer %d (fd: %d)", id, fd);
174 2 : return 0;
175 : }
176 :
177 0 : int init_runtime_socket(int listen_port) {
178 0 : if (runtime_sock > 0) {
179 0 : log_error("Runtime socket already initialized");
180 0 : return -1;
181 : }
182 0 : int sock = init_listening_socket(listen_port);
183 0 : if (sock < 0) {
184 0 : log_error("Error initializing runtime socket");
185 0 : return -1;
186 : }
187 0 : return sock;
188 : }
189 :
190 : /**
191 : * Reads a header from a peer runtime
192 : */
193 1 : static int read_runtime_message_hdr(int fd, struct inter_runtime_msg_hdr *msg) {
194 1 : if (read_payload(fd, sizeof(*msg), msg) != 0) {
195 0 : log_error("Could not read runtime message header from socket %d", fd);
196 0 : return -1;
197 : }
198 1 : return 0;
199 : }
200 :
201 : /*
202 : * Processes a fwd_to_msu message which has just been received from another runtime
203 : */
204 1 : static int process_fwd_to_msu_message(size_t payload_size, int msu_id, int fd) {
205 :
206 1 : struct local_msu *msu = get_local_msu(msu_id);
207 1 : if (msu == NULL) {
208 0 : log_error("Error getting MSU with ID %d, requested from runtime fd %d",
209 : msu_id, fd);
210 : // Read the payload anyway just so it doesn't mess future things up
211 0 : void *unused = malloc(payload_size);
212 0 : read_payload(fd, payload_size, unused);
213 0 : free(unused);
214 0 : return -1;
215 : }
216 :
217 1 : log(LOG_RUNTIME_CONNECTION, "Attempting to read MSU message");
218 1 : struct msu_msg *msu_msg = read_msu_msg(msu, fd, payload_size);
219 1 : if (msu_msg == NULL) {
220 0 : log_error("Error reading MSU msg off of fd %d", fd);
221 0 : return -1;
222 : }
223 :
224 1 : int rtn = enqueue_msu_msg(&msu->queue, msu_msg);
225 1 : if (rtn < 0) {
226 0 : log_error("Error enqueuing inter-msu message to MSU %d from runtime fd %d",
227 : msu_id, fd);
228 0 : destroy_msu_msg_and_contents(msu_msg);
229 0 : return -1;
230 : }
231 1 : return 0;
232 : }
233 :
234 : /**
235 : * Processes an init message which has just been received from another runtime
236 : */
237 2 : static int process_init_rt_message(size_t payload_size, int fd) {
238 :
239 2 : if (payload_size != sizeof(struct inter_runtime_init_msg)) {
240 0 : log_warn("Payload size of runtime initialization message does not match init_msg");
241 : }
242 : struct inter_runtime_init_msg msg;
243 2 : if (read_payload(fd, sizeof(msg), &msg) != 0) {
244 0 : log_error("Error reading inter_runtime_init_message from fd %d", fd);
245 0 : return -1;
246 : }
247 :
248 2 : int rtn = add_runtime_peer(msg.origin_id, fd);
249 2 : if (rtn < 0) {
250 0 : log_error("Could not add runtime peer %d (fd: %d)", msg.origin_id, fd);
251 0 : return -1;
252 : }
253 2 : log(LOG_RUNTIME_COMMUNICATION, "Runtime peer %d (fd: %d) added",
254 : msg.origin_id, fd);
255 :
256 2 : return 0;
257 : }
258 :
259 : /**
260 : * Processes the header which has been received on `fd`, and processes the header's payload
261 : */
262 1 : static int process_runtime_message_hdr(struct inter_runtime_msg_hdr *hdr, int fd) {
263 : int rtn;
264 1 : switch (hdr->type) {
265 : case RT_FWD_TO_MSU:
266 0 : rtn = process_fwd_to_msu_message(hdr->payload_size, hdr->target, fd);
267 0 : if (rtn < 0) {
268 0 : log_error("Error processing forward message from fd %d", fd);
269 0 : return 1;
270 : }
271 0 : return 0;
272 : case INTER_RT_INIT:
273 1 : rtn = process_init_rt_message(hdr->payload_size, fd);
274 1 : if (rtn < 0) {
275 0 : log_error("Error processing init runtime message from fd %d", fd);
276 0 : return 1;
277 : }
278 1 : return 0;
279 : default:
280 0 : log_error("Received unknown message type from fd %d: %d", fd, hdr->type);
281 : // Returning 1 because a return of -1 will make the epoll loop exit
282 0 : return 1;
283 : }
284 : }
285 :
286 1 : int handle_runtime_communication(int fd) {
287 : struct inter_runtime_msg_hdr hdr;
288 1 : int rtn = read_runtime_message_hdr(fd, &hdr);
289 :
290 1 : if (rtn < 0) {
291 0 : log_error("Error reading runtime message");
292 : // Return of -1 will make epoll loop exit
293 0 : return 1;
294 : } else {
295 1 : log(LOG_INTER_RUNTIME_COMMUNICATION,
296 : "Read message from runtime with fd %d", fd);
297 : }
298 :
299 1 : rtn = process_runtime_message_hdr(&hdr, fd);
300 1 : if (rtn < 0) {
301 0 : log_error("Error processing inter-runtime message from fd %d", fd);
302 0 : return -1;
303 : }
304 1 : return 0;
305 : }
|