My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
local_msu.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
25 #include "local_msu.h"
26 #include "routing_strategies.h"
27 #include "logging.h"
28 #include "rt_stats.h"
29 #include "msu_message.h"
30 #include "inter_runtime_messages.h"
31 #include "thread_message.h"
32 #include "msu_state.h"
33 #include "msu_calls.h"
34 
35 #include <stdlib.h>
36 #define __USE_GNU // For some reason, necessary for RUSAGE_THREAD
37 #include <sys/resource.h>
38 #include <sys/time.h>
44 #define MAX_MSU_ID 1024
45 
46 // TODO: This lock might not be useful, as I believe this registry will
47 // TODO: only ever be accessed from the socket handler thread
48 
50 pthread_rwlock_t msu_registry_lock;
53 
58 static struct local_msu *msu_alloc() {
59  struct local_msu *msu = calloc(1, sizeof(*msu));
60  if (msu == NULL) {
61  log_error("Failed to allocate MSU");
62  return NULL;
63  }
64  return msu;
65 }
66 
71 static void msu_free(struct local_msu *msu) {
72  struct msu_msg *msg = dequeue_msu_msg(&msu->queue);
73  while (msg != NULL) {
74  if (msg->data_size > 0) {
75  free(msg->data);
76  }
77  free(msg);
78  msg = dequeue_msu_msg(&msu->queue);
79  }
80  msu_free_all_state(msu);
81  free(msu->routes.routes);
82  free(msu);
83 }
84 
90 static int rm_from_local_registry(int id) {
91  if (pthread_rwlock_wrlock(&msu_registry_lock) != 0) {
92  log_perror("Error opening write lock on msu registry");
93  return -1;
94  }
95  int rtn = 0;
96  if (local_msu_registry[id] == NULL) {
97  log_warn("MSU with id %d does not exist and cannot be deleted", id);
98  rtn = -1;
99  } else {
100  local_msu_registry[id] = NULL;
101  }
102  if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
103  log_perror("Error unlocking msu registry");
104  return -1;
105  }
106  return rtn;
107 }
108 
114 static int add_to_local_registry(struct local_msu *msu) {
115  if (pthread_rwlock_wrlock(&msu_registry_lock) != 0) {
116  log_perror("Error opening write lock on msu registry");
117  return -1;
118  }
119  int rtn = 0;
120  if (local_msu_registry[msu->id] != NULL) {
121  log_error("MSU with id %d already exists and cannot be added to registry", msu->id);
122  rtn = -1;
123  } else {
124  local_msu_registry[msu->id] = msu;
125  }
126  if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
127  log_perror("Error unlocking msu registry");
128  return -1;
129  }
130  return rtn;
131 }
132 
133 
134 struct local_msu *get_local_msu(unsigned int id) {
135  if (id >= MAX_MSU_ID) {
136  log_error("MSU id %u too high!", id);
137  return NULL;
138  }
139  if (pthread_rwlock_rdlock(&msu_registry_lock) != 0) {
140  log_perror("Error opening read lock on MSU registry");
141  return NULL;
142  }
143  struct local_msu *msu = local_msu_registry[id];
144  if (pthread_rwlock_unlock(&msu_registry_lock) != 0) {
145  log_perror("Error unlocking msu registry");
146  return NULL;
147  }
148  if (msu == NULL) {
149  log_error("Could not get local msu with id %d. Not registered.", id);
150  }
151  return msu;
152 }
153 
155 static enum stat_id MSU_STAT_IDS[] = {
163  MSU_UCPUTIME,
164  MSU_SCPUTIME,
165  MSU_MINFLT,
166  MSU_MAJFLT,
167  MSU_VCSW,
168  MSU_IVCSW
169 };
170 
171 #define NUM_MSU_STAT_IDS sizeof(MSU_STAT_IDS) / sizeof(enum stat_id)
172 
177 static void init_msu_stats(int msu_id) {
178  for (int i=0; i<NUM_MSU_STAT_IDS; i++) {
179  if (init_stat_item(MSU_STAT_IDS[i], msu_id) != 0) {
180  log_warn("Could not initialize stat item %d for msu %d", MSU_STAT_IDS[i], msu_id);
181  }
182  }
183 }
184 
189 static void unregister_msu_stats(int msu_id) {
190  for (int i=0; i < NUM_MSU_STAT_IDS; i++) {
191  if (remove_stat_item(MSU_STAT_IDS[i], msu_id) != 0) {
192  log_warn("Could not remove stat item %d for msu %d",
193  MSU_STAT_IDS[i], msu_id);
194  }
195  }
196 }
197 
198 struct local_msu *init_msu(unsigned int id,
199  struct msu_type *type,
200  struct worker_thread *thread,
201  struct msu_init_data *data) {
202  struct local_msu *msu = msu_alloc();
203  init_msu_stats(id);
204  msu->thread = thread;
205  msu->id = id;
206  msu->type = type;
207  msu->scheduling_weight = 0;
208 
209  if (init_msg_queue(&msu->queue, &thread->thread->sem) != 0) {
210  msu_free(msu);
211  log_error("Error initializing msu queue");
212  return NULL;
213  }
214 
215  // Must be done before running init function, or the msu cannot enqueue to itself
216  int rtn = register_msu_with_thread(msu);
217  if (rtn < 0) {
218  log_error("Error registering MSU With thread");
219  msu_free(msu);
220  return NULL;
221  }
222 
223  // TODO: Unregister if creation fails
224  log_info("Initializing msu (ID: %d, type: %s, data: '%s')", id, type->name,
225  data->init_data);
226 
227  // Run the MSU's type-specific init function if it has one
228  if (type->init) {
229  if (type->init(msu, data) != 0) {
230  log_error("Error running MSU %d (type: %s) type-specific initialization function",
231  id, type->name);
233  msu_free(msu);
234  return NULL;
235  }
236  }
237 
238  rtn = add_to_local_registry(msu);
239  if (rtn < 0) {
240  log_error("Error adding MSU to local registry");
241  msu_free(msu);
243  return NULL;
244  }
245 
246  return msu;
247 }
248 
249 int try_destroy_msu(struct local_msu *msu) {
250  if (msu_num_states(msu) > 0) {
251  return 1;
252  }
253  destroy_msu(msu);
254  return 0;
255 }
256 
257 void destroy_msu(struct local_msu *msu) {
258  int id = msu->id;
259  char *type = msu->type->name;
260  unregister_msu_stats(msu->id);
261  if (msu->type->destroy) {
262  msu->type->destroy(msu);
263  }
264  msu_free(msu);
266  log_info("Removed msu (ID: %d, Type: %s)", id, type);
267 }
268 
275 static int msu_receive(struct local_msu *msu, struct msu_msg *msg) {
276 
279  int rtn = msu->type->receive(msu, msg);
282 
283  if (rtn != 0) {
284  log_error("Error executing MSU %d (%s) receive function",
285  msu->id, msu->type->name);
286  return -1;
287  }
288  return 0;
289 }
290 
291 static inline int gather_metrics_before(struct rusage *before) {
292  int rtn = getrusage(RUSAGE_THREAD, before);
293  if (rtn < 0) {
294  log_error("Error getting MSU rusage");
295  }
296  return rtn;
297 }
298 
299 #define RECORD_DIFF(dstat, rstat, id) \
300  increment_stat(dstat, id, after.rstat - before->rstat)
301 
302 #define RECORD_TIMEDIFF(dstat, rstat, id) \
303  increment_stat(dstat, id, ((double)after.rstat.tv_sec + after.rstat.tv_usec * 1e-6) - \
304  ((double)before->rstat.tv_sec + before->rstat.tv_usec * 1e-6))
305 
306 static inline void record_metrics(struct rusage *before, int msu_id) {
307  struct rusage after;
308  int rtn = getrusage(RUSAGE_THREAD, &after);
309  if (rtn < 0) {
310  log_error("Error getting MSU rusage");
311  return;
312  }
313 
314  RECORD_TIMEDIFF(MSU_UCPUTIME, ru_utime, msu_id);
315  RECORD_TIMEDIFF(MSU_SCPUTIME, ru_stime, msu_id);
316  RECORD_DIFF(MSU_MINFLT, ru_minflt, msu_id);
317  RECORD_DIFF(MSU_MAJFLT, ru_majflt, msu_id);
318  RECORD_DIFF(MSU_VCSW, ru_nvcsw, msu_id);
319  RECORD_DIFF(MSU_IVCSW, ru_nivcsw, msu_id);
320 }
321 
322 
323 int msu_dequeue(struct local_msu *msu) {
324  struct msu_msg *msg = dequeue_msu_msg(&msu->queue);
325  if (msg) {
326  if (msg->hdr.error_flag) {
327  if (msu->type->receive_error == NULL) {
328  log_error("MSU %d received error with no handler", msu->id);
329  return -1;
330  }
331  int rtn = msu->type->receive_error(msu, msg);
332  if (rtn < 0) {
333  log_error("Error executing MSU error receive function");
334  return -1;
335  }
336  } else {
337  log(LOG_MSU_DEQUEUES, "Dequeued MSU message %p for msu %d", msg, msu->id);
338  struct rusage before;
339  record_stat(MSU_QUEUE_LEN, msu->id, msu->queue.num_msgs, false);
340  int gather_err = gather_metrics_before(&before);
341  int rtn = msu_receive(msu, msg);
342  if (gather_err == 0) {
343  record_metrics(&before, msu->id);
344  }
346  free(msg);
347  return rtn;
348  }
349  }
350  return 1;
351 }
352 
353 int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast) {
354 
355  increment_stat(MSU_ERROR_CNT, msu->id, 1);
356 
357  if (!broadcast) {
358  return 0;
359  }
360 
361  for (int i=0; i < hdr->provinance.path_len; i++) {
362  struct msu_provinance_item *upstream = &hdr->provinance.path[i];
363  struct msu_type *up_type = get_msu_type(upstream->type_id);
364  if (up_type == NULL) {
365  log_error("Cannot get type %d", upstream->type_id);
366  continue;
367  }
368  if (up_type->receive_error == NULL) {
369  continue;
370  }
371  struct msu_endpoint receiver;
372  int rtn = init_msu_endpoint(upstream->msu_id, upstream->runtime_id, &receiver);
373  if (rtn < 0) {
374  log_error("Error initializing msu endpoint");
375  continue;
376  }
377  rtn = call_msu_error(msu, &receiver, up_type, hdr, 0, NULL);
378  if (rtn < 0) {
379  log_error("Error calling MSU endpoint for error report");
380  continue;
381  }
382  }
383  return 0;
384 }
Item in the chain of history kept track of in each MSU.
Definition: msu_message.h:59
int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint)
Initializes an endpoint structure to point to the relevant msu.
Definition: routing.c:483
void(* destroy)(struct local_msu *self)
Type-specific destructor that frees any internal data or state.
Definition: msu_type.h:81
int msu_dequeue(struct local_msu *msu)
Dequeus a message from a local MSU and calls its receive function.
Definition: local_msu.c:323
Collecting statistics within the runtime.
#define MAX_MSU_ID
MOVEME: MAX_MSU_ID Defines the maximum ID that can be assigned to an MSU.
Definition: local_msu.c:44
int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast)
Definition: local_msu.c:353
void msu_free_all_state(struct local_msu *msu)
Frees all state structures associated with the given MSU.
Definition: msu_state.c:117
Header for messages passed to MSUs.
Definition: msu_message.h:85
Messages to be delivered to dedos_threads.
#define log_info(fmt,...)
Definition: logging.h:88
int register_msu_with_thread(struct local_msu *msu)
Registers an MSU as one that should be run on its assigned thread.
int error_flag
0 if no error has been encountered
Definition: msu_message.h:91
static void init_msu_stats(int msu_id)
Initializes the stat IDS that are relevant to an MSU.
Definition: local_msu.c:177
Representation of a thread that holds MSUs, messages, and waits on a semaphore.
Definition: worker_thread.h:40
unsigned int scheduling_weight
The number of items that should be dequeued on this MSU each tick.
Definition: local_msu.h:57
struct msu_msg * dequeue_msu_msg(struct msg_queue *q)
Dequeues an MSU message from the provided message queue.
Definition: msu_message.c:124
State storage that is tied to a specific MSU mesasge.
static int gather_metrics_before(struct rusage *before)
Definition: local_msu.c:291
stat_id
The identifiers with which stats can be logged.
Definition: stat_ids.h:32
static int rm_from_local_registry(int id)
Removes an MSU from the local MSU registry.
Definition: local_msu.c:90
void * data
Payload.
Definition: msu_message.h:104
struct routing_table ** routes
Definition: routing.h:57
#define log_perror(fmt,...)
Definition: logging.h:102
static enum stat_id MSU_STAT_IDS[]
The stat IDs that are associated with an MSU, to be registered on MSU creation.
Definition: local_msu.c:155
int record_start_time(enum stat_id stat_id, unsigned int item_id)
Starts a measurement of elapsed time.
Definition: rt_stats.c:320
int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Definition: msu_calls.c:271
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Definition: msu_type.c:80
Logging of status messages to the terminal.
unsigned int type_id
Definition: msu_message.h:60
unsigned int id
Unique ID for a local MSU.
Definition: local_msu.h:54
static int add_to_local_registry(struct local_msu *msu)
Adds an MSU to the local registry so it can be referred to elsewhere by ID.
Definition: local_msu.c:114
unsigned int runtime_id
Definition: msu_message.h:62
void destroy_msu(struct local_msu *msu)
Calls type-specific destroy function and frees associated memory.
Definition: local_msu.c:257
static void msu_free(struct local_msu *msu)
Frees the memory associated with an MSU structure, including any routes, messages in its queue...
Definition: local_msu.c:71
int(* receive)(struct local_msu *self, struct msu_msg *msg)
Handles the receiving of data sent from other MSUs.
Definition: msu_type.h:89
Declares the methods available for calling an MSU from another MSU.
struct route_set routes
Routing table set, containing all destination MSUs (see routing.h for details)
Definition: local_msu.h:51
static void record_metrics(struct rusage *before, int msu_id)
Definition: local_msu.c:306
size_t data_size
Payload size.
Definition: msu_message.h:103
struct msg_provinance provinance
Message history.
Definition: msu_message.h:89
struct local_msu * local_msu_registry[1024]
Mapping of MSU ID to the specific instance of the local MSU.
Definition: local_msu.c:52
int(* receive_error)(struct local_msu *self, struct msu_msg *msg)
Definition: msu_type.h:91
#define log_error(fmt,...)
Definition: logging.h:101
Declares the structures and functions applicable to MSUs on the local machine.
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it.
Definition: rt_stats.c:719
struct local_msu * init_msu(unsigned int id, struct msu_type *type, struct worker_thread *thread, struct msu_init_data *data)
Allocates and creates a new MSU of the specified type and ID on the given thread. ...
Definition: local_msu.c:198
static struct local_msu * msu_alloc()
Allocates the memory associated with an MSU structure.
Definition: local_msu.c:58
struct local_msu * get_local_msu(unsigned int id)
Gets the local MSU with the given ID, or NULL if N/A.
Definition: local_msu.c:134
int remove_stat_item(enum stat_id stat_id, unsigned int item_id)
Un-registers an item so it can no longer have statistics registered, and will not be reported to the ...
Definition: rt_stats.c:683
int path_len
The current length of msg_provinance::path.
Definition: msu_message.h:78
static void unregister_msu_stats(int msu_id)
Unregisters the stat IDS that are relevant to an MSU.
Definition: local_msu.c:189
struct msu_provinance_item path[8]
A list of each MSU that has seen this message TODO: For now, one MSU of each type.
Definition: msu_message.h:76
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
#define NUM_MSU_STAT_IDS
Definition: local_msu.c:171
int try_destroy_msu(struct local_msu *msu)
Destroys the MSU, but only if it has no states currently saved.
Definition: local_msu.c:249
Data with which an MSU is initialized, and the payload for messages of type CTRL_CREATE_MSU.
Definition: dfg.h:66
#define RECORD_DIFF(dstat, rstat, id)
Definition: local_msu.c:299
struct msg_queue queue
Input queue to MSU.
Definition: local_msu.h:60
Definitions of the message types that can be passed between runtimes.
int record_stat(enum stat_id stat_id, unsigned int item_id, double stat, bool relog)
Records a statistic in the statlog.
Definition: rt_stats.c:426
struct worker_thread * thread
The worker thread on which this MSU is placed.
Definition: local_msu.h:69
int unregister_msu_with_thread(struct local_msu *msu)
Removes an MSU from the list of MSUs within its thread.
Defines a type of MSU.
Definition: msu_type.h:46
uint32_t num_msgs
Number of messages currently in the queue.
Definition: message_queue.h:57
int(* init)(struct local_msu *self, struct msu_init_data *initial_data)
Type-specific construction function.
Definition: msu_type.h:75
static int msu_receive(struct local_msu *msu, struct msu_msg *msg)
Calls type-specific MSU receive function and records execution time.
Definition: local_msu.c:275
struct msu_msg_hdr hdr
Definition: msu_message.h:102
unsigned int msu_id
Definition: msu_message.h:61
struct dedos_thread * thread
The underlying dedos_thread.
Definition: worker_thread.h:42
int increment_stat(enum stat_id stat_id, unsigned int item_id, double value)
Increments the given statistic by the provided value.
Definition: rt_stats.c:389
char init_data[64]
Definition: dfg.h:67
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
A message that is to be delivered to an instance of an MSU.
Definition: msu_message.h:101
sem_t sem
Locks thread until a message is available.
Definition: dedos_threads.h:45
Messages passed to MSUs.
pthread_rwlock_t msu_registry_lock
Lock to protect access to local msu registry.
Definition: local_msu.c:50
int init_msg_queue(struct msg_queue *q, sem_t *sem)
Initilializes a mesasge queue to have no messages in it, and sets up the mutex and semaphore...
char * name
Name for the msu type.
Definition: msu_type.h:48
int msu_num_states(struct local_msu *msu)
Definition: msu_state.c:51
An endpoint to which an msu_msg can be delivered.
Definition: routing.h:32
#define log_warn(fmt,...)
Definition: logging.h:113
struct msu_type * type
Pointer to struct containing information shared across all instances of this type of MSU...
Definition: local_msu.h:45
#define RECORD_TIMEDIFF(dstat, rstat, id)
Definition: local_msu.c:302
Declares strategies that MSUs can use for routing to endpoints.
int record_end_time(enum stat_id stat_id, unsigned int item_id)
Records the elapsed time since the previous call to record_start_time.
Definition: rt_stats.c:355