LCOV - code coverage report
Current view: top level - msus - socket_msu.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 97 132 73.5 %
Date: 2018-01-11 Functions: 9 9 100.0 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : #include "socket_msu.h"
      21             : #include "local_msu.h"
      22             : #include "epollops.h"
      23             : #include "logging.h"
      24             : #include "msu_message.h"
      25             : #include "runtime_dfg.h"
      26             : #include "communication.h"
      27             : #include "msu_calls.h"
      28             : #include "rt_stats.h"
      29             : 
      30             : #include <sys/epoll.h>
      31             : #include <stdlib.h>
      32             : #include <netinet/ip.h>
      33             : 
      34             : #define MAX_FDS 65536
      35             : 
      36             : struct local_msu *instance;
      37             : 
      38             : struct sock_msu_state {
      39             :     int sock_fd;
      40             :     int epoll_fd;
      41             :     struct local_msu *self;
      42             :     struct msu_type *default_target;
      43             :     struct msu_msg_hdr hdr_mask[MAX_FDS];
      44             :     struct local_msu *destinations[MAX_FDS];
      45             : };
      46             : 
      47             : 
      48             : #define SOCKET_HANDLER_TIMEOUT 500
      49             : #define SOCKET_HANDLER_BATCH_SIZE 1000
      50             : 
      51             : struct key_seed {
      52             :     struct sockaddr_in sockaddr;
      53             :     uint32_t local_ip;
      54             : };
      55             : 
      56             : #define MONITOR_NUM_FDS
      57             : 
      58         250 : int msu_monitor_fd(int fd, uint32_t events, struct local_msu *destination,
      59             :                    struct msu_msg_hdr *hdr) {
      60         250 :     if (instance == NULL) {
      61           0 :         log_error("Socket monitor instance is NULL! Must instantiate before monitoring fd");
      62           0 :         return -1;
      63             :     }
      64         250 :     struct sock_msu_state *state = instance->msu_state;
      65             : 
      66         250 :     state->hdr_mask[fd] = *hdr;
      67         250 :     state->destinations[fd] = destination;
      68             : 
      69         250 :     int rtn = enable_epoll(state->epoll_fd, fd, events);
      70             : 
      71         250 :     if (rtn < 0) {
      72           2 :         rtn = add_to_epoll(state->epoll_fd, fd, events, true);
      73             : #ifdef MONITOR_NUM_FDS
      74           2 :         increment_stat(MSU_STAT1, instance->id, 1);
      75             : #endif
      76           2 :         if (rtn < 0) {
      77           1 :             log_perror("Error enabling epoll for fd %d", fd);
      78           1 :             return -1;
      79             :         }
      80             :     }
      81         249 :     log(LOG_SOCKET_MSU, "Added fd %d to epoll", fd);
      82         249 :     return 0;
      83             : }
      84             : 
      85             : struct msu_msg_hdr blank_hdr = {};
      86             : 
      87         121 : int msu_remove_fd_monitor(int fd) {
      88         121 :     struct sock_msu_state *state = instance->msu_state;
      89             : 
      90         121 :     state->hdr_mask[fd] = blank_hdr;
      91         121 :     state->destinations[fd] = NULL;
      92             : 
      93         121 :     int rtn = epoll_ctl(state->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
      94             : 
      95         121 :     if (rtn < 0) {
      96           0 :         log_perror("Error removing fd %d from epoll", fd);
      97           0 :         return -1;
      98             :     } else {
      99             : #ifdef MONITOR_NUM_FDS
     100         121 :         increment_stat(MSU_STAT1, instance->id, -1);
     101             : #endif
     102             :     }
     103             : 
     104         121 :     return 0;
     105             : }
     106             : 
     107             : struct msu_msg_key self_key = {
     108             :     .key = {0},
     109             :     .key_len = 0,
     110             :     .id = 0
     111             : };
     112             : 
     113         371 : static int process_connection(int fd, void *v_state) {
     114         371 :     struct sock_msu_state *state = v_state;
     115         371 :     log(LOG_SOCKET_HANDLER, "Processing connection: fd = %d", fd);
     116             : 
     117         371 :     struct socket_msg *msg = malloc(sizeof(*msg));
     118         371 :     msg->fd = fd;
     119             : 
     120         371 :     struct local_msu *destination= state->destinations[fd];
     121         371 :     if (destination == NULL) {
     122             :         // This is a file descriptor we haven't seen before
     123         125 :         log(LOG_SOCKET_HANDLER, "New connection: fd = %d", fd);
     124             : 
     125             :         // Generate the key for the message
     126             :         struct msu_msg_key key;
     127             :         struct key_seed seed;
     128         125 :         socklen_t addrlen = sizeof(seed.sockaddr);
     129         125 :         int rtn = getpeername(fd, (struct sockaddr*)&seed.sockaddr, &addrlen);
     130         125 :         if (rtn < 0) {
     131           0 :             log_perror("Could not getpeername() on fd %d", fd);
     132           0 :             msu_error(instance, NULL, 0);
     133           0 :             return -1;
     134             :         }
     135         125 :         seed.local_ip = local_runtime_ip();
     136         125 :         seed_msg_key(&seed, sizeof(seed), &key);
     137             : 
     138         125 :         struct msu_type *type = state->default_target;
     139             : 
     140         125 :         rtn = init_call_msu_type(state->self, type, &key, sizeof(*msg), msg);
     141         125 :         if (rtn < 0) {
     142           0 :             log_error("Error enqueing to destination");
     143           0 :             free(msg);
     144           0 :             msu_error(instance, NULL, 0);
     145           0 :             msu_monitor_fd(fd, EPOLLIN | EPOLLOUT, NULL, &blank_hdr);
     146           0 :             return -1;
     147             :         }
     148         125 :         return 0;
     149             :     } else {
     150             :         // This file descriptor was enqueued with a particular target in mind
     151         246 :         struct msu_msg_hdr *hdr = &state->hdr_mask[fd];
     152         246 :         if (hdr == NULL) {
     153           0 :             log_error("Existing destination with null header for fd %d", fd);
     154           0 :             return -1;
     155             :         }
     156         246 :         int rtn = call_local_msu(state->self, destination, hdr, sizeof(*msg), msg);
     157         246 :         if (rtn < 0) {
     158           0 :             log_error("Error enqueueing to next MSU");
     159           0 :             msu_error(instance, NULL, 0);
     160           0 :             msu_monitor_fd(fd, EPOLLIN | EPOLLOUT, destination, hdr);
     161           0 :             return -1;
     162             :         }
     163         246 :         log(LOG_SOCKET_HANDLER,"Enqueued to MSU %d", destination->id);
     164         246 :         return 0;
     165             :     }
     166             : }
     167             : 
     168         125 : static int set_default_target(int fd, void *v_state) {
     169             : #ifdef MONITOR_NUM_FDS
     170         125 :     increment_stat(MSU_STAT1, instance->id, 1);
     171             : #endif
     172         125 :     struct sock_msu_state *state = v_state;
     173         125 :     state->destinations[fd] = NULL;
     174         125 :     return 0;
     175             : }
     176             : 
     177          14 : static int socket_handler_main_loop(struct local_msu *self) {
     178          14 :     struct sock_msu_state *state = self->msu_state;
     179             : 
     180          14 :     int rtn = epoll_loop(state->sock_fd, state->epoll_fd,
     181             :                          SOCKET_HANDLER_BATCH_SIZE, SOCKET_HANDLER_TIMEOUT, true,
     182             :                          process_connection,
     183             :                          set_default_target,
     184             :                          state);
     185          14 :     return rtn;
     186             : }
     187             : 
     188             : 
     189          14 : static int socket_msu_receive(struct local_msu *self, struct msu_msg *msg) {
     190          14 :     int rtn = socket_handler_main_loop(self);
     191          14 :     if (rtn < 0) {
     192           0 :         log_error("Error exiting socket handler main loop");
     193             :     }
     194          14 :     init_call_local_msu(self, self, &self_key, 0, NULL);
     195          14 :     return 0;
     196             : }
     197             : 
     198             : 
     199          14 : static void socket_msu_destroy(struct local_msu *self) {
     200          14 :     struct sock_msu_state *state = self->msu_state;
     201             : 
     202          14 :     int rtn = close(state->sock_fd);
     203          14 :     if (rtn == -1) {
     204           0 :         log_error("Error closing socket");
     205             :     }
     206             : 
     207          14 :     rtn = close(state->epoll_fd);
     208          14 :     if (rtn == -1) {
     209           0 :         log_error("Error closing epoll fd");
     210             :     }
     211             : 
     212          14 :     free(state);
     213          14 :     instance = NULL;
     214          14 : }
     215             : 
     216             : #define DEFAULT_PORT 8080
     217             : #define DEFAULT_TARGET 501
     218             : #define INIT_SYNTAX "<port>, <target_msu_type>"
     219             : 
     220             : struct sock_init {
     221             :     int port;
     222             :     int target_type;
     223             : };
     224             : 
     225          14 : static int parse_init_payload (char *to_parse, struct sock_init *parsed) {
     226             : 
     227          14 :     parsed->port = DEFAULT_PORT;
     228          14 :     parsed->target_type = DEFAULT_TARGET;
     229             : 
     230          14 :     if (to_parse == NULL) {
     231           0 :         log_warn("Initializing socket handler MSU with default parameters. "
     232             :                  "(FYI: init syntax is [" INIT_SYNTAX "])");
     233             :     } else {
     234             :         char *saveptr, *tok;
     235          14 :         if ( (tok = strtok_r(to_parse, " ,", &saveptr)) == NULL) {
     236           0 :             log_warn("Couldn't get port from initialization string");
     237           0 :             return 0;
     238             :         }
     239          14 :         parsed->port = atoi(tok);
     240             : 
     241          14 :         if ( (tok = strtok_r(NULL, " ,", &saveptr)) == NULL) {
     242           0 :             log_warn("Couldn't get target MSU from initialization string");
     243           0 :             return 0;
     244             :         }
     245          14 :         parsed->target_type = atoi(tok);
     246             : 
     247          14 :         if ( (tok = strtok_r(NULL, " ,", &saveptr)) != NULL) {
     248           0 :             log_warn("Discarding extra tokens from socket initialzation: %s", tok);
     249             :         }
     250             :     }
     251          14 :     return 0;
     252             : }
     253             : 
     254             : 
     255          14 : static int socket_msu_init(struct local_msu *self, struct msu_init_data *init_data) {
     256             : 
     257          14 :     if (instance != NULL) {
     258           0 :         log_error("Socket MSU already instantiated. There can be only one!");
     259           0 :         return -1;
     260             :     }
     261             : 
     262          14 :     char *init_cmd = init_data->init_data;
     263             :     struct sock_init init;
     264          14 :     parse_init_payload(init_cmd, &init);
     265             : 
     266          14 :     log(LOG_SOCKET_INIT, "Initializing socket handler with port %d, target %d",
     267             :                init.port, init.target_type);
     268             : 
     269          14 :     struct sock_msu_state *state = malloc(sizeof(*state));
     270          14 :     self->msu_state = state;
     271             : 
     272          14 :     state->default_target = get_msu_type(init.target_type);
     273             : 
     274          14 :     if (state->default_target == NULL) {
     275           0 :         log_error("Cannot get type %d for socket handler", init.target_type);
     276           0 :         return -1;
     277             :     }
     278             : 
     279          14 :     state->sock_fd = init_listening_socket(init.port);
     280          14 :     if (state->sock_fd == -1) {
     281           0 :         log_error("Couldn't initialize socket for socket handler MSU %d", self->id);
     282           0 :         return -1;
     283             :     }
     284          14 :     log(LOG_SOCKET_INIT, "Listening for traffic on port %d", init.port);
     285             : 
     286          14 :     state->epoll_fd = init_epoll(state->sock_fd);
     287          14 :     if (state->epoll_fd == -1) {
     288           0 :         log_error("Couldn't initialize epoll_Fd for socket handler MSU %d", self->id);
     289           0 :         return -1;
     290             :     }
     291             : 
     292          14 :     state->self = self;
     293          14 :     instance = self;
     294             : 
     295             : #ifdef MONITOR_NUM_FDS
     296          14 :     init_stat_item(MSU_STAT1, self->id);
     297             : #endif
     298             : 
     299          14 :     init_call_local_msu(self, self, &self_key, 0, NULL);
     300             : 
     301          14 :     return 0;
     302             : }
     303             : 
     304             : struct msu_type SOCKET_MSU_TYPE = {
     305             :     .name = "socket_msu",
     306             :     .id = SOCKET_MSU_TYPE_ID,
     307             :     .init = socket_msu_init,
     308             :     .destroy = socket_msu_destroy,
     309             :     .receive = socket_msu_receive
     310             : };
     311             : 

Generated by: LCOV version 1.10