LCOV - code coverage report
Current view: top level - runtime - controller_communication.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 82 235 34.9 %
Date: 2018-01-11 Functions: 10 17 58.8 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file: controller_communication.c
      22             :  * Communication with global controller from runtime
      23             :  */
      24             : 
      25             : #include "controller_communication.h"
      26             : #include "communication.h"
      27             : #include "logging.h"
      28             : #include "socket_monitor.h"
      29             : #include "dedos_threads.h"
      30             : #include "thread_message.h"
      31             : #include "runtime_dfg.h"
      32             : #include "rt_stats.h"
      33             : #include "worker_thread.h"
      34             : #include "output_thread.h"
      35             : 
      36             : #include <stdlib.h>
      37             : #include <arpa/inet.h>
      38             : #include <string.h>
      39             : 
      40             : /**
      41             :  * File-scope socket descriptor for the connection to the global controller.
      42             :  * Starts at -1; the functions in this file treat a negative value as "not connected".
      43             :  */
      44             : static int controller_sock = -1;
      45             : 
      46             : 
      47           2 : int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload) {
      48             : 
      49           2 :     if (controller_sock < 0) {
      50           0 :         log_error("Controller socket not initialized");
      51           0 :         return -1;
      52             :     }
      53             : 
      54           2 :     int rtn = send_to_endpoint(controller_sock, hdr, sizeof(*hdr));
      55           2 :     if (rtn <= 0) {
      56           0 :         log_error("Error sending header to controller");
      57           0 :         return -1;
      58             :     }
      59           2 :     if (hdr->payload_size <= 0) {
      60           0 :         return 0;
      61             :     }
      62             : 
      63           2 :     rtn = send_to_endpoint(controller_sock, payload, hdr->payload_size);
      64           2 :     if (rtn <= 0) {
      65           0 :         log_error("Error sending payload to controller");
      66           0 :         return -1;
      67             :     }
      68             : 
      69           2 :     log(LOG_CONTROLLER_COMMUNICATION, "Sent payload of size %d type %d to controller",
      70             :                (int)hdr->payload_size, hdr->type);
      71           2 :     return 0;
      72             : }
      73             : 
      74             : /**
      75             :  * Sends the initilization message containing runtime ID, ip and port to
      76             :  * global controller
      77             :  */
      78           2 : static int send_ctl_init_msg() {
      79           2 :     int local_id = local_runtime_id();
      80           2 :     if (local_id < 0) {
      81           0 :         log_error("Could not get local runtime ID to send to controller");
      82           0 :         return -1;
      83             :     }
      84           2 :     uint32_t ip = local_runtime_ip();
      85           2 :     int port = local_runtime_port();
      86             : 
      87           2 :     struct rt_controller_init_msg msg = {
      88             :         .runtime_id = local_id,
      89             :         .ip = ip,
      90             :         .port = port
      91             :     };
      92             : 
      93           2 :     struct rt_controller_msg_hdr hdr = {
      94             :         .type = RT_CTL_INIT,
      95             :         .payload_size = sizeof(msg)
      96             :     };
      97             : 
      98           2 :     return send_to_controller(&hdr, &msg);
      99             : }
     100             : 
     101             : /**
     102             :  * Initializes a connection to the global controller
     103             :  * @returns 0 on success, -1 on error
     104             :  */
     105           1 : static int connect_to_controller(struct sockaddr_in *addr) {
     106             : 
     107           1 :     if (controller_sock != -1) {
     108           0 :         log_error("Controller socket already initialized");
     109           0 :         return -1;
     110             :     }
     111             : 
     112           1 :     controller_sock = init_connected_socket(addr);
     113             : 
     114           1 :     if (controller_sock < 0) {
     115           0 :         log_error("Error connecting to global controller!");
     116           0 :         return -1;
     117             :     }
     118             : 
     119             :     char ip[INET_ADDRSTRLEN];
     120           1 :     inet_ntop(AF_INET, &addr->sin_addr, ip, INET_ADDRSTRLEN);
     121           1 :     int port = ntohs(addr->sin_port);
     122             : 
     123           1 :     log_info("Connected to global controller at %s:%d", ip, port);
     124             : 
     125           1 :     int rtn = send_ctl_init_msg();
     126           1 :     if (rtn < 0) {
     127           0 :         log_error("Error sending initialization message to controller");
     128           0 :         return -1;
     129             :     }
     130           1 :     return controller_sock;
     131             : }
     132             : 
     /**
      * Compares msg->payload_size with sizeof(target) and then returns from the
      * ENCLOSING function: 0 when the sizes match, -1 (after logging a warning
      * that names the target type) when they do not.
      * Code placed after an expansion of this macro is therefore never reached.
      */
     #define CHECK_MSG_SIZE(msg, target) \
         do { \
             if ((msg)->payload_size != sizeof(target)) { \
                 log_warn("Message data size (%d) does not match size " \
                          "of target type " #target " (%d)", \
                          (int)(msg)->payload_size, (int)sizeof(target)); \
                 return -1; \
             } \
             return 0; \
         } while (0)
     145             : 
     146             : /**
     147             :  * Checks whether the size of a message matches the size of its target struct
     148             :  */
     149           2 : static int verify_msg_size(struct ctrl_runtime_msg_hdr *msg) {
     150           2 :     switch (msg->type) {
     151             :         case CTRL_CONNECT_TO_RUNTIME:
     152           0 :             CHECK_MSG_SIZE(msg, struct ctrl_add_runtime_msg);
     153             :         case CTRL_CREATE_THREAD:
     154           0 :             CHECK_MSG_SIZE(msg, struct ctrl_create_thread_msg);
     155             :         case CTRL_DELETE_THREAD:
     156           0 :             CHECK_MSG_SIZE(msg, struct ctrl_create_thread_msg);
     157             :         case CTRL_MODIFY_ROUTE:
     158           0 :             CHECK_MSG_SIZE(msg, struct ctrl_route_msg);
     159             :         case CTRL_CREATE_MSU:
     160           2 :             CHECK_MSG_SIZE(msg, struct ctrl_create_msu_msg);
     161             :         case CTRL_DELETE_MSU:
     162           0 :             CHECK_MSG_SIZE(msg, struct ctrl_delete_msu_msg);
     163             :         case CTRL_MSU_ROUTES:
     164           0 :             CHECK_MSG_SIZE(msg, struct ctrl_msu_route_msg);
     165             :         default:
     166           0 :             log_error("Received unknown message type: %d", msg->type);
     167           0 :             return -1;
     168             :     }
     169             : }
     170             : 
     171             : /**
     172             :  * Processes a received ctrl_add_runtime_msg
     173             :  */
     174           1 : static int process_connect_to_runtime(struct ctrl_add_runtime_msg *msg) {
     175           1 :     struct sockaddr_in addr = {};
     176           1 :     addr.sin_family = AF_INET;
     177           1 :     addr.sin_addr.s_addr = msg->ip;
     178           1 :     addr.sin_port = htons(msg->port);
     179             : 
     180           1 :     int rtn = connect_to_runtime_peer(msg->runtime_id, &addr);
     181           1 :     if (rtn < 0) {
     182           0 :         log_error("Could not add runtime peer");
     183           0 :         return -1;
     184             :     }
     185           1 :     return 0;
     186             : }
     187             : 
     188             : /**
     189             :  * Processes a received ctrl_create_thread_msg
     190             :  */
     191           2 : static int process_create_thread_msg(struct ctrl_create_thread_msg *msg) {
     192           2 :     int id = msg->thread_id;
     193           2 :     int rtn = create_worker_thread(id, msg->mode);
     194           2 :     if (rtn < 0) {
     195           1 :         log_error("Error creating worker thread %d", id);
     196           1 :         return -1;
     197             :     }
     198           1 :     log(LOG_THREAD_CREATION, "Created worker thread %d", id);
     199           1 :     return 0;
     200             : }
     201             : 
     202             : /**
     203             :  * Processes a received ctrl_route_msg
     204             :  */
     205           4 : static int process_ctrl_route_msg(struct ctrl_route_msg *msg) {
     206             :     int rtn;
     207           4 :     log(LOG_CONTROLLER_COMMUNICATION, "Got control route message of type %d", msg->type);
     208           4 :     switch (msg->type) {
     209             :         case CREATE_ROUTE:
     210           2 :             rtn = init_route(msg->route_id, msg->type_id);
     211           2 :             if (rtn < 0) {
     212           1 :                 log_error("Error creating new route of id %d, type %d",
     213             :                           msg->route_id, msg->type_id);
     214           1 :                 return 1;
     215             :             }
     216           1 :             return 0;
     217             :         case ADD_ENDPOINT:;
     218             :             struct msu_endpoint endpoint;
     219           2 :             int rtn = init_msu_endpoint(msg->msu_id, msg->runtime_id, &endpoint);
     220           2 :             if (rtn < 0) {
     221           1 :                 log_error("Cannot initilize runtime endpoint for adding "
     222             :                           "endpoint %d to route %d", msg->msu_id, msg->route_id);
     223           1 :                 return 1;
     224             :             }
     225           1 :             rtn = add_route_endpoint(msg->route_id, endpoint, msg->key);
     226           1 :             if (rtn < 0) {
     227           0 :                 log_error("Error adding endpoint %d to route %d with key %d",
     228             :                           msg->msu_id, msg->route_id, msg->key);
     229           0 :                 return 1;
     230             :             }
     231           1 :             return 0;
     232             :         case DEL_ENDPOINT:
     233           0 :             rtn = remove_route_endpoint(msg->route_id, msg->msu_id);
     234           0 :             if (rtn < 0) {
     235           0 :                 log_error("Error removing endpoint %d from route %d",
     236             :                           msg->msu_id, msg->route_id);
     237           0 :                 return 1;
     238             :             }
     239           0 :             return 0;
     240             :         case MOD_ENDPOINT:
     241           0 :             rtn = modify_route_endpoint(msg->route_id, msg->msu_id, msg->key);
     242           0 :             if (rtn < 0) {
     243           0 :                 log_error("Error modifying endpoint %d on route %d to have key %d",
     244             :                           msg->msu_id, msg->route_id, msg->key);
     245           0 :                 return 1;
     246             :             }
     247           0 :             return 0;
     248             :         default:
     249           0 :             log_error("Unknown route control message type received: %d", msg->type);
     250           0 :             return -1;
     251             :     }
     252             : }
     253             : 
     254             : /**
     255             :  * Gets the corresponding thread_msg_type for a ctrl_runtime_msg_type
     256             :  */
     257           2 : static enum thread_msg_type get_thread_msg_type(enum ctrl_runtime_msg_type type) {
     258           2 :     switch (type) {
     259             :         case CTRL_CREATE_MSU:
     260           2 :             return CREATE_MSU;
     261             :         case CTRL_DELETE_MSU:
     262           0 :             return DELETE_MSU;
     263             :         case CTRL_MSU_ROUTES:
     264           0 :             return MSU_ROUTE;
     265             :         default:
     266           0 :             log_error("Unknown thread message type %d", type);
     267           0 :             return UNKNOWN_THREAD_MSG;
     268             :     }
     269             : }
     270             : 
     271             : /**
     272             :  * Constructs a thread message from a ctrl_runtime_msg_hdr, reading any additional
     273             :  * information it needs off of the associated socket
     274             :  * @param hdr Header describing the information available to read
     275             :  * @param fd The file descriptor off of which to read the remainder of the control message
     276             :  * @return Created thread_msg on success, NULL on error
     277             :  */
     278           2 : static struct thread_msg *thread_msg_from_ctrl_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd) {
     279           2 :     if (verify_msg_size(hdr) != 0) {
     280           0 :         log_warn("May not process message. Incorrect payload size for type.");
     281             :     }
     282             : 
     283           2 :     void *msg_data = malloc(hdr->payload_size);
     284           2 :     int rtn = read_payload(fd, hdr->payload_size, msg_data);
     285           2 :     if (rtn < 0) {
     286           0 :         log_error("Error reading control payload. Cannot process message");
     287           0 :         free(msg_data);
     288           0 :         return NULL;
     289             :     }
     290           2 :     log(LOG_CONTROLLER_COMMUNICATION, "Read control payload %p of size %d",
     291             :                msg_data, (int)hdr->payload_size);
     292           2 :     enum thread_msg_type type = get_thread_msg_type(hdr->type);
     293           2 :     struct thread_msg *thread_msg = construct_thread_msg(type, hdr->payload_size, msg_data);
     294           2 :     thread_msg->ack_id = hdr->id;
     295           2 :     return thread_msg;
     296             : }
     297             : 
     298             : /**
     299             :  * Constructs a thread_msg from a control-runtime message and passes it to the relevant thread.
     300             :  * @param hdr Header describing info available to read
     301             :  * @param fd File descriptor to read message from
     302             :  * @return 0 on success, -1 on error
     303             :  */
     304           1 : static int pass_ctrl_msg_to_thread(struct ctrl_runtime_msg_hdr *hdr, int fd) {
     305           1 :     struct thread_msg *thread_msg = thread_msg_from_ctrl_hdr(hdr, fd);
     306           1 :     if (thread_msg == NULL) {
     307           0 :         log_error("Error constructing thread msg from control hdr");
     308           0 :         return -1;
     309             :     }
     310           1 :     struct dedos_thread *thread = get_dedos_thread(hdr->thread_id);
     311           1 :     if (thread == NULL) {
     312           0 :         log_error("Error getting dedos thread %d to deliver control message",
     313             :                   hdr->thread_id);
     314           0 :         destroy_thread_msg(thread_msg);
     315           0 :         return -1;
     316             :     }
     317             : 
     318           1 :     int rtn = enqueue_thread_msg(thread_msg, &thread->queue);
     319           1 :     if (rtn < 0) {
     320           0 :         log_error("Error enquing control message on thread %d", hdr->thread_id);
     321           0 :         return -1;
     322             :     }
     323           1 :     return 0;
     324             : }
     325             : 
     326             : /**
     327             :  * Processes a received control message that is due for delivery to this thread.
     328             :  * @param hdr The header for the control message
     329             :  * @param fd File descriptor off of which to read the control message
     330             :  * @return 0 on success, -1 on error
     331             :  */
     332           0 : static int process_ctrl_message(struct ctrl_runtime_msg_hdr *hdr, int fd) {
     333           0 :     if (verify_msg_size(hdr) != 0) {
     334           0 :         log_warn("May not process message. Incorrect payload size for type");
     335             :     }
     336             : 
     337           0 :     char msg_data[hdr->payload_size];
     338           0 :     int rtn = read_payload(fd, hdr->payload_size, (void*)msg_data);
     339             : 
     340           0 :     if (rtn < 0) {
     341           0 :         log_error("Error reading control payload. Cannot process message");
     342           0 :         return -1;
     343             :     }
     344           0 :     log(LOG_CONTROLLER_COMMUNICATION, "Read control payload %p of size %d",
     345             :                msg_data, (int)hdr->payload_size);
     346             : 
     347           0 :     switch (hdr->type) {
     348             :         case CTRL_CONNECT_TO_RUNTIME:
     349           0 :             rtn = process_connect_to_runtime((struct ctrl_add_runtime_msg*) msg_data);
     350           0 :             if (rtn < 0) {
     351           0 :                 log_error("Error processing connect to runtime message");
     352           0 :                 return -1;
     353             :             }
     354           0 :             break;
     355             :         case CTRL_CREATE_THREAD:
     356           0 :             rtn = process_create_thread_msg((struct ctrl_create_thread_msg*) msg_data);
     357           0 :             if (rtn < 0) {
     358           0 :                 log_error("Error processing create thread message");
     359           0 :                 return -1;
     360             :             }
     361           0 :             break;
     362             :         case CTRL_DELETE_THREAD:
     363           0 :             log_critical("TODO!");
     364           0 :             break;
     365             :         case CTRL_MODIFY_ROUTE:
     366           0 :             rtn = process_ctrl_route_msg((struct ctrl_route_msg*) msg_data);
     367           0 :             if (rtn < 0) {
     368           0 :                 log_error("Error processing control route message");
     369           0 :                 return -1;
     370             :             }
     371           0 :             break;
     372             :         default:
     373           0 :             log_error("Unknown message type delivered to input thread: %d", hdr->type);
     374           0 :             return -1;
     375             :     }
     376           0 :     return 0;
     377             : }
     378             : 
     /**
      * Placeholder for acknowledging a control message.
      * TODO: not implemented yet -- nothing is sent.
      * @param id ID of the message being acknowledged (currently unused)
      * @param success Whether the message was handled successfully (currently unused)
      * @return Always 0
      */
     int send_ack_message(int id, bool success) {
         (void)id;
         (void)success;
         return 0;
     }
     384             : 
     385             : /** Processes any received control message. */
     386           0 : static int process_ctrl_message_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd) {
     387             : 
     388             :     int rtn;
     389           0 :     switch (hdr->type) {
     390             :         case CTRL_CREATE_MSU:
     391             :         case CTRL_DELETE_MSU:
     392             :         case CTRL_MSU_ROUTES:
     393           0 :             rtn = pass_ctrl_msg_to_thread(hdr, fd);
     394           0 :             if (rtn < 0) {
     395           0 :                 log_error("Error passing control message to thread");
     396           0 :                 return -1;
     397             :             }
     398           0 :             break;
     399             :         case CTRL_CONNECT_TO_RUNTIME:
     400             :         case CTRL_CREATE_THREAD:
     401             :         case CTRL_DELETE_THREAD:
     402             :         case CTRL_MODIFY_ROUTE:
     403           0 :             rtn = process_ctrl_message(hdr, fd);
     404           0 :             if (rtn < 0) {
     405           0 :                 log_error("Error processing control message");
     406           0 :                 send_ack_message(hdr->id, false);
     407           0 :                 return -1;
     408             :             }
     409           0 :             send_ack_message(hdr->id, true);
     410           0 :             break;
     411             :         default:
     412           0 :             log_error("Unknown header type %d in receiving thread", hdr->type);
     413           0 :             return -1;
     414             :     }
     415             : 
     416           0 :     return 0;
     417             : }
     418             : 
     419           0 : int handle_controller_communication(int fd) {
     420             :     struct ctrl_runtime_msg_hdr hdr;
     421           0 :     int rtn = read_payload(fd, sizeof(hdr), &hdr);
     422           0 :     if (rtn< 0) {
     423           0 :         log_error("Error reading control message");
     424           0 :         return -1;
     425           0 :     } else if (rtn == 1) {
     426           0 :         log_critical("Disconnected from global controller");
     427           0 :         close(fd);
     428           0 :         return 1;
     429             :     } else {
     430           0 :         log(LOG_CONTROLLER_COMMUNICATION,
     431             :                    "Read header (type %d) from controller", hdr.type);
     432             :     }
     433             : 
     434           0 :     rtn = process_ctrl_message_hdr(&hdr, fd);
     435           0 :     if (rtn < 0) {
     436           0 :         log_error("Error processing control message");
     437           0 :         return -1;
     438             :     }
     439             : 
     440           0 :     return 0;
     441             : }
     442             : 
     443           0 : bool is_controller_fd(int fd) {
     444           0 :     return fd == controller_sock;
     445             : }
     446             : 
     447             : 
     /**
      * Connects to the global controller and registers the resulting socket
      * with monitor_controller_socket().
      * @param addr Address of the global controller
      * @return The controller socket's file descriptor on success, -1 on error
      */
     int init_controller_socket(struct sockaddr_in *addr) {
         const int ctl_fd = connect_to_controller(addr);
         if (ctl_fd < 0) {
             log_error("Error connecting to global controller");
             return -1;
         }

         if (monitor_controller_socket(ctl_fd) == 0) {
             return ctl_fd;
         }

         log_error("Attempted to initialize controller socket "
                   "before initializing runtime epoll");
         return -1;
     }
     461             : 
     462           0 : int send_stats_to_controller() {
     463           0 :     if (controller_sock < 0) {
     464           0 :         log(LOG_STAT_SEND, "Skipping sending statistics: controller not initialized");
     465           0 :         return -1;
     466             :     }
     467           0 :     int rtn = 0;
     468           0 :     int total_items = 0;
     469             :     struct timespec now;
     470           0 :     clock_gettime(CLOCK_REALTIME_COARSE, &now);
     471           0 :     for (int i=0; i<N_REPORTED_STAT_TYPES; i++) {
     472           0 :         enum stat_id stat_id = reported_stat_types[i].id;
     473             :         int n_items;
     474           0 :         struct stat_sample *samples = get_stat_samples(stat_id, &now,  &n_items);
     475           0 :         total_items += n_items;
     476           0 :         if (samples == NULL) {
     477           0 :             log(LOG_STAT_SEND, "Error getting stat sample for send to controller");
     478           0 :             continue;
     479             :         }
     480           0 :         size_t serial_size = serialized_stat_sample_size(samples, n_items);
     481             : 
     482           0 :         char buffer[serial_size];
     483           0 :         size_t ser_rtn = serialize_stat_samples(samples, n_items, buffer, serial_size);
     484             :         if (ser_rtn < 0) {
     485             :             log_error("Error serializing stat sample");
     486             :             rtn = -1;
     487             :         }
     488             : 
     489           0 :         struct rt_controller_msg_hdr hdr = {
     490             :             .type = RT_STATS,
     491             :             .payload_size = ser_rtn
     492             :         };
     493             : 
     494           0 :         int rtn = send_to_controller(&hdr, buffer);
     495           0 :         if (rtn < 0) {
     496           0 :             log_error("Error sending statistics to controller");
     497           0 :             rtn = -1;
     498             :         }
     499             :     }
     500           0 :     log(LOG_STAT_SEND, "Sending %d statistics to controller", total_items);
     501           0 :     return rtn;
     502             : }

Generated by: LCOV version 1.10