LCOV - code coverage report
Current view: top level - runtime - output_thread.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 0 112 0.0 %
Date: 2018-01-11 Functions: 0 11 0.0 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file output_thread.c
      22             :  *
      23             :  * A dedos_thread which monitors a queue for output to be sent to
      24             :  * other runtimes or the global controller
      25             :  */
      26             : #include "logging.h"
      27             : #include "worker_thread.h"
      28             : #include "dedos_threads.h"
      29             : #include "thread_message.h"
      30             : #include "msu_type.h"
      31             : #include "controller_communication.h"
      32             : #include "ctrl_runtime_messages.h"
      33             : #include "output_thread.h"
      34             : #include "stats.h"
      35             : 
      36             : #include <stdlib.h>
      37             : #include <netinet/ip.h>
      38             : 
      39             : /** A static copy of the output thread, so others can enqueue messages */
      40             : static struct dedos_thread *static_output_thread;
      41             : 
      42             : /** Initializes the static copy of the output thread */
      43           0 : static void *init_output_thread(struct dedos_thread *output_thread) {
      44           0 :     static_output_thread = output_thread;
      45           0 :     return NULL;
      46             : }
      47             : 
      48             : /** Process a ::SEND_TO_CTRL message */
      49           0 : static int output_thread_send_to_ctrl(struct send_to_ctrl_msg *msg) {
      50           0 :     int rtn = send_to_controller(&msg->hdr, msg->data);
      51           0 :     if (rtn < 0) {
      52           0 :         log_error("Error sending message to controller");
      53           0 :         return -1;
      54             :     }
      55           0 :     free(msg->data);
      56           0 :     return 0;
      57             : }
      58             : 
      59             : /** Process a ::SEND_TO_PEER message */
      60           0 : static int output_thread_send_to_peer(struct send_to_peer_msg *msg) {
      61           0 :     int rtn = send_to_peer(msg->runtime_id, &msg->hdr, msg->data);
      62           0 :     if (rtn < 0) {
      63           0 :         log_error("Error sending message to runtime %d", msg->runtime_id);
      64           0 :         return -1;
      65             :     }
      66           0 :     free(msg->data);
      67           0 :     return 0;
      68             : }
      69             : 
      70             : /** Process a ::CONNECT_TO_RUNTIME message */
      71           0 : static int output_thread_connect_to_runtime(struct ctrl_add_runtime_msg *msg){
      72             :     struct sockaddr_in addr;
      73           0 :     bzero(&addr, sizeof(addr));
      74           0 :     addr.sin_family = AF_INET;
      75           0 :     addr.sin_addr.s_addr = msg->ip;
      76           0 :     addr.sin_port = htons(msg->port);
      77             : 
      78           0 :     int rtn = connect_to_runtime_peer(msg->runtime_id, &addr);
      79           0 :     if (rtn < 0) {
      80           0 :         log_error("Could not add runtime peer");
      81           0 :         return -1;
      82             :     }
      83           0 :     return 0;
      84             : }
      85             : 
      86           0 : void stop_output_monitor() {
      87           0 :     dedos_thread_stop(static_output_thread);
      88           0 : }
      89             : 
      90           0 : void join_output_thread() {
      91           0 :     pthread_join(static_output_thread->pthread, NULL);
      92           0 :     free(static_output_thread);
      93           0 : }
      94             : 
      95             : /** Checks whether the size of the message matches the size of the target struct */
      96             : #define CHECK_MSG_SIZE(msg, target) \
      97             :     if (msg->data_size != sizeof(target)) { \
      98             :         log_warn("Message data size (%d) does not match size" \
      99             :                  "of target type (%d)" #target, (int)msg->data_size , \
     100             :                  (int)sizeof(target)); \
     101             :         return -1; \
     102             :     } \
     103             : 
     104             : /** Processes a thread message that is  delivered to the output thread */
     105           0 : static int process_output_thread_msg(struct thread_msg *msg) {
     106             : 
     107           0 :     log(LOG_MAIN_THREAD, "processing message %p with type id: %d",
     108             :         msg, msg->type);
     109           0 :     int rtn = -1;
     110           0 :     switch (msg->type) {
     111             :         case CONNECT_TO_RUNTIME:
     112           0 :             CHECK_MSG_SIZE(msg, struct ctrl_add_runtime_msg);
     113           0 :             struct ctrl_add_runtime_msg *runtime_msg = msg->data;
     114           0 :             rtn = output_thread_connect_to_runtime(runtime_msg);
     115             : 
     116           0 :             if (rtn < 0) {
     117           0 :                 log_warn("Error adding runtime peer");
     118             :             }
     119           0 :             break;
     120             :         case SEND_TO_PEER:
     121           0 :             CHECK_MSG_SIZE(msg, struct send_to_peer_msg);
     122           0 :             struct send_to_peer_msg *forward_msg = msg->data;
     123           0 :             rtn = output_thread_send_to_peer(forward_msg);
     124             : 
     125           0 :             if (rtn < 0) {
     126           0 :                 log_warn("Error forwarding message to peer");
     127             :             }
     128           0 :             break;
     129             :         case SEND_TO_CTRL:
     130           0 :             CHECK_MSG_SIZE(msg, struct send_to_ctrl_msg);
     131           0 :             struct send_to_ctrl_msg *ctrl_msg = msg->data;
     132           0 :             rtn = output_thread_send_to_ctrl(ctrl_msg);
     133           0 :             if (rtn < 0) {
     134           0 :                 log_warn("Error sending message to controller");
     135             :             }
     136           0 :             break;
     137             :         case CREATE_MSU:
     138             :         case DELETE_MSU:
     139             :         case MSU_ROUTE:
     140           0 :             log_error("Message (type: %d) meant for worker thread sent to output thread",
     141             :                        msg->type);
     142           0 :             break;
     143             :         default:
     144           0 :             log_error("Unknown message type %d delivered to output thread", msg->type);
     145           0 :             break;
     146             :     }
     147           0 :     return rtn;
     148             : }
     149             : 
     150             : /** Checks the queue of the output thread for messages and acts on them if present */
     151           0 : static int check_output_thread_queue(struct dedos_thread *output_thread) {
     152             : 
     153           0 :     struct thread_msg *msg = dequeue_thread_msg(&output_thread->queue);
     154             : 
     155           0 :     if (msg == NULL) {
     156           0 :         return 0;
     157             :     }
     158             : 
     159           0 :     int rtn = process_output_thread_msg(msg);
     160           0 :     free(msg);
     161           0 :     if (rtn < 0) {
     162           0 :         log_error("Error processing thread msg");
     163             :     }
     164           0 :     return 0;
     165             : }
     166             : 
     167             : /** How often to report statistics */
     168             : #define STAT_REPORTING_DURATION_MS STAT_SAMPLE_PERIOD_MS
     169             : 
     170             : /**
     171             :  * The main thread loop for the output thread.
     172             :  * Checks the queue for messages, sends them, sends stats messages
     173             :  */
     174           0 : static int output_thread_loop(struct dedos_thread *self, void UNUSED *init_data) {
     175             : 
     176             :     struct timespec elapsed;
     177             :     struct timespec timeout_abs;
     178           0 :     clock_gettime(CLOCK_REALTIME, &timeout_abs);
     179           0 :     while (!dedos_thread_should_exit(self)) {
     180             : 
     181           0 :         clock_gettime(CLOCK_REALTIME, &elapsed);
     182           0 :         if (elapsed.tv_sec > timeout_abs.tv_sec || (elapsed.tv_sec == timeout_abs.tv_sec &&
     183           0 :                                                     elapsed.tv_nsec > timeout_abs.tv_nsec)) {
     184           0 :             if (send_stats_to_controller() < 0) {
     185           0 :                 log(LOG_STATS_SEND, "Error sending stats to controller");
     186             :             }
     187           0 :             log(LOG_STATS_SEND, "Sent stats");
     188           0 :             timeout_abs = elapsed;
     189           0 :             timeout_abs.tv_nsec += (long)STAT_REPORTING_DURATION_MS * 1e6;
     190           0 :             while (timeout_abs.tv_nsec > 1e9) {
     191           0 :                 timeout_abs.tv_nsec -= 1e9;
     192           0 :                 timeout_abs.tv_sec += 1;
     193             :             }
     194           0 :             timeout_abs.tv_sec += (long)STAT_REPORTING_DURATION_MS / 1000;
     195             :         }
     196             : 
     197           0 :         int rtn = thread_wait(self, &timeout_abs);
     198           0 :         if (rtn < 0) {
     199           0 :             log_error("Error waiting on output thread semaphore");
     200           0 :             return -1;
     201             :         }
     202             : 
     203           0 :         rtn = check_output_thread_queue(self);
     204           0 :         if (rtn != 0) {
     205           0 :             log_info("Breaking from output loop "
     206             :                      "due to thread queue");
     207           0 :             break;
     208             :         }
     209             :     }
     210           0 :     return 0;
     211             : }
     212             : 
     213           0 : int enqueue_for_output(struct thread_msg *msg) {
     214           0 :     int rtn = enqueue_thread_msg(msg, &static_output_thread->queue);
     215           0 :     if (rtn < 0) {
     216           0 :         log_error("Error enqueuing message %p to main thread", msg);
     217           0 :         return -1;
     218             :     }
     219           0 :     log(MAIN_THREAD, "Enqueued message %p to main thread queue", msg);
     220           0 :     return 0;
     221             : }
     222             : 
     223           0 : struct dedos_thread *start_output_monitor_thread(void) {
     224           0 :     struct dedos_thread *output_thread = malloc(sizeof(*output_thread));
     225           0 :     if (output_thread == NULL) {
     226           0 :         log_perror("Error allocating output thread");
     227           0 :         return NULL;
     228             :     }
     229           0 :     int rtn = start_dedos_thread(output_thread_loop,
     230             :                                  init_output_thread,
     231             :                                  NULL,
     232             :                                  UNPINNED_THREAD,
     233             :                                  OUTPUT_THREAD_ID,
     234             :                                  output_thread);
     235           0 :     if (rtn < 0) {
     236           0 :         log_error("Error starting output thread loop");
     237           0 :         return NULL;
     238             :     }
     239           0 :     log_info("Started output thread loop");
     240           0 :     return output_thread;
     241             : }
     242             : 

Generated by: LCOV version 1.10