My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
output_thread.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
26 #include "logging.h"
27 #include "worker_thread.h"
28 #include "dedos_threads.h"
29 #include "thread_message.h"
30 #include "msu_type.h"
32 #include "ctrl_runtime_messages.h"
33 #include "output_thread.h"
34 #include "stats.h"
35 
36 #include <stdlib.h>
37 #include <netinet/ip.h>
38 
41 
43 static void *init_output_thread(struct dedos_thread *output_thread) {
44  static_output_thread = output_thread;
45  return NULL;
46 }
47 
50  int rtn = send_to_controller(&msg->hdr, msg->data);
51  if (rtn < 0) {
52  log_error("Error sending message to controller");
53  return -1;
54  }
55  free(msg->data);
56  return 0;
57 }
58 
61  int rtn = send_to_peer(msg->runtime_id, &msg->hdr, msg->data);
62  if (rtn < 0) {
63  log_error("Error sending message to runtime %d", msg->runtime_id);
64  return -1;
65  }
66  free(msg->data);
67  return 0;
68 }
69 
72  struct sockaddr_in addr;
73  bzero(&addr, sizeof(addr));
74  addr.sin_family = AF_INET;
75  addr.sin_addr.s_addr = msg->ip;
76  addr.sin_port = htons(msg->port);
77 
78  int rtn = connect_to_runtime_peer(msg->runtime_id, &addr);
79  if (rtn < 0) {
80  log_error("Could not add runtime peer");
81  return -1;
82  }
83  return 0;
84 }
85 
87  dedos_thread_stop(static_output_thread);
88 }
89 
91  pthread_join(static_output_thread->pthread, NULL);
92  free(static_output_thread);
93 }
94 
96 #define CHECK_MSG_SIZE(msg, target) \
97  if (msg->data_size != sizeof(target)) { \
98  log_warn("Message data size (%d) does not match size" \
99  "of target type (%d)" #target, (int)msg->data_size , \
100  (int)sizeof(target)); \
101  return -1; \
102  } \
103 
104 
105 static int process_output_thread_msg(struct thread_msg *msg) {
106 
107  log(LOG_MAIN_THREAD, "processing message %p with type id: %d",
108  msg, msg->type);
109  int rtn = -1;
110  switch (msg->type) {
111  case CONNECT_TO_RUNTIME:
113  struct ctrl_add_runtime_msg *runtime_msg = msg->data;
114  rtn = output_thread_connect_to_runtime(runtime_msg);
115 
116  if (rtn < 0) {
117  log_warn("Error adding runtime peer");
118  }
119  break;
120  case SEND_TO_PEER:
121  CHECK_MSG_SIZE(msg, struct send_to_peer_msg);
122  struct send_to_peer_msg *forward_msg = msg->data;
123  rtn = output_thread_send_to_peer(forward_msg);
124 
125  if (rtn < 0) {
126  log_warn("Error forwarding message to peer");
127  }
128  break;
129  case SEND_TO_CTRL:
130  CHECK_MSG_SIZE(msg, struct send_to_ctrl_msg);
131  struct send_to_ctrl_msg *ctrl_msg = msg->data;
132  rtn = output_thread_send_to_ctrl(ctrl_msg);
133  if (rtn < 0) {
134  log_warn("Error sending message to controller");
135  }
136  break;
137  case CREATE_MSU:
138  case DELETE_MSU:
139  case MSU_ROUTE:
140  log_error("Message (type: %d) meant for worker thread sent to output thread",
141  msg->type);
142  break;
143  default:
144  log_error("Unknown message type %d delivered to output thread", msg->type);
145  break;
146  }
147  return rtn;
148 }
149 
151 static int check_output_thread_queue(struct dedos_thread *output_thread) {
152 
153  struct thread_msg *msg = dequeue_thread_msg(&output_thread->queue);
154 
155  if (msg == NULL) {
156  return 0;
157  }
158 
159  int rtn = process_output_thread_msg(msg);
160  free(msg);
161  if (rtn < 0) {
162  log_error("Error processing thread msg");
163  }
164  return 0;
165 }
166 
168 #define STAT_REPORTING_DURATION_MS STAT_SAMPLE_PERIOD_MS
169 
174 static int output_thread_loop(struct dedos_thread *self, void UNUSED *init_data) {
175 
176  struct timespec elapsed;
177  struct timespec timeout_abs;
178  clock_gettime(CLOCK_REALTIME, &timeout_abs);
179  while (!dedos_thread_should_exit(self)) {
180 
181  clock_gettime(CLOCK_REALTIME, &elapsed);
182  if (elapsed.tv_sec > timeout_abs.tv_sec || (elapsed.tv_sec == timeout_abs.tv_sec &&
183  elapsed.tv_nsec > timeout_abs.tv_nsec)) {
184  if (send_stats_to_controller() < 0) {
185  log(LOG_STATS_SEND, "Error sending stats to controller");
186  }
187  log(LOG_STATS_SEND, "Sent stats");
188  timeout_abs = elapsed;
189  timeout_abs.tv_nsec += (long)STAT_REPORTING_DURATION_MS * 1e6;
190  while (timeout_abs.tv_nsec > 1e9) {
191  timeout_abs.tv_nsec -= 1e9;
192  timeout_abs.tv_sec += 1;
193  }
194  timeout_abs.tv_sec += (long)STAT_REPORTING_DURATION_MS / 1000;
195  }
196 
197  int rtn = thread_wait(self, &timeout_abs);
198  if (rtn < 0) {
199  log_error("Error waiting on output thread semaphore");
200  return -1;
201  }
202 
203  rtn = check_output_thread_queue(self);
204  if (rtn != 0) {
205  log_info("Breaking from output loop "
206  "due to thread queue");
207  break;
208  }
209  }
210  return 0;
211 }
212 
213 int enqueue_for_output(struct thread_msg *msg) {
214  int rtn = enqueue_thread_msg(msg, &static_output_thread->queue);
215  if (rtn < 0) {
216  log_error("Error enqueuing message %p to main thread", msg);
217  return -1;
218  }
219  log(MAIN_THREAD, "Enqueued message %p to main thread queue", msg);
220  return 0;
221 }
222 
224  struct dedos_thread *output_thread = malloc(sizeof(*output_thread));
225  if (output_thread == NULL) {
226  log_perror("Error allocating output thread");
227  return NULL;
228  }
231  NULL,
234  output_thread);
235  if (rtn < 0) {
236  log_error("Error starting output thread loop");
237  return NULL;
238  }
239  log_info("Started output thread loop");
240  return output_thread;
241 }
242 
struct rt_controller_msg_hdr hdr
ctrl_delete_msu_msg (ctrl_runtime_messages.h)
static int output_thread_send_to_peer(struct send_to_peer_msg *msg)
Process a SEND_TO_PEER message.
Definition: output_thread.c:60
static int check_output_thread_queue(struct dedos_thread *output_thread)
Checks the queue of the output thread for messages and acts on them if present.
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
Definition: logging.h:88
static int process_output_thread_msg(struct thread_msg *msg)
Processes a thread message that is delivered to the output thread.
Defines a type of MSU, including callback and accessor functions.
For delivery to the output monitor thread, a message to be sent to a peer runtime.
int send_to_peer(unsigned int runtime_id, struct inter_runtime_msg_hdr *hdr, void *payload)
Sends a message to the peer runtime with the provided id.
uint32_t ip
ip address of the runtime to connect to.
pthread_t pthread
The underlying pthread.
Definition: dedos_threads.h:37
void join_output_thread()
Joins the underlying pthread.
Definition: output_thread.c:90
int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload)
Sends a message to the global controller.
#define log_perror(fmt,...)
Definition: logging.h:102
Logging of status messages to the terminal.
int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue)
Enqueues a dedos_msg with a thread_msg as the payload to the appropriate queue.
Threads that hold MSUs.
int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout)
To be called from the thread, causes it to wait until a message has been received or the timeout has ...
Functions for the sending and receiving of statistics between ctrl and runtime.
enum thread_msg_type type
struct thread_msg * dequeue_thread_msg(struct msg_queue *queue)
Dequeues a thread_msg from the message queue.
struct dedos_thread * start_output_monitor_thread(void)
Starts the thread monitoring the queue for messages to be sent to other endpoints.
Control spawned threads with message queue within DeDOS.
int enqueue_for_output(struct thread_msg *msg)
Enqueues a thread_msg for delivery to the output thread.
static struct dedos_thread * static_output_thread
A static copy of the output thread, so others can enqueue messages.
Definition: output_thread.c:40
payload: send_to_ctrl_msg (below)
For delivery to output monitor thread, a message to be sent to the controller.
void dedos_thread_stop(struct dedos_thread *thread)
Sets the exit signal for a thread, causing the main loop to quit.
#define log_error(fmt,...)
Definition: logging.h:101
A message to be delivered to a dedos_thread.
Payload for messages of type CTRL_CONNECT_TO_RUNTIME.
static int output_thread_send_to_ctrl(struct send_to_ctrl_msg *msg)
Process a SEND_TO_CTRL message.
Definition: output_thread.c:49
static int output_thread_loop(struct dedos_thread *self, void UNUSED *init_data)
The main thread loop for the output thread.
static int output_thread_connect_to_runtime(struct ctrl_add_runtime_msg *msg)
Process a CONNECT_TO_RUNTIME message.
Definition: output_thread.c:71
int send_stats_to_controller()
Samples the relevant statistics and sends them to the controller.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
static void * init_output_thread(struct dedos_thread *output_thread)
Initializes the static copy of the output thread.
Definition: output_thread.c:43
Definitions of structures for sending messages from the global controller to runtimes.
struct inter_runtime_msg_hdr hdr
#define STAT_REPORTING_DURATION_MS
How often to report statistics.
#define UNUSED
void stop_output_monitor()
Triggers the output thread to stop execution.
Definition: output_thread.c:86
ctrl_create_msu_msg (ctrl_runtime_messages.h)
int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr)
Innitiates a connection to a runtime peer with the given ID at the given address. ...
#define CHECK_MSG_SIZE(msg, target)
Checks whether the size of the message matches the size of the target struct.
Definition: output_thread.c:96
ctrl_msu_route_msg (ctrl_runtime_messages.h)
int dedos_thread_should_exit(struct dedos_thread *thread)
Returns 1 if the exit signal has been triggered, otherwise 0.
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
int start_dedos_thread(dedos_thread_fn thread_fn, dedos_thread_init_fn init_fn, dedos_thread_destroy_fn destroy_fn, enum blocking_mode mode, int id, struct dedos_thread *thread)
Initilizes and starts execution of a dedos_thread.
#define OUTPUT_THREAD_ID
A thread_msg marked for delivery to this thread ID will be enqueued to the output thread...
Definition: output_thread.h:35
struct msg_queue queue
Queue for incoming message.
Definition: dedos_threads.h:43
int port
port of the runtime to connect to
payload: send_to_runtime_msg (below)
#define log_warn(fmt,...)
Definition: logging.h:113
payload: ctrl_add_runtime_msg (ctrl_runtime_messages.h)
unsigned int runtime_id
The runtime ID to which the message is delivered.
Structure representing any thread within DeDOS.
Definition: dedos_threads.h:35
unsigned int runtime_id
ID of the runtime to connect to.
Communication with global controller from runtime.