37 #include <netinet/ip.h>
44 static_output_thread = output_thread;
52 log_error(
"Error sending message to controller");
72 struct sockaddr_in addr;
73 bzero(&addr,
sizeof(addr));
74 addr.sin_family = AF_INET;
75 addr.sin_addr.s_addr = msg->
ip;
76 addr.sin_port = htons(msg->
port);
91 pthread_join(static_output_thread->
pthread, NULL);
92 free(static_output_thread);
96 #define CHECK_MSG_SIZE(msg, target) \
97 if (msg->data_size != sizeof(target)) { \
98 log_warn("Message data size (%d) does not match size" \
99 "of target type (%d)" #target, (int)msg->data_size , \
100 (int)sizeof(target)); \
107 log(LOG_MAIN_THREAD,
"processing message %p with type id: %d",
117 log_warn(
"Error adding runtime peer");
126 log_warn(
"Error forwarding message to peer");
134 log_warn(
"Error sending message to controller");
140 log_error(
"Message (type: %d) meant for worker thread sent to output thread",
144 log_error(
"Unknown message type %d delivered to output thread", msg->
type);
162 log_error(
"Error processing thread msg");
168 #define STAT_REPORTING_DURATION_MS STAT_SAMPLE_PERIOD_MS
176 struct timespec elapsed;
177 struct timespec timeout_abs;
178 clock_gettime(CLOCK_REALTIME, &timeout_abs);
181 clock_gettime(CLOCK_REALTIME, &elapsed);
182 if (elapsed.tv_sec > timeout_abs.tv_sec || (elapsed.tv_sec == timeout_abs.tv_sec &&
183 elapsed.tv_nsec > timeout_abs.tv_nsec)) {
185 log(LOG_STATS_SEND,
"Error sending stats to controller");
187 log(LOG_STATS_SEND,
"Sent stats");
188 timeout_abs = elapsed;
190 while (timeout_abs.tv_nsec > 1e9) {
191 timeout_abs.tv_nsec -= 1e9;
192 timeout_abs.tv_sec += 1;
199 log_error(
"Error waiting on output thread semaphore");
205 log_info(
"Breaking from output loop "
206 "due to thread queue");
216 log_error(
"Error enqueuing message %p to main thread", msg);
219 log(MAIN_THREAD,
"Enqueued message %p to main thread queue", msg);
224 struct dedos_thread *output_thread = malloc(
sizeof(*output_thread));
225 if (output_thread == NULL) {
236 log_error(
"Error starting output thread loop");
239 log_info(
"Started output thread loop");
240 return output_thread;
struct rt_controller_msg_hdr hdr
ctrl_delete_msu_msg (ctrl_runtime_messages.h)
static int output_thread_send_to_peer(struct send_to_peer_msg *msg)
Process a SEND_TO_PEER message.
static int check_output_thread_queue(struct dedos_thread *output_thread)
Checks the queue of the output thread for messages and acts on them if present.
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
static int process_output_thread_msg(struct thread_msg *msg)
Processes a thread message that is delivered to the output thread.
Defines a type of MSU, including callback and accessor functions.
For delivery to the output monitor thread, a message to be sent to a peer runtime.
int send_to_peer(unsigned int runtime_id, struct inter_runtime_msg_hdr *hdr, void *payload)
Sends a message to the peer runtime with the provided id.
uint32_t ip
ip address of the runtime to connect to.
pthread_t pthread
The underlying pthread.
void join_output_thread()
Joins the underlying pthread.
int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload)
Sends a message to the global controller.
#define log_perror(fmt,...)
Logging of status messages to the terminal.
int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue)
Enqueues a dedos_msg with a thread_msg as the payload to the appropriate queue.
int thread_wait(struct dedos_thread *thread, struct timespec *abs_timeout)
To be called from the thread, causes it to wait until a message has been received or the timeout has ...
Functions for the sending and receiving of statistics between ctrl and runtime.
enum thread_msg_type type
struct thread_msg * dequeue_thread_msg(struct msg_queue *queue)
Dequeues a thread_msg from the message queue.
struct dedos_thread * start_output_monitor_thread(void)
Starts the thread monitoring the queue for messages to be sent to other endpoints.
Control spawned threads with message queue within DeDOS.
int enqueue_for_output(struct thread_msg *msg)
Enqueues a thread_msg for delivery to the output thread.
static struct dedos_thread * static_output_thread
A static copy of the output thread, so others can enqueue messages.
payload: send_to_ctrl_msg (below)
For delivery to output monitor thread, a message to be sent to the controller.
void dedos_thread_stop(struct dedos_thread *thread)
Sets the exit signal for a thread, causing the main loop to quit.
#define log_error(fmt,...)
A message to be delivered to a dedos_thread.
Payload for messages of type CTRL_CONNECT_TO_RUNTIME.
static int output_thread_send_to_ctrl(struct send_to_ctrl_msg *msg)
Process a SEND_TO_CTRL message.
static int output_thread_loop(struct dedos_thread *self, void UNUSED *init_data)
The main thread loop for the output thread.
static int output_thread_connect_to_runtime(struct ctrl_add_runtime_msg *msg)
Process a CONNECT_TO_RUNTIME message.
int send_stats_to_controller()
Samples the relevant statistics and sends them to the controller.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
static void * init_output_thread(struct dedos_thread *output_thread)
Initializes the static copy of the output thread.
Definitions of structures for sending messages from the global controller to runtimes.
struct inter_runtime_msg_hdr hdr
#define STAT_REPORTING_DURATION_MS
How often to report statistics.
void stop_output_monitor()
Triggers the output thread to stop execution.
ctrl_create_msu_msg (ctrl_runtime_messages.h)
int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr)
Initiates a connection to a runtime peer with the given ID at the given address. ...
#define CHECK_MSG_SIZE(msg, target)
Checks whether the size of the message matches the size of the target struct.
ctrl_msu_route_msg (ctrl_runtime_messages.h)
int dedos_thread_should_exit(struct dedos_thread *thread)
Returns 1 if the exit signal has been triggered, otherwise 0.
#define log(level, fmt,...)
Log at a custom level.
int start_dedos_thread(dedos_thread_fn thread_fn, dedos_thread_init_fn init_fn, dedos_thread_destroy_fn destroy_fn, enum blocking_mode mode, int id, struct dedos_thread *thread)
Initializes and starts execution of a dedos_thread.
#define OUTPUT_THREAD_ID
A thread_msg marked for delivery to this thread ID will be enqueued to the output thread...
struct msg_queue queue
Queue for incoming messages.
int port
port of the runtime to connect to
payload: send_to_runtime_msg (below)
#define log_warn(fmt,...)
payload: ctrl_add_runtime_msg (ctrl_runtime_messages.h)
unsigned int runtime_id
The runtime ID to which the message is delivered.
Structure representing any thread within DeDOS.
unsigned int runtime_id
ID of the runtime to connect to.
Communication with global controller from runtime.