LCOV - code coverage report
Current view: top level - runtime - runtime_communication.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 81 137 59.1 %
Date: 2018-01-11 Functions: 9 10 90.0 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file: runtime/runtime_communication.c
      22             :  * All communication to and from other runtimes
      23             :  */
      24             : #include "runtime_communication.h"
      25             : #include "logging.h"
      26             : #include "communication.h"
      27             : #include "msu_message.h"
      28             : #include "inter_runtime_messages.h"
      29             : #include "local_msu.h"
      30             : #include "runtime_dfg.h"
      31             : #include "thread_message.h"
      32             : #include "rt_stats.h"
      33             : #include "socket_monitor.h"
      34             : 
      35             : #include <sys/stat.h>
      36             : #include <sys/types.h>
      37             : #include <sys/socket.h>
      38             : #include <netinet/tcp.h>
      39             : #include <stdlib.h>
      40             : #include <unistd.h>
      41             : 
      42             : /**
      43             :  * Static (global) variable for the socket listening
      44             :  * for other runtimes
      45             :  */
      46             : static int runtime_sock = -1;
      47             : 
      48             : /** Holds the file descriptor for a single runtime peer */
      49             : struct runtime_peer {
      50             :     int fd;
      51             :     // ???: struct dfg_runtime rt
      52             :     // ???: uint32_t ip_address
      53             : };
      54             : 
      55             : /** Maximum number of other runtimes that can connect to this one */
      56             : #define MAX_RUNTIME_ID 32
      57             : 
      58             : /**
      59             :  * Other runtime peer sockets.
      60             :  * Structs will be initialized to 0 due to static initialization.
      61             :  */
      62             : static struct runtime_peer runtime_peers[MAX_RUNTIME_ID];
      63             : 
      64           7 : int send_to_peer(unsigned int runtime_id, struct inter_runtime_msg_hdr *hdr, void *payload) {
      65           7 :     if (runtime_id > MAX_RUNTIME_ID) {
      66           0 :         log_error("Requested peer %d is greater than max runtime ID %d",
      67             :                   runtime_id, MAX_RUNTIME_ID);
      68           0 :         return -1;
      69             :     }
      70           7 :     struct runtime_peer *peer = &runtime_peers[runtime_id];
      71           7 :     if (peer->fd <= 0) {
      72           1 :         log_error("Requested peer %d not instantiated", runtime_id);
      73           1 :         return -1;
      74             :     }
      75             : 
      76           6 :     int rtn = send_to_endpoint(peer->fd, hdr, sizeof(*hdr));
      77           6 :     if (rtn <= 0) {
      78           1 :         log_error("Error sending header to runtime %d", runtime_id);
      79           1 :         return -1;
      80             :     }
      81             : 
      82           5 :     if (hdr->payload_size <= 0) {
      83           0 :         return 0;
      84             :     }
      85             : 
      86           5 :     rtn = send_to_endpoint(peer->fd, payload, hdr->payload_size);
      87           5 :     if (rtn <= 0) {
      88           0 :         log_error("Error sending payload to runtime %d", runtime_id);
      89           0 :         return -1;
      90             :     }
      91           5 :     log(LOG_RUNTIME_COMMUNICATION, "Send a payload of size %d (type %d) to runtime %d (fd: %d)",
      92             :                (int)hdr->payload_size, hdr->type, runtime_id, peer->fd);
      93           5 :     return 0;
      94             : }
      95             : 
      96             : 
      97           4 : int add_runtime_peer(unsigned int runtime_id, int fd) {
      98             :     struct stat buf;
      99           4 :     if (fstat(fd, &buf) != 0) {
     100           1 :         log_error("Cannot register non-descriptor %d for runtime ID %d", fd, runtime_id);
     101           1 :         return -1;
     102             :     }
     103           3 :     if (runtime_id > MAX_RUNTIME_ID) {
     104           0 :         log_error("Runtime ID %d too high!", runtime_id);
     105           0 :         return -1;
     106             :     }
     107           3 :     if (runtime_peers[runtime_id].fd != 0) {
     108           0 :         log_warn("Replacing runtime peer with id %d", runtime_id);
     109             :     }
     110           3 :     runtime_peers[runtime_id].fd = fd;
     111             : 
     112           3 :     int val = 1;
     113           3 :     int rtn = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
     114           3 :     if (rtn < 0) {
     115           2 :         log_perror("Error setting TCP_NODELAY");
     116             :     }
     117             : 
     118           3 :     log_info("Added runtime peer %d (fd: %d)", runtime_id, fd);
     119           3 :     return 0;
     120             : }
     121             : 
     122             : /**
     123             :  * Sends the inter_runtime_init message to the runtime with the given ID
     124             :  */
     125           3 : static int send_init_msg(int id) {
     126           3 :     int local_id = local_runtime_id();
     127           3 :     if (local_id < 0) {
     128           0 :         log_error("Could not send local runtime ID to remote runtime %d", id);
     129           0 :         return -1;
     130             :     }
     131             : 
     132           3 :     struct inter_runtime_init_msg msg = {
     133             :         .origin_id = local_id
     134             :     };
     135             : 
     136           3 :     struct inter_runtime_msg_hdr hdr = {
     137             :         .type = INTER_RT_INIT,
     138             :         .target = 0,
     139             :         .payload_size = sizeof(msg)
     140             :     };
     141             : 
     142           3 :     int rtn = send_to_peer(id, &hdr, &msg);
     143           3 :     if (rtn < 0) {
     144           0 :         log_error("Could not send initial connection message to peer runtime %d", id);
     145           0 :         return -1;
     146             :     }
     147           3 :     return 0;
     148             : }
     149             : 
     150           3 : int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr){
     151           3 :     if (runtime_peers[id].fd != 0) {
     152           0 :         log_warn("Attempting to replace runtime peer with id %d", id);
     153             :     }
     154           3 :     int fd = init_connected_socket(addr);
     155           3 :     if (fd < 0) {
     156           1 :         log_error("Could not connect to runtime %u", id);
     157           1 :         return -1;
     158             :     }
     159           2 :     runtime_peers[id].fd = fd;
     160             : 
     161           2 :     int val = 1;
     162           2 :     int rtn = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
     163           2 :     if (rtn < 0) {
     164           0 :         log_perror("Error setting TCP_NODELAY");
     165             :     }
     166             : 
     167           2 :     if (send_init_msg(id) != 0) {
     168           0 :         log_error("Failed to send initialization message to runtime %d (fd: %d)", id, fd);
     169           0 :         close(fd);
     170           0 :         return -1;
     171             :     }
     172           2 :     monitor_runtime_socket(fd);
     173           2 :     log_info("Connected to runtime peer %d (fd: %d)", id, fd);
     174           2 :     return 0;
     175             : }
     176             : 
     177           0 : int init_runtime_socket(int listen_port) {
     178           0 :     if (runtime_sock > 0) {
     179           0 :         log_error("Runtime socket already initialized");
     180           0 :         return -1;
     181             :     }
     182           0 :     int sock = init_listening_socket(listen_port);
     183           0 :     if (sock < 0) {
     184           0 :         log_error("Error initializing runtime socket");
     185           0 :         return -1;
     186             :     }
     187           0 :     return sock;
     188             : }
     189             : 
     190             : /**
     191             :  * Reads a header from a peer runtime
     192             :  */
     193           1 : static int read_runtime_message_hdr(int fd, struct inter_runtime_msg_hdr *msg) {
     194           1 :     if (read_payload(fd, sizeof(*msg), msg) != 0) {
     195           0 :         log_error("Could not read runtime message header from socket %d", fd);
     196           0 :         return -1;
     197             :     }
     198           1 :     return 0;
     199             : }
     200             : 
     201             : /*
     202             :  * Processes a fwd_to_msu message which has just been received from another runtime
     203             :  */
     204           1 : static int process_fwd_to_msu_message(size_t payload_size, int msu_id, int fd) {
     205             : 
     206           1 :     struct local_msu *msu = get_local_msu(msu_id);
     207           1 :     if (msu == NULL) {
     208           0 :         log_error("Error getting MSU with ID %d, requested from runtime fd %d",
     209             :                   msu_id, fd);
     210             :         // Read the payload anyway just so it doesn't mess future things up
     211           0 :         void *unused = malloc(payload_size);
     212           0 :         read_payload(fd, payload_size, unused);
     213           0 :         free(unused);
     214           0 :         return -1;
     215             :     }
     216             : 
     217           1 :     log(LOG_RUNTIME_CONNECTION, "Attempting to read MSU message");
     218           1 :     struct msu_msg *msu_msg = read_msu_msg(msu, fd, payload_size);
     219           1 :     if (msu_msg == NULL) {
     220           0 :         log_error("Error reading MSU msg off of fd %d", fd);
     221           0 :         return -1;
     222             :     }
     223             : 
     224           1 :     int rtn = enqueue_msu_msg(&msu->queue, msu_msg);
     225           1 :     if (rtn < 0) {
     226           0 :         log_error("Error enqueuing inter-msu message to MSU %d from runtime fd %d",
     227             :                   msu_id, fd);
     228           0 :         destroy_msu_msg_and_contents(msu_msg);
     229           0 :         return -1;
     230             :     }
     231           1 :     return 0;
     232             : }
     233             : 
     234             : /**
     235             :  * Processes an init message which has just been received from another runtime
     236             :  */
     237           2 : static int process_init_rt_message(size_t payload_size, int fd) {
     238             : 
     239           2 :     if (payload_size != sizeof(struct inter_runtime_init_msg)) {
     240           0 :         log_warn("Payload size of runtime initialization message does not match init_msg");
     241             :     }
     242             :     struct inter_runtime_init_msg msg;
     243           2 :     if (read_payload(fd, sizeof(msg), &msg) != 0) {
     244           0 :         log_error("Error reading inter_runtime_init_message from fd %d", fd);
     245           0 :         return -1;
     246             :     }
     247             : 
     248           2 :     int rtn = add_runtime_peer(msg.origin_id, fd);
     249           2 :     if (rtn < 0) {
     250           0 :         log_error("Could not add runtime peer %d (fd: %d)", msg.origin_id, fd);
     251           0 :         return -1;
     252             :     }
     253           2 :     log(LOG_RUNTIME_COMMUNICATION, "Runtime peer %d (fd: %d) added", 
     254             :         msg.origin_id, fd);
     255             : 
     256           2 :     return 0;
     257             : }
     258             : 
     259             : /**
     260             :  * Processes the header which has been received on `fd`, and processes the header's payload
     261             :  */
     262           1 : static int process_runtime_message_hdr(struct inter_runtime_msg_hdr *hdr, int fd) {
     263             :     int rtn;
     264           1 :     switch (hdr->type) {
     265             :         case RT_FWD_TO_MSU:
     266           0 :             rtn = process_fwd_to_msu_message(hdr->payload_size, hdr->target, fd);
     267           0 :             if (rtn < 0) {
     268           0 :                 log_error("Error processing forward message from fd %d", fd);
     269           0 :                 return 1;
     270             :             }
     271           0 :             return 0;
     272             :         case INTER_RT_INIT:
     273           1 :             rtn = process_init_rt_message(hdr->payload_size, fd);
     274           1 :             if (rtn < 0) {
     275           0 :                 log_error("Error processing init runtime message from fd %d", fd);
     276           0 :                 return 1;
     277             :             }
     278           1 :             return 0;
     279             :         default:
     280           0 :             log_error("Received unknown message type from fd %d: %d", fd, hdr->type);
     281             :             // Returning 1 because a return of -1 will make the epoll loop exit
     282           0 :             return 1;
     283             :     }
     284             : }
     285             : 
     286           1 : int handle_runtime_communication(int fd) {
     287             :     struct inter_runtime_msg_hdr hdr;
     288           1 :     int rtn = read_runtime_message_hdr(fd, &hdr);
     289             : 
     290           1 :     if (rtn < 0) {
     291           0 :         log_error("Error reading runtime message");
     292             :         // Return of -1 will make epoll loop exit
     293           0 :         return 1;
     294             :     } else {
     295           1 :         log(LOG_INTER_RUNTIME_COMMUNICATION,
     296             :                    "Read message from runtime with fd %d", fd);
     297             :     }
     298             : 
     299           1 :     rtn = process_runtime_message_hdr(&hdr, fd);
     300           1 :     if (rtn < 0) {
     301           0 :         log_error("Error processing inter-runtime message from fd %d", fd);
     302           0 :         return -1;
     303             :     }
     304           1 :     return 0;
     305             : }

Generated by: LCOV version 1.10