36 #define __USE_GNU // For some reason, necessary for RUSAGE_THREAD
37 #include <sys/resource.h>
44 #define MAX_MSU_ID 1024
59 struct local_msu *msu = calloc(1,
sizeof(*msu));
92 log_perror(
"Error opening write lock on msu registry");
96 if (local_msu_registry[
id] == NULL) {
97 log_warn(
"MSU with id %d does not exist and cannot be deleted",
id);
100 local_msu_registry[id] = NULL;
116 log_perror(
"Error opening write lock on msu registry");
120 if (local_msu_registry[msu->
id] != NULL) {
121 log_error(
"MSU with id %d already exists and cannot be added to registry", msu->
id);
124 local_msu_registry[msu->
id] = msu;
140 log_perror(
"Error opening read lock on MSU registry");
149 log_error(
"Could not get local msu with id %d. Not registered.",
id);
171 #define NUM_MSU_STAT_IDS sizeof(MSU_STAT_IDS) / sizeof(enum stat_id)
192 log_warn(
"Could not remove stat item %d for msu %d",
211 log_error(
"Error initializing msu queue");
218 log_error(
"Error registering MSU With thread");
224 log_info(
"Initializing msu (ID: %d, type: %s, data: '%s')",
id, type->
name,
229 if (type->
init(msu, data) != 0) {
230 log_error(
"Error running MSU %d (type: %s) type-specific initialization function",
240 log_error(
"Error adding MSU to local registry");
266 log_info(
"Removed msu (ID: %d, Type: %s)",
id, type);
284 log_error(
"Error executing MSU %d (%s) receive function",
292 int rtn = getrusage(RUSAGE_THREAD, before);
299 #define RECORD_DIFF(dstat, rstat, id) \
300 increment_stat(dstat, id, after.rstat - before->rstat)
302 #define RECORD_TIMEDIFF(dstat, rstat, id) \
303 increment_stat(dstat, id, ((double)after.rstat.tv_sec + after.rstat.tv_usec * 1e-6) - \
304 ((double)before->rstat.tv_sec + before->rstat.tv_usec * 1e-6))
308 int rtn = getrusage(RUSAGE_THREAD, &after);
328 log_error(
"MSU %d received error with no handler", msu->
id);
333 log_error(
"Error executing MSU error receive function");
337 log(LOG_MSU_DEQUEUES,
"Dequeued MSU message %p for msu %d", msg, msu->
id);
338 struct rusage before;
342 if (gather_err == 0) {
364 if (up_type == NULL) {
374 log_error(
"Error initializing msu endpoint");
379 log_error(
"Error calling MSU endpoint for error report");
Item in the chain of history kept track of in each MSU.
int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint)
Initializes an endpoint structure to point to the relevant msu.
void(* destroy)(struct local_msu *self)
Type-specific destructor that frees any internal data or state.
int msu_dequeue(struct local_msu *msu)
Dequeues a message from a local MSU and calls its receive function.
Collecting statistics within the runtime.
#define MAX_MSU_ID
MOVEME: MAX_MSU_ID Defines the maximum ID that can be assigned to an MSU.
int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast)
void msu_free_all_state(struct local_msu *msu)
Frees all state structures associated with the given MSU.
Header for messages passed to MSUs.
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
int register_msu_with_thread(struct local_msu *msu)
Registers an MSU as one that should be run on its assigned thread.
int error_flag
0 if no error has been encountered
static void init_msu_stats(int msu_id)
Initializes the stat IDS that are relevant to an MSU.
Representation of a thread that holds MSUs, messages, and waits on a semaphore.
unsigned int scheduling_weight
The number of items that should be dequeued on this MSU each tick.
struct msu_msg * dequeue_msu_msg(struct msg_queue *q)
Dequeues an MSU message from the provided message queue.
State storage that is tied to a specific MSU message.
static int gather_metrics_before(struct rusage *before)
stat_id
The identifiers with which stats can be logged.
static int rm_from_local_registry(int id)
Removes an MSU from the local MSU registry.
struct routing_table ** routes
#define log_perror(fmt,...)
static enum stat_id MSU_STAT_IDS[]
The stat IDs that are associated with an MSU, to be registered on MSU creation.
int record_start_time(enum stat_id stat_id, unsigned int item_id)
Starts a measurement of elapsed time.
int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
static int add_to_local_registry(struct local_msu *msu)
Adds an MSU to the local registry so it can be referred to elsewhere by ID.
void destroy_msu(struct local_msu *msu)
Calls type-specific destroy function and frees associated memory.
static void msu_free(struct local_msu *msu)
Frees the memory associated with an MSU structure, including any routes, messages in its queue...
int(* receive)(struct local_msu *self, struct msu_msg *msg)
Handles the receiving of data sent from other MSUs.
Declares the methods available for calling an MSU from another MSU.
struct route_set routes
Routing table set, containing all destination MSUs (see routing.h for details)
static void record_metrics(struct rusage *before, int msu_id)
size_t data_size
Payload size.
struct msg_provinance provinance
Message history.
struct local_msu * local_msu_registry[1024]
Mapping of MSU ID to the specific instance of the local MSU.
int(* receive_error)(struct local_msu *self, struct msu_msg *msg)
#define log_error(fmt,...)
Declares the structures and functions applicable to MSUs on the local machine.
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it.
struct local_msu * init_msu(unsigned int id, struct msu_type *type, struct worker_thread *thread, struct msu_init_data *data)
Allocates and creates a new MSU of the specified type and ID on the given thread. ...
static struct local_msu * msu_alloc()
Allocates the memory associated with an MSU structure.
struct local_msu * get_local_msu(unsigned int id)
Gets the local MSU with the given ID, or NULL if N/A.
int remove_stat_item(enum stat_id stat_id, unsigned int item_id)
Un-registers an item so it can no longer have statistics registered, and will not be reported to the ...
int path_len
The current length of msg_provinance::path.
static void unregister_msu_stats(int msu_id)
Unregisters the stat IDS that are relevant to an MSU.
struct msu_provinance_item path[8]
A list of each MSU that has seen this message. TODO: For now, one MSU of each type.
The structure that represents an MSU located on the local machine.
int try_destroy_msu(struct local_msu *msu)
Destroys the MSU, but only if it has no states currently saved.
Data with which an MSU is initialized, and the payload for messages of type CTRL_CREATE_MSU.
#define RECORD_DIFF(dstat, rstat, id)
struct msg_queue queue
Input queue to MSU.
Definitions of the message types that can be passed between runtimes.
int record_stat(enum stat_id stat_id, unsigned int item_id, double stat, bool relog)
Records a statistic in the statlog.
struct worker_thread * thread
The worker thread on which this MSU is placed.
int unregister_msu_with_thread(struct local_msu *msu)
Removes an MSU from the list of MSUs within its thread.
uint32_t num_msgs
Number of messages currently in the queue.
int(* init)(struct local_msu *self, struct msu_init_data *initial_data)
Type-specific construction function.
static int msu_receive(struct local_msu *msu, struct msu_msg *msg)
Calls type-specific MSU receive function and records execution time.
struct dedos_thread * thread
The underlying dedos_thread.
int increment_stat(enum stat_id stat_id, unsigned int item_id, double value)
Increments the given statistic by the provided value.
#define log(level, fmt,...)
Log at a custom level.
A message that is to be delivered to an instance of an MSU.
sem_t sem
Locks thread until a message is available.
pthread_rwlock_t msu_registry_lock
Lock to protect access to local msu registry.
int init_msg_queue(struct msg_queue *q, sem_t *sem)
Initializes a message queue to have no messages in it, and sets up the mutex and semaphore...
char * name
Name for the msu type.
int msu_num_states(struct local_msu *msu)
An endpoint to which an msu_msg can be delivered.
#define log_warn(fmt,...)
struct msu_type * type
Pointer to struct containing information shared across all instances of this type of MSU...
#define RECORD_TIMEDIFF(dstat, rstat, id)
Declares strategies that MSUs can use for routing to endpoints.
int record_end_time(enum stat_id stat_id, unsigned int item_id)
Records the elapsed time since the previous call to record_start_time.