LCOV - code coverage report
Current view: top level - runtime - message_queue.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 48 78 61.5 %
Date: 2018-01-11 Functions: 4 5 80.0 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file message_queue.c
      22             :  *
      23             :  * Structures and functions for enqueueing and dequeuing general-purpose
      24             :  * messages from a queue
      25             :  */
      26             : #include "message_queue.h"
      27             : #include "logging.h"
      28             : 
      29             : #include <time.h>
      30             : 
      31             : /** Interval 0 seconds from now */
      32             : static struct timespec zero = {};
      33             : 
      34           4 : int enqueue_msg(struct msg_queue *q, struct dedos_msg *msg) {
      35           4 :     return schedule_msg(q, msg, &zero);
      36             : }
      37             : 
      38           4 : int schedule_msg(struct msg_queue *q, struct dedos_msg *msg, struct timespec *interval) {
      39           4 :     if (interval->tv_sec == 0 && interval->tv_nsec == 0) {
      40           4 :         msg->delivery_time = *interval;
      41             :     } else {
      42           0 :         clock_gettime(CLOCK_MONOTONIC_RAW, &msg->delivery_time);
      43           0 :         msg->delivery_time.tv_sec += interval->tv_sec;
      44           0 :         msg->delivery_time.tv_nsec += interval->tv_nsec;
      45           0 :         if (msg->delivery_time.tv_nsec > 1e9) {
      46           0 :             msg->delivery_time.tv_nsec -= 1e9;
      47           0 :             msg->delivery_time.tv_sec += 1;
      48             :         }
      49             :     }
      50           4 :     if (q->shared) {
      51           4 :         if (pthread_mutex_lock(&q->mutex) != 0) {
      52           0 :             log_error("Error locking msg queue mutex");
      53           0 :             return -1;
      54             :         }
      55             :     }
      56             : 
      57           4 :     msg->next = NULL;
      58           4 :     if (!q->head) {
      59           2 :         q->head = msg;
      60           2 :         q->tail = msg;
      61           2 :         q->num_msgs = 1;
      62             :     } else {
      63           2 :         q->tail->next = msg;
      64           2 :         q->tail = msg;
      65           2 :         q->num_msgs++;
      66             :     }
      67             : 
      68           4 :     if (q->shared) {
      69           4 :         if (pthread_mutex_unlock(&q->mutex) != 0) {
      70           0 :             log_error("Error unlocking msg queue mutex");
      71           0 :             return -1;
      72             :         }
      73             :     }
      74             : 
      75           4 :     if (q->sem) {
      76           1 :         sem_post(q->sem);
      77             :     }
      78             : 
      79           4 :     return 0;
      80             : }
      81             : 
      82             : /** Returns the difference in time in seconds, t2 - t1 */
      83           0 : static double timediff_s(struct timespec *t1, struct timespec *t2) {
      84           0 :     return (double)(t2->tv_sec - t1->tv_sec) + (double)(t2->tv_nsec - t1->tv_nsec) * 1e-9;
      85             : }
      86             : 
      87           4 : struct dedos_msg *dequeue_msg(struct msg_queue *q) {
      88           4 :     if (q->shared) {
      89           4 :         pthread_mutex_lock(&q->mutex);
      90             :     }
      91             : 
      92           4 :     struct dedos_msg *msg = q->head;
      93           4 :     if (msg == NULL) {
      94           1 :         if (q->shared) {
      95           1 :             pthread_mutex_unlock(&q->mutex);
      96             :         }
      97           1 :         return NULL;
      98             :     }
      99             : 
     100           3 :     int num_checked = 0;
     101             :     // Loop until you find a message with a good delivery time
     102             :     do {
     103             :         // If delivery time is 0, always deliver now
     104           3 :         if (msg->delivery_time.tv_sec == 0) {
     105           6 :             break;
     106             :         }
     107             :         struct timespec cur_time;
     108           0 :         clock_gettime(CLOCK_MONOTONIC_RAW, &cur_time);
     109           0 :         double diff = timediff_s(&msg->delivery_time, &cur_time);
     110           0 :         if (diff >= 0) {
     111           0 :             break;
     112             :         }
     113             :         // Otherwise, move it to the back of the queue
     114           0 :         q->tail->next = msg;
     115           0 :         q->tail = msg;
     116           0 :         q->head = msg->next;
     117           0 :         msg->next = NULL;
     118             :         // If it's the only message, return NULL
     119           0 :         if (q->head == msg) {
     120           0 :             if (q->shared) {
     121           0 :                 pthread_mutex_unlock(&q->mutex);
     122             :             }
     123           0 :             return NULL;
     124             :         }
     125             :         // Otherwise, move to the next message
     126           0 :         msg = q->head;
     127           0 :         num_checked++;
     128           0 :     } while (num_checked < q->num_msgs);
     129             : 
     130             :     // If we've checked all the messages
     131           3 :     if (num_checked == q->num_msgs) {
     132           0 :         if (q->shared) {
     133           0 :             pthread_mutex_unlock(&q->mutex);
     134             :         }
     135           0 :         return NULL;
     136             :     }
     137             : 
     138           3 :     q->head = msg->next;
     139           3 :     q->num_msgs--;
     140           3 :     if (q->head == NULL) {
     141           1 :         q->tail = NULL;
     142             :     }
     143             : 
     144           3 :     msg->next = NULL;
     145             : 
     146           3 :     if (q->shared) {
     147           3 :         pthread_mutex_unlock(&q->mutex);
     148             :     }
     149             : 
     150           3 :     return msg;
     151             : }
     152             : 
     153           2 : int init_msg_queue(struct msg_queue *q, sem_t *sem) {
     154           2 :     q->num_msgs = 0;
     155           2 :     q->head = NULL;
     156           2 :     q->tail = NULL;
     157           2 :     pthread_mutex_init(&q->mutex, NULL);
     158           2 :     q->sem = sem;
     159           2 :     q->shared = true;
     160           2 :     return 0;
     161             : }

Generated by: LCOV version 1.10