36 memset(hdr,
'\0',
sizeof(*hdr));
55 memcpy(&key->
key, &
id,
sizeof(
id));
64 if (seed_size >
sizeof(key->
key)) {
65 log_warn(
"Key length too large for composite key!");
66 seed_size =
sizeof(key->
key);
68 memcpy(&key->
key, seed, seed_size);
89 log_warn(
"Cannot record provinance in path: Path too short");
97 log(LOG_ADD_PROVINANCE,
"Path len of prov %p is now %d", prov, prov->
path_len);
102 struct timespec zero = {};
107 struct dedos_msg *msg = malloc(
sizeof(*msg));
131 log_error(
"Received non-MSU message on msu queue!");
136 log_warn(
"Data size of dequeued MSU message (%d) does not meet expected (%d)",
149 log_error(
"Error reading msu msg header from fd %d", fd);
164 struct msu_msg *msg = malloc(
sizeof(*msg));
173 log_error(
"Size of incoming message is not big enough to fit header");
176 log(LOG_MSU_MSG_READ,
"Reading header from %d", fd);
177 struct msu_msg *msg = malloc(
sizeof(*msg));
183 log_error(
"Error reading msu message header");
191 data = malloc(data_size);
193 log_perror(
"Error allocating msu msg of size %d", (
int)data_size);
197 log(LOG_MSU_MSG_READ,
"Reading payload of size %d from %d", (
int)data_size, fd);
199 log_perror(
"Error reading msu msg payload of size %d", (
int)data_size);
204 log(LOG_MSU_MSG_DESERIALIZE,
"Deserialized MSU message of size %d", (
int)data_size);
221 if (serializer != NULL) {
222 void *payload = NULL;
223 ssize_t payload_size = serializer(dst_type, msg, &payload);
225 "Serialized message into payload of size %d using %s serialization",
226 (
int)payload_size, dst_type->
name);
227 if (payload_size < 0) {
228 log_error(
"Error running destination type %s's serialize function",
232 size_t serialized_size =
sizeof(
struct msu_msg_hdr) + payload_size;
233 void *output = malloc(serialized_size);
234 if (output == NULL) {
235 log_error(
"Could not allocate serialized MSU msg");
239 memcpy(output, &msg->
hdr,
sizeof(msg->
hdr));
240 memcpy(output +
sizeof(msg->
hdr), payload, payload_size);
244 *size_out = serialized_size;
247 size_t serialized_size =
sizeof(
struct msu_msg_hdr) + msg->data_size;
248 void *output = malloc(serialized_size);
250 if (output == NULL) {
251 log_error(
"Could not allocate serialized MSU msg");
255 memcpy(output, &msg->hdr,
sizeof(msg->hdr));
256 memcpy(output +
sizeof(msg->hdr), msg->data, msg->data_size);
258 *size_out = serialized_size;
Item in the chain of history kept track of in each MSU.
Interface for general-purpose socket communication.
Header for messages passed to MSUs.
struct composite_key key
The full, arbitrary-length, unique key (used in state)
int add_provinance(struct msg_provinance *prov, struct local_msu *sender)
Adds a new item to the path of MSUs taken within the message provinance in the header.
int error_flag
0 if no error has been encountered
struct dedos_msg * dequeue_msg(struct msg_queue *q)
Dequeues the first available message from q.
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
struct msu_msg * dequeue_msu_msg(struct msg_queue *q)
Dequeues an MSU message from the provided message queue.
struct msu_msg * read_msu_msg(struct local_msu *msu, int fd, size_t size)
Reads an MSU message of the given size from the provided file descriptor.
unsigned int msu_msg_sender_type(struct msg_provinance *prov)
Container for linked list message queue.
int read_msu_msg_hdr(int fd, struct msu_msg_hdr *hdr)
int32_t id
A shorter, often hashed id for the key of fixed length (used in routing)
#define log_perror(fmt,...)
int set_msg_key(int32_t id, struct msu_msg_key *key)
Sets the key's ID and composite-ID to be equal to the provided id.
struct msu_provinance_item sender
The last MSU to see this message.
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
struct msu_provinance_item origin
The first MSU to see this message.
int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key)
Sets the key's composite-ID to the provided value, and sets the key's ID to a hash of the value...
Interactions with global dfg from individual runtime.
A linked-list entry containing a message.
Keeps track of which MSUs have seen a given message header.
void destroy_msu_msg_and_contents(struct msu_msg *msg)
Frees both the message and message data.
int schedule_msg(struct msg_queue *q, struct dedos_msg *msg, struct timespec *interval)
Schedules a message to be delivered once interval time has passed.
For profiling the path of MSU messages through DeDOS.
enum dedos_msg_type type
Target of delivery: runtime, thread, or MSU.
struct msu_provinance_item * get_provinance_item(struct msg_provinance *p, struct msu_type *type)
struct msu_msg * create_msu_msg(struct msu_msg_hdr *hdr, size_t data_size, void *data)
Creates an MSU message with the appropriate header, data size, and data.
size_t data_size
Payload size.
unsigned int id
Numerical identifier for the MSU.
#define log_error(fmt,...)
Declares the structures and functions applicable to MSUs on the local machine.
int path_len
The current length of msg_provinance::path.
#define PROFILE_EVENT(hdr, stat_id)
If the header is marked for profiling, profiles the given event.
struct msu_provinance_item path[8]
A list of each MSU that has seen this message. TODO: For now, one MSU of each type.
The structure that represents an MSU located on the local machine.
serialization_fn serialize
Defines serialization protocol for data sent to this MSU type. If NULL, assumes no pointers in the msu...
ssize_t(* serialization_fn)(struct msu_type *, struct msu_msg *, void **output)
serialization_fn serialize_error
Defines serialization protocol for errors returned to this MSU type. If NULL, assumes no pointers in t...
deserialization_fn deserialize
Defines deserialization protocol for data received by this MSU type. If NULL, assumes no pointers in th...
int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key)
Initializes and resets a message header, storing a copy of the provided key.
#define MAX_PATH_LEN
The maximum path length recorded for a message's path through MSUs.
void * serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out)
Converts an MSU message into a serialized stream of bytes.
int group_id
Used to mark a route ID when storing state.
#define log(level, fmt,...)
Log at a custom level.
A message that is to be delivered to an instance of an MSU.
#define HASH_VALUE(keyptr, keylen, hashv)
char * name
Name for the msu type.
Used to uniquely identify the source of a message, used in state storage as well as routing...
size_t key_len
The length of the composite key.
int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data)
Enqueues a message for immediate delivery.
#define log_warn(fmt,...)
ssize_t data_size
Size of payload.
int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval)
Schedules an MSU message to be delivered after interval time has passed.
struct msu_type * type
Pointer to struct containing information shared across all instances of this type of MSU...
struct msu_msg_key key
Routing/state key.