46 if (serialized == NULL) {
47 log_error(
"Error serializing MSU msg for delivery to msu %d", dst->
id);
56 if (thread_msg == NULL) {
57 log_error(
"Error creating thread message for sending MSU msg to msu %d",
66 log_error(
"Error enqueuing MSU msg for msu %u on main thread", dst->
id);
72 log(LOG_MSU_ENQUEUES,
"Enqueued message for remote send to dst %d on runtime %d",
80 log(LOG_MSU_ENQUEUES,
"Enqueing data %p directly to destination %d",
85 log_warn(
"Could not add provinance to message %p", msg);
90 log_error(
"Error enqueing data %p to local MSU %d", msg->
data, dest->
id);
100 log(LOG_MSU_ENQUEUES,
"Enqueing data %p directly to destination %d",
105 log_warn(
"Could not add provinance to message %p", msg);
110 log_error(
"Error enqueing data %p to local MSU %d", msg->
data, dest->
id);
116 log_warn(
"Error enqueing timeout to worker thread");
125 log_error(
"Error initializing message header");
138 log_error(
"Error initializing message header");
147 struct msu_msg_hdr *hdr,
size_t data_size,
void *data) {
153 log_warn(
"Could not add provinance to message %p", msg);
156 log(LOG_MSU_ENQUEUES,
"Sending data %p to destination type %s",
160 if (dst_type->
route) {
161 rtn = dst_type->
route(dst_type, sender, msg, &dst);
167 log_error(
"Could not find destination endpoint of type %s from msu %d (%s). "
168 "Dropping message %p",
180 log_error(
"Error enqueuing data %p to local MSU %d", msg->
data, dst.
id);
184 log(LOG_MSU_ENQUEUES,
"Enqueued data %p to local msu %d", msg->
data, dst.
id);
189 log_error(
"Error sending data %p to remote MSU %d", msg->
data, dst.
id);
192 log(LOG_MSU_ENQUEUES,
"Sending data %p to remote msu %d", msg->
data, dst.
id);
199 log_error(
"Unknown MSU locality for msu %d (from %d): %d",
206 struct msu_msg_key *key,
size_t data_size,
void *data) {
209 log_error(
"Error initializing message header");
214 return call_msu_type(sender, dst_type, &hdr, data_size, data);
219 struct msu_msg_hdr *hdr,
size_t data_size,
void *data) {
225 log_warn(
"Could not add provinance to message %p", msg);
228 log(LOG_MSU_ENQUEUES,
"Sending data %p to destination endpoint %d",
235 log_error(
"Error enqueuing data %p to local msu %d", msg->
data, endpoint->
id);
239 log(LOG_MSU_ENQUEUES,
"Enqueued data %p to local msu %d", msg->
data, endpoint->
id);
244 log_error(
"Error enqueueing data %p towards remote msu %d",
249 log(LOG_MSU_ENQUEUES,
"Enqueued data %p toward remote msu %d",
263 log_error(
"Error initializing message header");
268 return call_msu_endpoint(sender, endpoint, endpoint_type, &hdr, data_size, data);
273 struct msu_msg_hdr *hdr,
size_t data_size,
void *data) {
int id
A unque identifier for the endpoint (msu ID)
unsigned int runtime_id
The ID for the runtime on which the endpoint resides.
Header for messages passed to MSUs.
static int enqueue_for_remote_send(struct msu_msg *msg, struct msu_type *dst_type, struct msu_endpoint *dst)
Calls type-specific MSU serialization function and enqueues data into main thread queue so it can be ...
Messages to be delivered to dedos_threads.
int add_provinance(struct msg_provinance *prov, struct local_msu *sender)
Adds a new item to the path of MSUs taken within the mesasge provinance in the header.
int error_flag
0 if no error has been encountered
int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval)
Signals that the given thread should break when waiting on its semaphore once interval time has passe...
int default_routing(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
The defualt routing strategy, using the key of the MSU message to route to a pre-defined endpoint...
int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a destination of the specified type.
int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
void destroy_thread_msg(struct thread_msg *msg)
Frees a thread message.
void destroy_msu_msg_and_contents(struct msu_msg *msg)
Frees both the message and message data.
struct msg_queue * queue
if msu_endpoint::locality == MSU_IS_LOCAL, the queue for the msu endpoint
int enqueue_for_output(struct thread_msg *msg)
Enqueues a thread_msg for delivery to the output thread.
For profiling the path of MSU messages through DeDOS.
Declares the methods available for calling an MSU from another MSU.
struct msu_msg * create_msu_msg(struct msu_msg_hdr *hdr, size_t data_size, void *data)
Creates an MSU message with the appropriate header, data size, and data.
size_t data_size
Payload size.
struct msg_provinance provinance
Message history.
int call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Sends an MSU message to a speicific destination, either local or remote.
#define log_error(fmt,...)
A message to be delivered to a dedos_thread.
int schedule_local_msu_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Schedules a call to a local MSU to occur at some point in the future.
#define PROFILE_EVENT(hdr, stat_id)
If the header is marked for profiling, profiles the given event.
The structure that represents an MSU located on the local machine.
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
enum msu_locality locality
Whether the endpoint is on the local machine or remote.
int init_call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_key *key, size_t data_size, void *data)
Enqueues a new message in the queue of a local MSU.
struct msg_queue queue
Input queue to MSU.
int(* route)(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
Choose which MSU of this type the previous MSU will route to.
struct worker_thread * thread
The worker thread on which this MSU is placed.
int call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Sends an MSU message to a destination of the given type, utilizing the sending MSU's routing function...
int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key)
Initializes and resets a message header, storing a copy of the provided key.
unsigned int route_id
The ID for the route used to get to the endpoint.
void * serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out)
Converts an MSU message into a serializes stream of bytes.
struct thread_msg * init_send_thread_msg(unsigned int runtime_id, unsigned int target_id, size_t data_len, void *data)
Initializes a send_to_peer message (SEND_TO_PEER)
int group_id
Used to mark a route ID when storing state.
#define log(level, fmt,...)
Log at a custom level.
A message that is to be delivered to an instance of an MSU.
int call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Enqueues a message in the queue of a local MSU.
char * name
Name for the msu type.
Used to uniquely identify the source of a message, used in state storage as well as routing...
An endpoint to which an msu_msg can be delivered.
int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data)
Enqueues a message for immediate delivery.
#define log_warn(fmt,...)
int init_call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a specific destination, either local or remote.
#define SET_PROFILING(hdr)
Sets whether the header should be profiled.
int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval)
Schedules an MSU message to be delivered after interval time has passed.
struct msu_type * type
Pointer to struct containing information shared across all instances of this type of MSU...
Declares strategies that MSUs can use for routing to endpoints.
struct msu_msg_key key
Routing/state key.
int schedule_local_msu_init_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval, struct msu_msg_key *key, size_t data_size, void *data)
Schedules a call to a local MSU to occur at some point in the future.