My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
socket_msu.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
20 #include "socket_msu.h"
21 #include "local_msu.h"
22 #include "epollops.h"
23 #include "logging.h"
24 #include "msu_message.h"
25 #include "runtime_dfg.h"
26 #include "communication.h"
27 #include "msu_calls.h"
28 #include "rt_stats.h"
29 
30 #include <sys/epoll.h>
31 #include <stdlib.h>
32 #include <netinet/ip.h>
33 
34 #define MAX_FDS 65536
35 
37 
39  int sock_fd;
40  int epoll_fd;
41  struct local_msu *self;
45 };
46 
47 
48 #define SOCKET_HANDLER_TIMEOUT 500
49 #define SOCKET_HANDLER_BATCH_SIZE 1000
50 
51 struct key_seed {
52  struct sockaddr_in sockaddr;
54 };
55 
56 #define MONITOR_NUM_FDS
57 
58 int msu_monitor_fd(int fd, uint32_t events, struct local_msu *destination,
59  struct msu_msg_hdr *hdr) {
60  if (instance == NULL) {
61  log_error("Socket monitor instance is NULL! Must instantiate before monitoring fd");
62  return -1;
63  }
64  struct sock_msu_state *state = instance->msu_state;
65 
66  state->hdr_mask[fd] = *hdr;
67  state->destinations[fd] = destination;
68 
69  int rtn = enable_epoll(state->epoll_fd, fd, events);
70 
71  if (rtn < 0) {
72  rtn = add_to_epoll(state->epoll_fd, fd, events, true);
73 #ifdef MONITOR_NUM_FDS
74  increment_stat(MSU_STAT1, instance->id, 1);
75 #endif
76  if (rtn < 0) {
77  log_perror("Error enabling epoll for fd %d", fd);
78  return -1;
79  }
80  }
81  log(LOG_SOCKET_MSU, "Added fd %d to epoll", fd);
82  return 0;
83 }
84 
85 struct msu_msg_hdr blank_hdr = {};
86 
87 int msu_remove_fd_monitor(int fd) {
88  struct sock_msu_state *state = instance->msu_state;
89 
90  state->hdr_mask[fd] = blank_hdr;
91  state->destinations[fd] = NULL;
92 
93  int rtn = epoll_ctl(state->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
94 
95  if (rtn < 0) {
96  log_perror("Error removing fd %d from epoll", fd);
97  return -1;
98  } else {
99 #ifdef MONITOR_NUM_FDS
100  increment_stat(MSU_STAT1, instance->id, -1);
101 #endif
102  }
103 
104  return 0;
105 }
106 
108  .key = {0},
109  .key_len = 0,
110  .id = 0
111 };
112 
113 static int process_connection(int fd, void *v_state) {
114  struct sock_msu_state *state = v_state;
115  log(LOG_SOCKET_HANDLER, "Processing connection: fd = %d", fd);
116 
117  struct socket_msg *msg = malloc(sizeof(*msg));
118  msg->fd = fd;
119 
120  struct local_msu *destination= state->destinations[fd];
121  if (destination == NULL) {
122  // This is a file descriptor we haven't seen before
123  log(LOG_SOCKET_HANDLER, "New connection: fd = %d", fd);
124 
125  // Generate the key for the message
126  struct msu_msg_key key;
127  struct key_seed seed;
128  socklen_t addrlen = sizeof(seed.sockaddr);
129  int rtn = getpeername(fd, (struct sockaddr*)&seed.sockaddr, &addrlen);
130  if (rtn < 0) {
131  log_perror("Could not getpeername() on fd %d", fd);
132  msu_error(instance, NULL, 0);
133  return -1;
134  }
135  seed.local_ip = local_runtime_ip();
136  seed_msg_key(&seed, sizeof(seed), &key);
137 
138  struct msu_type *type = state->default_target;
139 
140  rtn = init_call_msu_type(state->self, type, &key, sizeof(*msg), msg);
141  if (rtn < 0) {
142  log_error("Error enqueing to destination");
143  free(msg);
144  msu_error(instance, NULL, 0);
145  msu_monitor_fd(fd, EPOLLIN | EPOLLOUT, NULL, &blank_hdr);
146  return -1;
147  }
148  return 0;
149  } else {
150  // This file descriptor was enqueued with a particular target in mind
151  struct msu_msg_hdr *hdr = &state->hdr_mask[fd];
152  if (hdr == NULL) {
153  log_error("Existing destination with null header for fd %d", fd);
154  return -1;
155  }
156  int rtn = call_local_msu(state->self, destination, hdr, sizeof(*msg), msg);
157  if (rtn < 0) {
158  log_error("Error enqueueing to next MSU");
159  msu_error(instance, NULL, 0);
160  msu_monitor_fd(fd, EPOLLIN | EPOLLOUT, destination, hdr);
161  return -1;
162  }
163  log(LOG_SOCKET_HANDLER,"Enqueued to MSU %d", destination->id);
164  return 0;
165  }
166 }
167 
168 static int set_default_target(int fd, void *v_state) {
169 #ifdef MONITOR_NUM_FDS
170  increment_stat(MSU_STAT1, instance->id, 1);
171 #endif
172  struct sock_msu_state *state = v_state;
173  state->destinations[fd] = NULL;
174  return 0;
175 }
176 
177 static int socket_handler_main_loop(struct local_msu *self) {
178  struct sock_msu_state *state = self->msu_state;
179 
180  int rtn = epoll_loop(state->sock_fd, state->epoll_fd,
184  state);
185  return rtn;
186 }
187 
188 
189 static int socket_msu_receive(struct local_msu *self, struct msu_msg *msg) {
190  int rtn = socket_handler_main_loop(self);
191  if (rtn < 0) {
192  log_error("Error exiting socket handler main loop");
193  }
194  init_call_local_msu(self, self, &self_key, 0, NULL);
195  return 0;
196 }
197 
198 
199 static void socket_msu_destroy(struct local_msu *self) {
200  struct sock_msu_state *state = self->msu_state;
201 
202  int rtn = close(state->sock_fd);
203  if (rtn == -1) {
204  log_error("Error closing socket");
205  }
206 
207  rtn = close(state->epoll_fd);
208  if (rtn == -1) {
209  log_error("Error closing epoll fd");
210  }
211 
212  free(state);
213  instance = NULL;
214 }
215 
216 #define DEFAULT_PORT 8080
217 #define DEFAULT_TARGET 501
218 #define INIT_SYNTAX "<port>, <target_msu_type>"
219 
220 struct sock_init {
221  int port;
223 };
224 
225 static int parse_init_payload (char *to_parse, struct sock_init *parsed) {
226 
227  parsed->port = DEFAULT_PORT;
228  parsed->target_type = DEFAULT_TARGET;
229 
230  if (to_parse == NULL) {
231  log_warn("Initializing socket handler MSU with default parameters. "
232  "(FYI: init syntax is [" INIT_SYNTAX "])");
233  } else {
234  char *saveptr, *tok;
235  if ( (tok = strtok_r(to_parse, " ,", &saveptr)) == NULL) {
236  log_warn("Couldn't get port from initialization string");
237  return 0;
238  }
239  parsed->port = atoi(tok);
240 
241  if ( (tok = strtok_r(NULL, " ,", &saveptr)) == NULL) {
242  log_warn("Couldn't get target MSU from initialization string");
243  return 0;
244  }
245  parsed->target_type = atoi(tok);
246 
247  if ( (tok = strtok_r(NULL, " ,", &saveptr)) != NULL) {
248  log_warn("Discarding extra tokens from socket initialzation: %s", tok);
249  }
250  }
251  return 0;
252 }
253 
254 
255 static int socket_msu_init(struct local_msu *self, struct msu_init_data *init_data) {
256 
257  if (instance != NULL) {
258  log_error("Socket MSU already instantiated. There can be only one!");
259  return -1;
260  }
261 
262  char *init_cmd = init_data->init_data;
263  struct sock_init init;
264  parse_init_payload(init_cmd, &init);
265 
266  log(LOG_SOCKET_INIT, "Initializing socket handler with port %d, target %d",
267  init.port, init.target_type);
268 
269  struct sock_msu_state *state = malloc(sizeof(*state));
270  self->msu_state = state;
271 
272  state->default_target = get_msu_type(init.target_type);
273 
274  if (state->default_target == NULL) {
275  log_error("Cannot get type %d for socket handler", init.target_type);
276  return -1;
277  }
278 
279  state->sock_fd = init_listening_socket(init.port);
280  if (state->sock_fd == -1) {
281  log_error("Couldn't initialize socket for socket handler MSU %d", self->id);
282  return -1;
283  }
284  log(LOG_SOCKET_INIT, "Listening for traffic on port %d", init.port);
285 
286  state->epoll_fd = init_epoll(state->sock_fd);
287  if (state->epoll_fd == -1) {
288  log_error("Couldn't initialize epoll_Fd for socket handler MSU %d", self->id);
289  return -1;
290  }
291 
292  state->self = self;
293  instance = self;
294 
295 #ifdef MONITOR_NUM_FDS
297 #endif
298 
299  init_call_local_msu(self, self, &self_key, 0, NULL);
300 
301  return 0;
302 }
303 
305  .name = "socket_msu",
306  .id = SOCKET_MSU_TYPE_ID,
307  .init = socket_msu_init,
308  .destroy = socket_msu_destroy,
309  .receive = socket_msu_receive
310 };
311 
Interface for general-purpose socket communication.
Collecting statistics within the runtime.
int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast)
Definition: local_msu.c:353
struct msu_msg_hdr hdr_mask[65536]
Definition: socket_msu.c:43
#define INIT_SYNTAX
Definition: socket_msu.c:218
uint32_t local_runtime_ip()
Definition: runtime_dfg.c:107
Header for messages passed to MSUs.
Definition: msu_message.h:85
struct composite_key key
The full, arbitrary-length, unique key (used in state)
Definition: msu_message.h:48
static int socket_handler_main_loop(struct local_msu *self)
Definition: socket_msu.c:177
int msu_remove_fd_monitor(int fd)
Definition: socket_msu.c:87
Wrapper functions for epoll to manage event-based communication.
#define MAX_FDS
Definition: socket_msu.c:34
#define SOCKET_HANDLER_TIMEOUT
Definition: socket_msu.c:48
int target_type
Definition: socket_msu.c:222
struct local_msu * self
Definition: socket_msu.c:41
static void socket_msu_destroy(struct local_msu *self)
Definition: socket_msu.c:199
static int socket_msu_init(struct local_msu *self, struct msu_init_data *init_data)
Definition: socket_msu.c:255
void * msu_state
State related to the entire instance of the MSU, not just individual items.
Definition: local_msu.h:66
int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a destination of the specified type.
Definition: msu_calls.c:205
#define log_perror(fmt,...)
Definition: logging.h:102
#define SOCKET_MSU_TYPE_ID
Definition: msu_ids.h:23
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Definition: msu_type.c:80
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
Definition: local_msu.h:54
int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key)
Sets the key's composite-ID to the provided value, and sets the key's ID to a hash of the value...
Definition: msu_message.c:62
Interactions with global dfg from individual runtime.
int add_to_epoll(int epoll_fd, int new_fd, uint32_t events, bool oneshot)
Adds a file descriptor to an epoll instance.
Definition: epollops.c:59
static int parse_init_payload(char *to_parse, struct sock_init *parsed)
Definition: socket_msu.c:225
Declares the methods available for calling an MSU from another MSU.
int init_epoll(int socket_fd)
Initializes a new instance of an epoll file descriptor and adds a socket to it, listening for input o...
Definition: epollops.c:182
For custom MSU statistics.
Definition: stat_ids.h:53
#define log_error(fmt,...)
Definition: logging.h:101
int enable_epoll(int epoll_fd, int new_fd, uint32_t events)
Enables a file descriptor which has already been aded to an epoll instance.
Definition: epollops.c:42
Declares the structures and functions applicable to MSUs on the local machine.
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it.
Definition: rt_stats.c:719
struct msu_msg_key self_key
Definition: socket_msu.c:107
struct msu_msg_hdr blank_hdr
Definition: socket_msu.c:85
static int socket_msu_receive(struct local_msu *self, struct msu_msg *msg)
Definition: socket_msu.c:189
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
Data with which an MSU is initialized, and the payload for messages of type CTRL_CREATE_MSU.
Definition: dfg.h:66
#define DEFAULT_PORT
Definition: socket_msu.c:216
int init_call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_key *key, size_t data_size, void *data)
Enqueues a new message in the queue of a local MSU.
Definition: msu_calls.c:134
static int process_connection(int fd, void *v_state)
Definition: socket_msu.c:113
static int set_default_target(int fd, void *v_state)
Definition: socket_msu.c:168
#define SOCKET_HANDLER_BATCH_SIZE
Definition: socket_msu.c:49
int epoll_loop(int socket_fd, int epoll_fd, int batch_size, int timeout, bool oneshot, int(*connection_handler)(int, void *), int(*accept_handler)(int, void *), void *data)
The event-based loop for epoll_wait.
Definition: epollops.c:132
struct msu_type SOCKET_MSU_TYPE
Definition: socket_msu.c:304
struct local_msu * instance
Definition: socket_msu.c:36
Defines a type of MSU.
Definition: msu_type.h:46
struct sockaddr_in sockaddr
Definition: socket_msu.c:52
int init_listening_socket(int port)
Initializes a socket which is bound to and listening on the given port.
int increment_stat(enum stat_id stat_id, unsigned int item_id, double value)
Increments the given statistic by the provided value.
Definition: rt_stats.c:389
char init_data[64]
Definition: dfg.h:67
struct msu_type * default_target
Definition: socket_msu.c:42
int msu_monitor_fd(int fd, uint32_t events, struct local_msu *destination, struct msu_msg_hdr *hdr)
Definition: socket_msu.c:58
#define DEFAULT_TARGET
Definition: socket_msu.c:217
unsigned int uint32_t
Definition: uthash.h:96
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
A message that is to be delivered to an instance of an MSU.
Definition: msu_message.h:101
Messages passed to MSUs.
int call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Enqueues a message in the queue of a local MSU.
Definition: msu_calls.c:77
char * name
Name for the msu type.
Definition: msu_type.h:48
Used to uniquely identify the source of a message, used in state storage as well as routing...
Definition: msu_message.h:46
uint32_t local_ip
Definition: socket_msu.c:53
struct local_msu * destinations[65536]
Definition: socket_msu.c:44
#define log_warn(fmt,...)
Definition: logging.h:113
state
Definition: http_parser.c:298