My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
message_queue.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
26 #include "message_queue.h"
27 #include "logging.h"
28 
29 #include <time.h>
30 
/** Interval of 0 seconds from now; passed by enqueue_msg() to request immediate delivery. */
static struct timespec zero = {0};
33 
34 int enqueue_msg(struct msg_queue *q, struct dedos_msg *msg) {
35  return schedule_msg(q, msg, &zero);
36 }
37 
38 int schedule_msg(struct msg_queue *q, struct dedos_msg *msg, struct timespec *interval) {
39  if (interval->tv_sec == 0 && interval->tv_nsec == 0) {
40  msg->delivery_time = *interval;
41  } else {
42  clock_gettime(CLOCK_MONOTONIC_RAW, &msg->delivery_time);
43  msg->delivery_time.tv_sec += interval->tv_sec;
44  msg->delivery_time.tv_nsec += interval->tv_nsec;
45  if (msg->delivery_time.tv_nsec > 1e9) {
46  msg->delivery_time.tv_nsec -= 1e9;
47  msg->delivery_time.tv_sec += 1;
48  }
49  }
50  if (q->shared) {
51  if (pthread_mutex_lock(&q->mutex) != 0) {
52  log_error("Error locking msg queue mutex");
53  return -1;
54  }
55  }
56 
57  msg->next = NULL;
58  if (!q->head) {
59  q->head = msg;
60  q->tail = msg;
61  q->num_msgs = 1;
62  } else {
63  q->tail->next = msg;
64  q->tail = msg;
65  q->num_msgs++;
66  }
67 
68  if (q->shared) {
69  if (pthread_mutex_unlock(&q->mutex) != 0) {
70  log_error("Error unlocking msg queue mutex");
71  return -1;
72  }
73  }
74 
75  if (q->sem) {
76  sem_post(q->sem);
77  }
78 
79  return 0;
80 }
81 
/**
 * Returns the difference in time in seconds, t2 - t1.
 *
 * The result is negative when t2 is earlier than t1.
 */
static double timediff_s(struct timespec *t1, struct timespec *t2) {
    double whole_seconds = (double)(t2->tv_sec - t1->tv_sec);
    double nanoseconds = (double)(t2->tv_nsec - t1->tv_nsec);
    return whole_seconds + nanoseconds * 1e-9;
}
86 
87 struct dedos_msg *dequeue_msg(struct msg_queue *q) {
88  if (q->shared) {
89  pthread_mutex_lock(&q->mutex);
90  }
91 
92  struct dedos_msg *msg = q->head;
93  if (msg == NULL) {
94  if (q->shared) {
95  pthread_mutex_unlock(&q->mutex);
96  }
97  return NULL;
98  }
99 
100  int num_checked = 0;
101  // Loop until you find a message with a good delivery time
102  do {
103  // If delivery time is 0, always deliver now
104  if (msg->delivery_time.tv_sec == 0) {
105  break;
106  }
107  struct timespec cur_time;
108  clock_gettime(CLOCK_MONOTONIC_RAW, &cur_time);
109  double diff = timediff_s(&msg->delivery_time, &cur_time);
110  if (diff >= 0) {
111  break;
112  }
113  // Otherwise, move it to the back of the queue
114  q->tail->next = msg;
115  q->tail = msg;
116  q->head = msg->next;
117  msg->next = NULL;
118  // If it's the only message, return NULL
119  if (q->head == msg) {
120  if (q->shared) {
121  pthread_mutex_unlock(&q->mutex);
122  }
123  return NULL;
124  }
125  // Otherwise, move to the next message
126  msg = q->head;
127  num_checked++;
128  } while (num_checked < q->num_msgs);
129 
130  // If we've checked all the messages
131  if (num_checked == q->num_msgs) {
132  if (q->shared) {
133  pthread_mutex_unlock(&q->mutex);
134  }
135  return NULL;
136  }
137 
138  q->head = msg->next;
139  q->num_msgs--;
140  if (q->head == NULL) {
141  q->tail = NULL;
142  }
143 
144  msg->next = NULL;
145 
146  if (q->shared) {
147  pthread_mutex_unlock(&q->mutex);
148  }
149 
150  return msg;
151 }
152 
153 int init_msg_queue(struct msg_queue *q, sem_t *sem) {
154  q->num_msgs = 0;
155  q->head = NULL;
156  q->tail = NULL;
157  pthread_mutex_init(&q->mutex, NULL);
158  q->sem = sem;
159  q->shared = true;
160  return 0;
161 }
static double timediff_s(struct timespec *t1, struct timespec *t2)
Returns the difference in time in seconds, t2 - t1.
Definition: message_queue.c:83
struct dedos_msg * dequeue_msg(struct msg_queue *q)
Dequeues the first available message from q.
Definition: message_queue.c:87
Container for linked list message queue.
Definition: message_queue.h:56
Logging of status messages to the terminal.
static struct timespec zero
Interval 0 seconds from now.
Definition: message_queue.c:32
A linked-list entry containing a message.
Definition: message_queue.h:42
int schedule_msg(struct msg_queue *q, struct dedos_msg *msg, struct timespec *interval)
Schedules a message to be delivered once interval time has passed.
Definition: message_queue.c:38
#define log_error(fmt,...)
Definition: logging.h:101
sem_t * sem
Post to this semaphore on each new enqueue.
Definition: message_queue.h:62
struct dedos_msg * tail
Last entry in the queue.
Definition: message_queue.h:59
Structures and functions for enqueueing and dequeuing general-purpose messages from a queue...
int enqueue_msg(struct msg_queue *q, struct dedos_msg *msg)
Enqueues a message to be delivered as soon as possible.
Definition: message_queue.c:34
struct dedos_msg * head
First entry in the queue.
Definition: message_queue.h:58
struct timespec delivery_time
Message is ignored until this time (in CLOCK_MONOTONIC_RAW) has passed.
Definition: message_queue.h:52
uint32_t num_msgs
Number of messages currently in the queue.
Definition: message_queue.h:57
struct dedos_msg * next
The next message in the list, or NULL if N/A.
Definition: message_queue.h:44
bool shared
Whether the queue needs to be locked (always true at the moment)
Definition: message_queue.h:61
pthread_mutex_t mutex
Mutex if the queue is shared.
Definition: message_queue.h:60
int init_msg_queue(struct msg_queue *q, sem_t *sem)
Initializes a message queue to have no messages in it, and sets up the mutex and semaphore...