Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file message_queue.c
22 : *
23 : * Structures and functions for enqueueing and dequeuing general-purpose
24 : * messages from a queue
25 : */
26 : #include "message_queue.h"
27 : #include "logging.h"
28 :
29 : #include <time.h>
30 :
31 : /** Interval 0 seconds from now */
32 : static struct timespec zero = {};
33 :
34 4 : int enqueue_msg(struct msg_queue *q, struct dedos_msg *msg) {
35 4 : return schedule_msg(q, msg, &zero);
36 : }
37 :
38 4 : int schedule_msg(struct msg_queue *q, struct dedos_msg *msg, struct timespec *interval) {
39 4 : if (interval->tv_sec == 0 && interval->tv_nsec == 0) {
40 4 : msg->delivery_time = *interval;
41 : } else {
42 0 : clock_gettime(CLOCK_MONOTONIC_RAW, &msg->delivery_time);
43 0 : msg->delivery_time.tv_sec += interval->tv_sec;
44 0 : msg->delivery_time.tv_nsec += interval->tv_nsec;
45 0 : if (msg->delivery_time.tv_nsec > 1e9) {
46 0 : msg->delivery_time.tv_nsec -= 1e9;
47 0 : msg->delivery_time.tv_sec += 1;
48 : }
49 : }
50 4 : if (q->shared) {
51 4 : if (pthread_mutex_lock(&q->mutex) != 0) {
52 0 : log_error("Error locking msg queue mutex");
53 0 : return -1;
54 : }
55 : }
56 :
57 4 : msg->next = NULL;
58 4 : if (!q->head) {
59 2 : q->head = msg;
60 2 : q->tail = msg;
61 2 : q->num_msgs = 1;
62 : } else {
63 2 : q->tail->next = msg;
64 2 : q->tail = msg;
65 2 : q->num_msgs++;
66 : }
67 :
68 4 : if (q->shared) {
69 4 : if (pthread_mutex_unlock(&q->mutex) != 0) {
70 0 : log_error("Error unlocking msg queue mutex");
71 0 : return -1;
72 : }
73 : }
74 :
75 4 : if (q->sem) {
76 1 : sem_post(q->sem);
77 : }
78 :
79 4 : return 0;
80 : }
81 :
82 : /** Returns the difference in time in seconds, t2 - t1 */
83 0 : static double timediff_s(struct timespec *t1, struct timespec *t2) {
84 0 : return (double)(t2->tv_sec - t1->tv_sec) + (double)(t2->tv_nsec - t1->tv_nsec) * 1e-9;
85 : }
86 :
87 4 : struct dedos_msg *dequeue_msg(struct msg_queue *q) {
88 4 : if (q->shared) {
89 4 : pthread_mutex_lock(&q->mutex);
90 : }
91 :
92 4 : struct dedos_msg *msg = q->head;
93 4 : if (msg == NULL) {
94 1 : if (q->shared) {
95 1 : pthread_mutex_unlock(&q->mutex);
96 : }
97 1 : return NULL;
98 : }
99 :
100 3 : int num_checked = 0;
101 : // Loop until you find a message with a good delivery time
102 : do {
103 : // If delivery time is 0, always deliver now
104 3 : if (msg->delivery_time.tv_sec == 0) {
105 6 : break;
106 : }
107 : struct timespec cur_time;
108 0 : clock_gettime(CLOCK_MONOTONIC_RAW, &cur_time);
109 0 : double diff = timediff_s(&msg->delivery_time, &cur_time);
110 0 : if (diff >= 0) {
111 0 : break;
112 : }
113 : // Otherwise, move it to the back of the queue
114 0 : q->tail->next = msg;
115 0 : q->tail = msg;
116 0 : q->head = msg->next;
117 0 : msg->next = NULL;
118 : // If it's the only message, return NULL
119 0 : if (q->head == msg) {
120 0 : if (q->shared) {
121 0 : pthread_mutex_unlock(&q->mutex);
122 : }
123 0 : return NULL;
124 : }
125 : // Otherwise, move to the next message
126 0 : msg = q->head;
127 0 : num_checked++;
128 0 : } while (num_checked < q->num_msgs);
129 :
130 : // If we've checked all the messages
131 3 : if (num_checked == q->num_msgs) {
132 0 : if (q->shared) {
133 0 : pthread_mutex_unlock(&q->mutex);
134 : }
135 0 : return NULL;
136 : }
137 :
138 3 : q->head = msg->next;
139 3 : q->num_msgs--;
140 3 : if (q->head == NULL) {
141 1 : q->tail = NULL;
142 : }
143 :
144 3 : msg->next = NULL;
145 :
146 3 : if (q->shared) {
147 3 : pthread_mutex_unlock(&q->mutex);
148 : }
149 :
150 3 : return msg;
151 : }
152 :
153 2 : int init_msg_queue(struct msg_queue *q, sem_t *sem) {
154 2 : q->num_msgs = 0;
155 2 : q->head = NULL;
156 2 : q->tail = NULL;
157 2 : pthread_mutex_init(&q->mutex, NULL);
158 2 : q->sem = sem;
159 2 : q->shared = true;
160 2 : return 0;
161 : }
|