Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file output_thread.c
22 : *
23 : * A dedos_thread which monitors a queue for output to be sent to
24 : * other runtimes or the global controller
25 : */
26 : #include "logging.h"
27 : #include "worker_thread.h"
28 : #include "dedos_threads.h"
29 : #include "thread_message.h"
30 : #include "msu_type.h"
31 : #include "controller_communication.h"
32 : #include "ctrl_runtime_messages.h"
33 : #include "output_thread.h"
34 : #include "stats.h"
35 :
36 : #include <stdlib.h>
37 : #include <netinet/ip.h>
38 :
/**
 * File-wide handle to the output thread.
 *
 * Assigned by init_output_thread() and read by the functions in this file
 * that enqueue to, stop, or join the output thread.
 */
static struct dedos_thread *static_output_thread;

/**
 * Initialization hook for the output thread: remembers the thread in
 * ::static_output_thread.
 *
 * @param output_thread The thread structure to record
 * @return Always NULL (this hook produces no initialization data)
 */
static void *init_output_thread(struct dedos_thread *output_thread)
{
    static_output_thread = output_thread;

    return NULL;
}
47 :
48 : /** Process a ::SEND_TO_CTRL message */
49 0 : static int output_thread_send_to_ctrl(struct send_to_ctrl_msg *msg) {
50 0 : int rtn = send_to_controller(&msg->hdr, msg->data);
51 0 : if (rtn < 0) {
52 0 : log_error("Error sending message to controller");
53 0 : return -1;
54 : }
55 0 : free(msg->data);
56 0 : return 0;
57 : }
58 :
59 : /** Process a ::SEND_TO_PEER message */
60 0 : static int output_thread_send_to_peer(struct send_to_peer_msg *msg) {
61 0 : int rtn = send_to_peer(msg->runtime_id, &msg->hdr, msg->data);
62 0 : if (rtn < 0) {
63 0 : log_error("Error sending message to runtime %d", msg->runtime_id);
64 0 : return -1;
65 : }
66 0 : free(msg->data);
67 0 : return 0;
68 : }
69 :
70 : /** Process a ::CONNECT_TO_RUNTIME message */
71 0 : static int output_thread_connect_to_runtime(struct ctrl_add_runtime_msg *msg){
72 : struct sockaddr_in addr;
73 0 : bzero(&addr, sizeof(addr));
74 0 : addr.sin_family = AF_INET;
75 0 : addr.sin_addr.s_addr = msg->ip;
76 0 : addr.sin_port = htons(msg->port);
77 :
78 0 : int rtn = connect_to_runtime_peer(msg->runtime_id, &addr);
79 0 : if (rtn < 0) {
80 0 : log_error("Could not add runtime peer");
81 0 : return -1;
82 : }
83 0 : return 0;
84 : }
85 :
86 0 : void stop_output_monitor() {
87 0 : dedos_thread_stop(static_output_thread);
88 0 : }
89 :
90 0 : void join_output_thread() {
91 0 : pthread_join(static_output_thread->pthread, NULL);
92 0 : free(static_output_thread);
93 0 : }
94 :
/**
 * Checks whether the payload size of a message matches the size of the type
 * it is about to be interpreted as.
 *
 * On a mismatch this logs a warning and makes the ENCLOSING FUNCTION return
 * -1, so it may only be used inside functions that return int.
 *
 * Wrapped in do { } while (0) so that it behaves as a single statement (for
 * example inside an if/else without braces).
 *
 * @param msg    Pointer to a message that has a `data_size` member
 * @param target The type whose size the payload must match
 */
#define CHECK_MSG_SIZE(msg, target) \
    do { \
        if ((msg)->data_size != sizeof(target)) { \
            log_warn("Message data size (%d) does not match size " \
                     "of target type " #target " (%d)", \
                     (int)(msg)->data_size, (int)sizeof(target)); \
            return -1; \
        } \
    } while (0)
103 :
104 : /** Processes a thread message that is delivered to the output thread */
105 0 : static int process_output_thread_msg(struct thread_msg *msg) {
106 :
107 0 : log(LOG_MAIN_THREAD, "processing message %p with type id: %d",
108 : msg, msg->type);
109 0 : int rtn = -1;
110 0 : switch (msg->type) {
111 : case CONNECT_TO_RUNTIME:
112 0 : CHECK_MSG_SIZE(msg, struct ctrl_add_runtime_msg);
113 0 : struct ctrl_add_runtime_msg *runtime_msg = msg->data;
114 0 : rtn = output_thread_connect_to_runtime(runtime_msg);
115 :
116 0 : if (rtn < 0) {
117 0 : log_warn("Error adding runtime peer");
118 : }
119 0 : break;
120 : case SEND_TO_PEER:
121 0 : CHECK_MSG_SIZE(msg, struct send_to_peer_msg);
122 0 : struct send_to_peer_msg *forward_msg = msg->data;
123 0 : rtn = output_thread_send_to_peer(forward_msg);
124 :
125 0 : if (rtn < 0) {
126 0 : log_warn("Error forwarding message to peer");
127 : }
128 0 : break;
129 : case SEND_TO_CTRL:
130 0 : CHECK_MSG_SIZE(msg, struct send_to_ctrl_msg);
131 0 : struct send_to_ctrl_msg *ctrl_msg = msg->data;
132 0 : rtn = output_thread_send_to_ctrl(ctrl_msg);
133 0 : if (rtn < 0) {
134 0 : log_warn("Error sending message to controller");
135 : }
136 0 : break;
137 : case CREATE_MSU:
138 : case DELETE_MSU:
139 : case MSU_ROUTE:
140 0 : log_error("Message (type: %d) meant for worker thread sent to output thread",
141 : msg->type);
142 0 : break;
143 : default:
144 0 : log_error("Unknown message type %d delivered to output thread", msg->type);
145 0 : break;
146 : }
147 0 : return rtn;
148 : }
149 :
150 : /** Checks the queue of the output thread for messages and acts on them if present */
151 0 : static int check_output_thread_queue(struct dedos_thread *output_thread) {
152 :
153 0 : struct thread_msg *msg = dequeue_thread_msg(&output_thread->queue);
154 :
155 0 : if (msg == NULL) {
156 0 : return 0;
157 : }
158 :
159 0 : int rtn = process_output_thread_msg(msg);
160 0 : free(msg);
161 0 : if (rtn < 0) {
162 0 : log_error("Error processing thread msg");
163 : }
164 0 : return 0;
165 : }
166 :
167 : /** How often to report statistics */
168 : #define STAT_REPORTING_DURATION_MS STAT_SAMPLE_PERIOD_MS
169 :
170 : /**
171 : * The main thread loop for the output thread.
172 : * Checks the queue for messages, sends them, sends stats messages
173 : */
174 0 : static int output_thread_loop(struct dedos_thread *self, void UNUSED *init_data) {
175 :
176 : struct timespec elapsed;
177 : struct timespec timeout_abs;
178 0 : clock_gettime(CLOCK_REALTIME, &timeout_abs);
179 0 : while (!dedos_thread_should_exit(self)) {
180 :
181 0 : clock_gettime(CLOCK_REALTIME, &elapsed);
182 0 : if (elapsed.tv_sec > timeout_abs.tv_sec || (elapsed.tv_sec == timeout_abs.tv_sec &&
183 0 : elapsed.tv_nsec > timeout_abs.tv_nsec)) {
184 0 : if (send_stats_to_controller() < 0) {
185 0 : log(LOG_STATS_SEND, "Error sending stats to controller");
186 : }
187 0 : log(LOG_STATS_SEND, "Sent stats");
188 0 : timeout_abs = elapsed;
189 0 : timeout_abs.tv_nsec += (long)STAT_REPORTING_DURATION_MS * 1e6;
190 0 : while (timeout_abs.tv_nsec > 1e9) {
191 0 : timeout_abs.tv_nsec -= 1e9;
192 0 : timeout_abs.tv_sec += 1;
193 : }
194 0 : timeout_abs.tv_sec += (long)STAT_REPORTING_DURATION_MS / 1000;
195 : }
196 :
197 0 : int rtn = thread_wait(self, &timeout_abs);
198 0 : if (rtn < 0) {
199 0 : log_error("Error waiting on output thread semaphore");
200 0 : return -1;
201 : }
202 :
203 0 : rtn = check_output_thread_queue(self);
204 0 : if (rtn != 0) {
205 0 : log_info("Breaking from output loop "
206 : "due to thread queue");
207 0 : break;
208 : }
209 : }
210 0 : return 0;
211 : }
212 :
213 0 : int enqueue_for_output(struct thread_msg *msg) {
214 0 : int rtn = enqueue_thread_msg(msg, &static_output_thread->queue);
215 0 : if (rtn < 0) {
216 0 : log_error("Error enqueuing message %p to main thread", msg);
217 0 : return -1;
218 : }
219 0 : log(MAIN_THREAD, "Enqueued message %p to main thread queue", msg);
220 0 : return 0;
221 : }
222 :
223 0 : struct dedos_thread *start_output_monitor_thread(void) {
224 0 : struct dedos_thread *output_thread = malloc(sizeof(*output_thread));
225 0 : if (output_thread == NULL) {
226 0 : log_perror("Error allocating output thread");
227 0 : return NULL;
228 : }
229 0 : int rtn = start_dedos_thread(output_thread_loop,
230 : init_output_thread,
231 : NULL,
232 : UNPINNED_THREAD,
233 : OUTPUT_THREAD_ID,
234 : output_thread);
235 0 : if (rtn < 0) {
236 0 : log_error("Error starting output thread loop");
237 0 : return NULL;
238 : }
239 0 : log_info("Started output thread loop");
240 0 : return output_thread;
241 : }
242 :
|