37 #include <arpa/inet.h>
50 log_error(
"Controller socket not initialized");
56 log_error(
"Error sending header to controller");
65 log_error(
"Error sending payload to controller");
69 log(LOG_CONTROLLER_COMMUNICATION,
"Sent payload of size %d type %d to controller",
81 log_error(
"Could not get local runtime ID to send to controller");
95 .payload_size =
sizeof(msg)
108 log_error(
"Controller socket already initialized");
115 log_error(
"Error connecting to global controller!");
119 char ip[INET_ADDRSTRLEN];
120 inet_ntop(AF_INET, &addr->sin_addr, ip, INET_ADDRSTRLEN);
121 int port = ntohs(addr->sin_port);
123 log_info(
"Connected to global controller at %s:%d", ip, port);
127 log_error(
"Error sending initialization message to controller");
137 #define CHECK_MSG_SIZE(msg, target) \
138 if (msg->payload_size != sizeof(target)) { \
139 log_warn("Message data size (%d) does not match size" \
140 "of target type (%d)" #target, (int)msg->payload_size , \
141 (int)sizeof(target)); \
175 struct sockaddr_in addr = {};
176 addr.sin_family = AF_INET;
177 addr.sin_addr.s_addr = msg->
ip;
178 addr.sin_port = htons(msg->
port);
195 log_error(
"Error creating worker thread %d",
id);
198 log(LOG_THREAD_CREATION,
"Created worker thread %d",
id);
207 log(LOG_CONTROLLER_COMMUNICATION,
"Got control route message of type %d", msg->
type);
212 log_error(
"Error creating new route of id %d, type %d",
221 log_error(
"Cannot initilize runtime endpoint for adding "
227 log_error(
"Error adding endpoint %d to route %d with key %d",
235 log_error(
"Error removing endpoint %d from route %d",
243 log_error(
"Error modifying endpoint %d on route %d to have key %d",
249 log_error(
"Unknown route control message type received: %d", msg->
type);
266 log_error(
"Unknown thread message type %d", type);
280 log_warn(
"May not process message. Incorrect payload size for type.");
286 log_error(
"Error reading control payload. Cannot process message");
290 log(LOG_CONTROLLER_COMMUNICATION,
"Read control payload %p of size %d",
306 if (thread_msg == NULL) {
307 log_error(
"Error constructing thread msg from control hdr");
311 if (thread == NULL) {
312 log_error(
"Error getting dedos thread %d to deliver control message",
334 log_warn(
"May not process message. Incorrect payload size for type");
341 log_error(
"Error reading control payload. Cannot process message");
344 log(LOG_CONTROLLER_COMMUNICATION,
"Read control payload %p of size %d",
351 log_error(
"Error processing connect to runtime message");
358 log_error(
"Error processing create thread message");
368 log_error(
"Error processing control route message");
373 log_error(
"Unknown message type delivered to input thread: %d", hdr->
type);
395 log_error(
"Error passing control message to thread");
405 log_error(
"Error processing control message");
412 log_error(
"Unknown header type %d in receiving thread", hdr->
type);
423 log_error(
"Error reading control message");
425 }
else if (rtn == 1) {
430 log(LOG_CONTROLLER_COMMUNICATION,
431 "Read header (type %d) from controller", hdr.
type);
436 log_error(
"Error processing control message");
451 log_error(
"Error connecting to global controller");
455 log_error(
"Attempted to initialize controller socket "
456 "before initializing runtime epoll");
464 log(LOG_STAT_SEND,
"Skipping sending statistics: controller not initialized");
470 clock_gettime(CLOCK_REALTIME_COARSE, &now);
475 total_items += n_items;
476 if (samples == NULL) {
477 log(LOG_STAT_SEND,
"Error getting stat sample for send to controller");
482 char buffer[serial_size];
485 log_error(
"Error serializing stat sample");
491 .payload_size = ser_rtn
496 log_error(
"Error sending statistics to controller");
500 log(LOG_STAT_SEND,
"Sending %d statistics to controller", total_items);
int init_connected_socket(struct sockaddr_in *addr)
Initializes a socket that is connected to a given address.
int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint)
Initializes an endpoint structure to point to the relevant msu.
payload: ctrl_create_thread_msg
Interface for general-purpose socket communication.
int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key)
Adds an endpoint to the route with the given ID.
size_t serialized_stat_sample_size(struct stat_sample *sample, int n_samples)
Determines the size needed to hold the serialized version of sample.
Collecting statistics within the runtime.
int init_route(int route_id, int type_id)
Initializes a new route with the given route_id and type_id.
Kept unknown at 0 to catch mis-labeled messages.
ctrl_delete_msu_msg (ctrl_runtime_messages.h)
static int process_ctrl_message(struct ctrl_runtime_msg_hdr *hdr, int fd)
Processes a received control message that is due for delivery to this thread.
uint32_t local_runtime_ip()
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
payload: ctrl_msu_route_msg
size_t payload_size
Payload size.
All messages sent from controller to runtime are prefixed with this header.
Payload for messages of type CTRL_DELETE_MSU.
#define log_critical(fmt,...)
uint32_t ip
Local IP address with which the runtime listens for other runtimes.
thread_msg_type
All messages that can be received by output thread or workers.
payload: ctrl_add_runtime_msg
static int verify_msg_size(struct ctrl_runtime_msg_hdr *msg)
Checks whether the size of a message matches the size of its target struct.
ssize_t serialize_stat_samples(struct stat_sample *samples, int n_samples, void *buffer, size_t buff_len)
Serializes from the provided samples into the buffer
A single stat sample for a single item.
int send_ack_message(int id, bool success)
WILL send an acknowledgement of success for a specific message.
ctrl_runtime_msg_type
The various top-level types of messages which can be sent from the controller to runtimes.
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
uint32_t ip
ip address of the runtime to connect to.
enum blocking_mode mode
The mode of the thread.
Monitors an incoming port for messages from runtime or controller.
stat_id
The identifiers with which stats can be logged.
Payload for messages of type CTRL_MODIFY_ROUTE.
int send_to_controller(struct rt_controller_msg_hdr *hdr, void *payload)
Sends a message to the global controller.
int route_id
Route to which the message applies.
payload: ctrl_create_msu_msg
Logging of status messages to the terminal.
static struct stat_type_label reported_stat_types[]
Static structure so the reported stat types can be referenced as an array.
int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue)
Enqueues a dedos_msg with a thread_msg as the payload to the appropriate queue.
static int controller_sock
Static (global) variable to hold the socket connecting to the global controller.
void destroy_thread_msg(struct thread_msg *msg)
Frees a thread message.
size_t payload_size
Payload will be serialized following this struct.
int init_controller_socket(struct sockaddr_in *addr)
Initializes a connection with the global controller located at the provided address.
Interactions with global dfg from individual runtime.
enum thread_msg_type type
Payload for messages of type CTRL_CREATE_MSU.
Payload: output of serialize_stat_samples.
Control spawned threads with message queue within DeDOS.
Header for all messages from controller to runtime.
struct thread_msg * construct_thread_msg(enum thread_msg_type type, ssize_t data_size, void *data)
Allocates and initializes a thread message with the provided options.
#define log_error(fmt,...)
int thread_id
The ID to give to the created thread.
A message to be delivered to a dedos_thread.
Payload for messages of type CTRL_CONNECT_TO_RUNTIME.
Payload type: rt_controller_init_msg.
int id
Id for confirmation message (not implemented)
Payload for messages of type CTRL_CREATE_THREAD.
int monitor_controller_socket(int new_fd)
Adds the global controller to be monitored by the socket monitor.
static int process_ctrl_message_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd)
Processes any received control message.
int send_stats_to_controller()
Samples the relevant statistics and sends them to the controller.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
#define N_REPORTED_STAT_TYPES
Number of reported stat types.
static int process_ctrl_route_msg(struct ctrl_route_msg *msg)
Processes a received ctrl_route_msg.
Payload for messages of type CTRL_MSU_ROUTES.
enum rt_controller_msg_type type
Type of payload attached.
struct dedos_thread * get_dedos_thread(int id)
Returns the dedos_thread with the given ID.
static int send_ctl_init_msg()
Sends the initialization message containing runtime ID, IP and port to global controller.
Initialization message, sent to controller to identify runtime upon first connection.
int remove_route_endpoint(int route_id, int msu_id)
Removes destination from route with given ID.
static enum thread_msg_type get_thread_msg_type(enum ctrl_runtime_msg_type type)
Gets the corresponding thread_msg_type for a ctrl_runtime_msg_type.
Adds an endpoint to a route.
int modify_route_endpoint(int route_id, int msu_id, uint32_t new_key)
Modifies key associated with MSU in route.
ctrl_create_msu_msg (ctrl_runtime_messages.h)
payload: ctrl_delete_thread_msg
int msu_id
ID of MSU to add/delete/modify.
bool is_controller_fd(int fd)
Checks if fd is file descriptor for controller.
int connect_to_runtime_peer(unsigned int id, struct sockaddr_in *addr)
Initiates a connection to a runtime peer with the given ID at the given address. ...
int thread_id
ID of the Thread to which the message is to be delivered.
#define CHECK_MSG_SIZE(msg, target)
Macro to check whether the size of a message matches the size of the struct it's supposed to be...
unsigned int runtime_id
Unique identifier for the runtime.
Deletes an endpoint from a route.
static struct thread_msg * thread_msg_from_ctrl_hdr(struct ctrl_runtime_msg_hdr *hdr, int fd)
Constructs a thread message from a ctrl_runtime_msg_hdr, reading any additional information it needs ...
int type_id
MSU Type of the route.
ctrl_msu_route_msg (ctrl_runtime_messages.h)
static int process_create_thread_msg(struct ctrl_create_thread_msg *msg)
Processes a received ctrl_create_thread_msg.
int ack_id
for sending acknowledgements to controller.
static int pass_ctrl_msg_to_thread(struct ctrl_runtime_msg_hdr *hdr, int fd)
Constructs a thread_msg from a control-runtime message and passes it to the relevant thread...
enum ctrl_runtime_msg_type type
Identifies the type of payload that follows.
enum ctrl_route_msg_type type
sub-type of message
Modifies the key corresponding to a route endpoint.
struct stat_sample * get_stat_samples(enum stat_id stat_id, struct timespec *sample_time, int *n_samples_out)
Samples the statistic with the provided stat_id.
#define log(level, fmt,...)
Log at a custom level.
int create_worker_thread(unsigned int thread_id, enum blocking_mode mode)
Starts a new worker thread with the given thread ID and pinned/unpinned status.
int handle_controller_communication(int fd)
Reads and processes a controller message off of the provided file descriptor.
ssize_t send_to_endpoint(int fd, void *data, size_t data_len)
Writes a buffer of a given size to a file descriptor.
struct msg_queue queue
Queue for incoming message.
int port
port of the runtime to connect to
An endpoint to which an msu_msg can be delivered.
#define log_warn(fmt,...)
static int connect_to_controller(struct sockaddr_in *addr)
Initializes a connection to the global controller.
payload: ctrl_delete_msu_msg
static int process_connect_to_runtime(struct ctrl_add_runtime_msg *msg)
Processes a received ctrl_add_runtime_msg.
Structure representing any thread within DeDOS.
unsigned int runtime_id
ID of the runtime to connect to.
Communication with global controller from runtime.