Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : #include "socket_msu.h"
21 : #include "local_msu.h"
22 : #include "epollops.h"
23 : #include "logging.h"
24 : #include "msu_message.h"
25 : #include "runtime_dfg.h"
26 : #include "communication.h"
27 : #include "msu_calls.h"
28 : #include "rt_stats.h"
29 :
30 : #include <sys/epoll.h>
31 : #include <stdlib.h>
32 : #include <netinet/ip.h>
33 :
34 : #define MAX_FDS 65536
35 :
36 : struct local_msu *instance;
37 :
38 : struct sock_msu_state {
39 : int sock_fd;
40 : int epoll_fd;
41 : struct local_msu *self;
42 : struct msu_type *default_target;
43 : struct msu_msg_hdr hdr_mask[MAX_FDS];
44 : struct local_msu *destinations[MAX_FDS];
45 : };
46 :
47 :
48 : #define SOCKET_HANDLER_TIMEOUT 500
49 : #define SOCKET_HANDLER_BATCH_SIZE 1000
50 :
51 : struct key_seed {
52 : struct sockaddr_in sockaddr;
53 : uint32_t local_ip;
54 : };
55 :
56 : #define MONITOR_NUM_FDS
57 :
58 250 : int msu_monitor_fd(int fd, uint32_t events, struct local_msu *destination,
59 : struct msu_msg_hdr *hdr) {
60 250 : if (instance == NULL) {
61 0 : log_error("Socket monitor instance is NULL! Must instantiate before monitoring fd");
62 0 : return -1;
63 : }
64 250 : struct sock_msu_state *state = instance->msu_state;
65 :
66 250 : state->hdr_mask[fd] = *hdr;
67 250 : state->destinations[fd] = destination;
68 :
69 250 : int rtn = enable_epoll(state->epoll_fd, fd, events);
70 :
71 250 : if (rtn < 0) {
72 2 : rtn = add_to_epoll(state->epoll_fd, fd, events, true);
73 : #ifdef MONITOR_NUM_FDS
74 2 : increment_stat(MSU_STAT1, instance->id, 1);
75 : #endif
76 2 : if (rtn < 0) {
77 1 : log_perror("Error enabling epoll for fd %d", fd);
78 1 : return -1;
79 : }
80 : }
81 249 : log(LOG_SOCKET_MSU, "Added fd %d to epoll", fd);
82 249 : return 0;
83 : }
84 :
85 : struct msu_msg_hdr blank_hdr = {};
86 :
87 121 : int msu_remove_fd_monitor(int fd) {
88 121 : struct sock_msu_state *state = instance->msu_state;
89 :
90 121 : state->hdr_mask[fd] = blank_hdr;
91 121 : state->destinations[fd] = NULL;
92 :
93 121 : int rtn = epoll_ctl(state->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
94 :
95 121 : if (rtn < 0) {
96 0 : log_perror("Error removing fd %d from epoll", fd);
97 0 : return -1;
98 : } else {
99 : #ifdef MONITOR_NUM_FDS
100 121 : increment_stat(MSU_STAT1, instance->id, -1);
101 : #endif
102 : }
103 :
104 121 : return 0;
105 : }
106 :
107 : struct msu_msg_key self_key = {
108 : .key = {0},
109 : .key_len = 0,
110 : .id = 0
111 : };
112 :
113 371 : static int process_connection(int fd, void *v_state) {
114 371 : struct sock_msu_state *state = v_state;
115 371 : log(LOG_SOCKET_HANDLER, "Processing connection: fd = %d", fd);
116 :
117 371 : struct socket_msg *msg = malloc(sizeof(*msg));
118 371 : msg->fd = fd;
119 :
120 371 : struct local_msu *destination= state->destinations[fd];
121 371 : if (destination == NULL) {
122 : // This is a file descriptor we haven't seen before
123 125 : log(LOG_SOCKET_HANDLER, "New connection: fd = %d", fd);
124 :
125 : // Generate the key for the message
126 : struct msu_msg_key key;
127 : struct key_seed seed;
128 125 : socklen_t addrlen = sizeof(seed.sockaddr);
129 125 : int rtn = getpeername(fd, (struct sockaddr*)&seed.sockaddr, &addrlen);
130 125 : if (rtn < 0) {
131 0 : log_perror("Could not getpeername() on fd %d", fd);
132 0 : msu_error(instance, NULL, 0);
133 0 : return -1;
134 : }
135 125 : seed.local_ip = local_runtime_ip();
136 125 : seed_msg_key(&seed, sizeof(seed), &key);
137 :
138 125 : struct msu_type *type = state->default_target;
139 :
140 125 : rtn = init_call_msu_type(state->self, type, &key, sizeof(*msg), msg);
141 125 : if (rtn < 0) {
142 0 : log_error("Error enqueing to destination");
143 0 : free(msg);
144 0 : msu_error(instance, NULL, 0);
145 0 : msu_monitor_fd(fd, EPOLLIN | EPOLLOUT, NULL, &blank_hdr);
146 0 : return -1;
147 : }
148 125 : return 0;
149 : } else {
150 : // This file descriptor was enqueued with a particular target in mind
151 246 : struct msu_msg_hdr *hdr = &state->hdr_mask[fd];
152 246 : if (hdr == NULL) {
153 0 : log_error("Existing destination with null header for fd %d", fd);
154 0 : return -1;
155 : }
156 246 : int rtn = call_local_msu(state->self, destination, hdr, sizeof(*msg), msg);
157 246 : if (rtn < 0) {
158 0 : log_error("Error enqueueing to next MSU");
159 0 : msu_error(instance, NULL, 0);
160 0 : msu_monitor_fd(fd, EPOLLIN | EPOLLOUT, destination, hdr);
161 0 : return -1;
162 : }
163 246 : log(LOG_SOCKET_HANDLER,"Enqueued to MSU %d", destination->id);
164 246 : return 0;
165 : }
166 : }
167 :
168 125 : static int set_default_target(int fd, void *v_state) {
169 : #ifdef MONITOR_NUM_FDS
170 125 : increment_stat(MSU_STAT1, instance->id, 1);
171 : #endif
172 125 : struct sock_msu_state *state = v_state;
173 125 : state->destinations[fd] = NULL;
174 125 : return 0;
175 : }
176 :
177 14 : static int socket_handler_main_loop(struct local_msu *self) {
178 14 : struct sock_msu_state *state = self->msu_state;
179 :
180 14 : int rtn = epoll_loop(state->sock_fd, state->epoll_fd,
181 : SOCKET_HANDLER_BATCH_SIZE, SOCKET_HANDLER_TIMEOUT, true,
182 : process_connection,
183 : set_default_target,
184 : state);
185 14 : return rtn;
186 : }
187 :
188 :
189 14 : static int socket_msu_receive(struct local_msu *self, struct msu_msg *msg) {
190 14 : int rtn = socket_handler_main_loop(self);
191 14 : if (rtn < 0) {
192 0 : log_error("Error exiting socket handler main loop");
193 : }
194 14 : init_call_local_msu(self, self, &self_key, 0, NULL);
195 14 : return 0;
196 : }
197 :
198 :
199 14 : static void socket_msu_destroy(struct local_msu *self) {
200 14 : struct sock_msu_state *state = self->msu_state;
201 :
202 14 : int rtn = close(state->sock_fd);
203 14 : if (rtn == -1) {
204 0 : log_error("Error closing socket");
205 : }
206 :
207 14 : rtn = close(state->epoll_fd);
208 14 : if (rtn == -1) {
209 0 : log_error("Error closing epoll fd");
210 : }
211 :
212 14 : free(state);
213 14 : instance = NULL;
214 14 : }
215 :
216 : #define DEFAULT_PORT 8080
217 : #define DEFAULT_TARGET 501
218 : #define INIT_SYNTAX "<port>, <target_msu_type>"
219 :
220 : struct sock_init {
221 : int port;
222 : int target_type;
223 : };
224 :
225 14 : static int parse_init_payload (char *to_parse, struct sock_init *parsed) {
226 :
227 14 : parsed->port = DEFAULT_PORT;
228 14 : parsed->target_type = DEFAULT_TARGET;
229 :
230 14 : if (to_parse == NULL) {
231 0 : log_warn("Initializing socket handler MSU with default parameters. "
232 : "(FYI: init syntax is [" INIT_SYNTAX "])");
233 : } else {
234 : char *saveptr, *tok;
235 14 : if ( (tok = strtok_r(to_parse, " ,", &saveptr)) == NULL) {
236 0 : log_warn("Couldn't get port from initialization string");
237 0 : return 0;
238 : }
239 14 : parsed->port = atoi(tok);
240 :
241 14 : if ( (tok = strtok_r(NULL, " ,", &saveptr)) == NULL) {
242 0 : log_warn("Couldn't get target MSU from initialization string");
243 0 : return 0;
244 : }
245 14 : parsed->target_type = atoi(tok);
246 :
247 14 : if ( (tok = strtok_r(NULL, " ,", &saveptr)) != NULL) {
248 0 : log_warn("Discarding extra tokens from socket initialzation: %s", tok);
249 : }
250 : }
251 14 : return 0;
252 : }
253 :
254 :
255 14 : static int socket_msu_init(struct local_msu *self, struct msu_init_data *init_data) {
256 :
257 14 : if (instance != NULL) {
258 0 : log_error("Socket MSU already instantiated. There can be only one!");
259 0 : return -1;
260 : }
261 :
262 14 : char *init_cmd = init_data->init_data;
263 : struct sock_init init;
264 14 : parse_init_payload(init_cmd, &init);
265 :
266 14 : log(LOG_SOCKET_INIT, "Initializing socket handler with port %d, target %d",
267 : init.port, init.target_type);
268 :
269 14 : struct sock_msu_state *state = malloc(sizeof(*state));
270 14 : self->msu_state = state;
271 :
272 14 : state->default_target = get_msu_type(init.target_type);
273 :
274 14 : if (state->default_target == NULL) {
275 0 : log_error("Cannot get type %d for socket handler", init.target_type);
276 0 : return -1;
277 : }
278 :
279 14 : state->sock_fd = init_listening_socket(init.port);
280 14 : if (state->sock_fd == -1) {
281 0 : log_error("Couldn't initialize socket for socket handler MSU %d", self->id);
282 0 : return -1;
283 : }
284 14 : log(LOG_SOCKET_INIT, "Listening for traffic on port %d", init.port);
285 :
286 14 : state->epoll_fd = init_epoll(state->sock_fd);
287 14 : if (state->epoll_fd == -1) {
288 0 : log_error("Couldn't initialize epoll_Fd for socket handler MSU %d", self->id);
289 0 : return -1;
290 : }
291 :
292 14 : state->self = self;
293 14 : instance = self;
294 :
295 : #ifdef MONITOR_NUM_FDS
296 14 : init_stat_item(MSU_STAT1, self->id);
297 : #endif
298 :
299 14 : init_call_local_msu(self, self, &self_key, 0, NULL);
300 :
301 14 : return 0;
302 : }
303 :
304 : struct msu_type SOCKET_MSU_TYPE = {
305 : .name = "socket_msu",
306 : .id = SOCKET_MSU_TYPE_ID,
307 : .init = socket_msu_init,
308 : .destroy = socket_msu_destroy,
309 : .receive = socket_msu_receive
310 : };
311 :
|