My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
routing.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
26 #include "dfg.h"
27 #include "routing.h"
28 #include "logging.h"
29 #include "runtime_dfg.h"
30 #include "local_msu.h"
31 #include "unused_def.h"
32 #include <stdlib.h>
33 #include <limits.h>
34 #include <pthread.h>
35 
37 #define MAX_DESTINATIONS 128
38 
40 #define MAX_ROUTE_ID 10000
41 
48  int id;
49  int type_id;
50  pthread_rwlock_t rwlock;
55 };
56 
57 #ifdef LOG_ROUTING_CHANGES
58 static void print_routing_table(struct routing_table *table) {
59  char output[1024];
60  int offset = sprintf(output,"\n---------- ROUTE TYPE: %d ----------\n", table->type_id);
61  for (int i=0; i<table->n_endpoints; i++) {
62  struct msu_endpoint *destination = &table->endpoints[i];
63  offset += sprintf(output+offset, "- %4d: %3d\n", destination->id, (int)table->keys[i]);
64  }
65  log(LOG_ROUTING_CHANGES, "%s", output);
66 }
67 #else
68 #define print_routing_table(t)
69 #endif
70 
75 
81 static int read_lock(struct routing_table *table) {
82  return pthread_rwlock_rdlock(&table->rwlock);
83 }
84 
90 static int write_lock(struct routing_table *table) {
91  return pthread_rwlock_wrlock(&table->rwlock);
92 }
93 
99 static int unlock(struct routing_table *table) {
100  return pthread_rwlock_unlock(&table->rwlock);
101 }
102 
110 static int find_value_index(struct routing_table *table, uint32_t value) {
111  //TODO: Binary search?
112  int i = -1;
113  if (table->n_endpoints == 0)
114  return -1;
115  value = value % table->keys[table->n_endpoints-1];
116  for (i=0; i<table->n_endpoints; i++) {
117  if (table->keys[i] > value)
118  return i;
119  }
120  return i;
121 }
122 
130 static int find_id_index(struct routing_table *table, int msu_id) {
131  for (int i=0; i<table->n_endpoints; i++) {
132  if (table->endpoints[i].id == msu_id)
133  return i;
134  }
135  return -1;
136 }
137 
144 static int rm_routing_table_entry(struct routing_table *table, int msu_id) {
145  write_lock(table);
146  int index = find_id_index(table, msu_id);
147  if (index == -1) {
148  log_error("MSU %d does not exist in route %d", msu_id, table->id);
149  unlock(table);
150  return -1;
151  }
152 
153  // Shift destinations after removed index back by one
154  for (int i=index; i<table->n_endpoints-1; ++i) {
155  table->keys[i] = table->keys[i+1];
156  table->endpoints[i] = table->endpoints[i+1];
157  }
158  table->n_endpoints--;
159  unlock(table);
160  log(LOG_ROUTING_CHANGES, "Removed destination %d from table %d (type %d)",
161  msu_id, table->id, table->type_id);
162  print_routing_table(table);
163  return 0;
164 }
165 
175  struct msu_endpoint *dest, uint32_t key) {
176  write_lock(table);
177 
178  int i;
179  if (table->n_endpoints >= MAX_DESTINATIONS) {
180  log_error("Too many endpoints to add to route. Max: %d", MAX_DESTINATIONS);
181  unlock(table);
182  return -1;
183  }
184 
185  // Shift all endpoints greater than the provided endpoint up by one
186  for (i = table->n_endpoints; i > 0 && key < table->keys[i-1]; --i) {
187  table->keys[i] = table->keys[i-1];
188  table->endpoints[i] = table->endpoints[i-1];
189  }
190  table->keys[i] = key;
191  table->endpoints[i] = *dest;
192  table->endpoints[i].route_id = table->id;
193  table->n_endpoints++;
194  unlock(table);
195 
196  log(LOG_ROUTING_CHANGES, "Added destination %d to table %d (type: %d)",
197  dest->id, table->id, table->type_id);
198  print_routing_table(table);
199  return 0;
200 }
201 
209 static int alter_routing_table_entry(struct routing_table *table,
210  int msu_id, uint32_t new_key) {
211  write_lock(table);
212  int index = find_id_index(table, msu_id);
213  if (index == -1) {
214  log_error("MSU %d does not exist in route %d", msu_id, table->id);
215  unlock(table);
216  return -1;
217  }
218  // TODO: Next two steps could be done in one operation
219 
220  int UNUSED orig_key = table->keys[index];
221 
222  struct msu_endpoint endpoint = table->endpoints[index];
223  // Shift indices after removed back
224  for (int i=index; i<table->n_endpoints-1; i++) {
225  table->keys[i] = table->keys[i+1];
226  table->endpoints[i] = table->endpoints[i+1];
227  }
228 
229  // Shift indices after inserted forward
230  int i;
231  for (i = table->n_endpoints-1; i > 0 && new_key < table->keys[i-1]; i--) {
232  table->keys[i] = table->keys[i-1];
233  table->endpoints[i] = table->endpoints[i-1];
234  }
235  table->keys[i] = new_key;
236  table->endpoints[i] = endpoint;
237  unlock(table);
238  log(LOG_ROUTING_CHANGES, "Changed key of msu %d in table %d from %d to %d",
239  msu_id, table->id, orig_key, new_key);
240  print_routing_table(table);
241  return 0;
242 }
243 
250 struct routing_table *init_routing_table(int route_id, int type_id) {
251  struct routing_table *table = calloc(1, sizeof(*table));
252  if (table == NULL) {
253  log_error("Error allocating routing table");
254  return NULL;
255  }
256  table->id = route_id;
257  table->type_id = type_id;
258  pthread_rwlock_init(&table->rwlock, NULL);
259 
260  return table;
261 }
262 
267 static struct routing_table *get_routing_table(int route_id) {
268  if (route_id > MAX_ROUTE_ID) {
269  log_error("Cannot get route with ID > %d", MAX_ROUTE_ID);
270  return NULL;
271  }
272  return all_routes[route_id];
273 }
274 
275 
276 int init_route(int route_id, int type_id) {
277  if (route_id > MAX_ROUTE_ID) {
278  log_error("Cannot initialize route with ID > %d", MAX_ROUTE_ID);
279  return -1;
280  }
281  struct routing_table *table = get_routing_table(route_id);
282  if (table != NULL) {
283  log_error("Route with id %d already exists on runtime", route_id);
284  return -1;
285  }
286  table = init_routing_table(route_id, type_id);
287  if (table == NULL) {
288  log_error("Error initializing routing table %d", route_id);
289  return -1;
290  }
291  all_routes[route_id] = table;
292  log(LOG_ROUTING_CHANGES, "Initialized route %d (type %d)", route_id, type_id);
293  return 0;
294 }
295 
296 // TODO: Redefine, move MAX_MSU_Q_SIZE!!
297 #define MAX_MSU_Q_SIZE INT_MAX
298 
300  uint32_t key, struct msu_endpoint *endpoint) {
301  read_lock(table);
302 
303  unsigned int shortest_queue = MAX_MSU_Q_SIZE;
304  int n_shortest = 0;
305  struct msu_endpoint *endpoints[table->n_endpoints];
306 
307  for (int i=0; i<table->n_endpoints; i++) {
308  if (table->endpoints[i].locality == MSU_IS_REMOTE) {
309  continue;
310  }
311  int length = table->endpoints[i].queue->num_msgs;
312  if (length < shortest_queue) {
313  endpoints[0] = &table->endpoints[i];
314  shortest_queue = length;
315  n_shortest = 1;
316  } else if (length == shortest_queue) {
317  endpoints[n_shortest] = &table->endpoints[i];
318  n_shortest++;
319  }
320  }
321 
322  if (n_shortest == 0) {
323  unlock(table);
324  return -1;
325  }
326 
327  *endpoint = *endpoints[key % (int)n_shortest];
328  unlock(table);
329  return 0;
330 }
331 
332 int get_endpoint_by_index(struct routing_table *table, int index,
333  struct msu_endpoint *endpoint) {
334  int rtn = -1;
335  read_lock(table);
336  if (table->n_endpoints > index) {
337  *endpoint = table->endpoints[index];
338  rtn = 0;
339  }
340  unlock(table);
341  return rtn;
342 }
343 
344 int get_endpoint_by_id(struct routing_table *table, int msu_id,
345  struct msu_endpoint *endpoint) {
346  int rtn = -1;
347  read_lock(table);
348  for (int i=0; i < table->n_endpoints; i++) {
349  if (table->endpoints[i].id == msu_id) {
350  *endpoint = table->endpoints[i];
351  rtn = 0;
352  break;
353  }
354  }
355  unlock(table);
356  return rtn;
357 }
358 
360  struct msu_endpoint *endpoints, int n_endpoints) {
361  int found_endpoints = 0;
362  read_lock(table);
363  for (int i=0; i < table->n_endpoints; i++) {
364  if (table->endpoints[i].runtime_id == runtime_id) {
365  if (n_endpoints <= found_endpoints) {
366  log_error("Not enough endpoints passed in to hold results");
367  unlock(table);
368  return -1;
369  }
370  endpoints[found_endpoints] = table->endpoints[i];
371  found_endpoints++;
372  }
373  }
374  unlock(table);
375  return found_endpoints;
376 }
377 
378 int get_n_endpoints(struct routing_table *table) {
379  read_lock(table);
380  int n_endpoints = table->n_endpoints;
381  unlock(table);
382  return n_endpoints;
383 }
384 
385 int get_route_endpoint(struct routing_table *table, uint32_t key, struct msu_endpoint *endpoint) {
386  read_lock(table);
387  int index = find_value_index(table, key);
388  if (index < 0) {
389  log_error("Could not find index for key %d", key);
390  unlock(table);
391  return -1;
392  }
393  *endpoint = table->endpoints[index];
394  log(LOG_ROUTING_DECISIONS, "Endpoint for key %u is %d", key, endpoint->id);
395  unlock(table);
396  return 0;
397 }
398 
400  for (int i=0; i<set->n_routes; i++) {
401  if (set->routes[i]->type_id == type_id) {
402  return set->routes[i];
403  }
404  }
405  log_error("No route available of type %d", type_id);
406  return NULL;
407 }
408 
409 int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key) {
410  log(LOG_ROUTING_CHANGES, "Adding endpoint %d to route %d", endpoint.id, route_id);
411  struct routing_table *table = get_routing_table(route_id);
412  if (table == NULL) {
413  log_error("Route %d does not exist", route_id);
414  return -1;
415  }
416  return add_routing_table_entry(table, &endpoint, key);
417 }
418 
419 int remove_route_endpoint(int route_id, int msu_id) {
420  struct routing_table *table = get_routing_table(route_id);
421  if (table == NULL) {
422  log_error("Route %d does not exist", route_id);
423  return -1;
424  }
425  int rtn = rm_routing_table_entry(table, msu_id);
426  if (rtn == -1) {
427  log_error("Error removing msu %d from route %d", msu_id, route_id);
428  return -1;
429  }
430  log(LOG_ROUTING_CHANGES, "Removed destination %d from route %d", msu_id, route_id);
431  return 0;
432 }
433 
434 int modify_route_endpoint(int route_id, int msu_id, uint32_t new_key) {
435  struct routing_table *table = get_routing_table(route_id);
436  if (table == NULL) {
437  log_error("Route %d does not exist", route_id);
438  return -1;
439  }
440  int rtn = alter_routing_table_entry(table, msu_id, new_key);
441  if (rtn < 0) {
442  log_error("Error altering routing for msu %d on route %d", msu_id, route_id);
443  return -1;
444  }
445  log(LOG_ROUTING_CHANGES, "Altered key %d for msu %d in route %d",
446  new_key, msu_id, route_id);
447  return 0;
448 }
449 
450 int add_route_to_set(struct route_set *set, int route_id) {
451  struct routing_table *table = get_routing_table(route_id);
452  if (table == NULL) {
453  log_error("Route %d does not exist", route_id);
454  return -1;
455  }
456  set->routes = realloc(set->routes, sizeof(*set->routes) * (set->n_routes + 1));
457  if (set->routes == NULL) {
458  log_error("Error reallocating routes");
459  return -1;
460  }
461  set->routes[set->n_routes] = table;
462  set->n_routes++;
463  return 0;
464 }
465 int rm_route_from_set(struct route_set *set, int route_id) {
466  int i;
467  for (i=0; i<set->n_routes; i++) {
468  if (set->routes[i]->id == route_id) {
469  break;
470  }
471  }
472  if (i == set->n_routes) {
473  log_error("route %d does not exist in set", route_id);
474  return -1;
475  }
476  for (; i<set->n_routes - 1; i++) {
477  set->routes[i] = set->routes[i+1];
478  }
479  set->n_routes--;
480  return 0;
481 }
482 
483 int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint) {
484  endpoint->id = msu_id;
485  int local_id = local_runtime_id();
486  if (local_id < 0) {
487  log_error("Cannot get local runtime ID");
488  return -1;
489  }
490  if (runtime_id == local_id) {
491  endpoint->locality = MSU_IS_LOCAL;
492  struct local_msu *msu = get_local_msu(msu_id);
493  if (msu == NULL) {
494  log_error("Cannot find local MSU with id %d for initializing endpoint",
495  msu_id);
496  return -1;
497  }
498  endpoint->queue = &msu->queue;
499  endpoint->runtime_id = runtime_id;
500  } else {
501  endpoint->locality = MSU_IS_REMOTE;
502  endpoint->runtime_id = runtime_id;
503  }
504  return 0;
505 }
506 
507 
int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint)
Initializes an endpoint structure to point to the relevant msu.
Definition: routing.c:483
int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key)
Adds an endpoint to the route with the given ID.
Definition: routing.c:409
int init_route(int route_id, int type_id)
Initializes a new route with the given route_id and type_id.
Definition: routing.c:276
int get_endpoint_by_id(struct routing_table *table, int msu_id, struct msu_endpoint *endpoint)
Gets endpoint with the given MSU ID in the provided route.
Definition: routing.c:344
The publicly accessible copy of the routing table.
Definition: routing.h:55
int id
A unque identifier for the endpoint (msu ID)
Definition: routing.h:34
unsigned int runtime_id
The ID for the runtime on which the endpoint resides.
Definition: routing.h:40
Macro for declaring functions or variables as unused to avoid compiler warnings.
static int rm_routing_table_entry(struct routing_table *table, int msu_id)
Removes a destination from the routing table.
Definition: routing.c:144
The core of the routing system, the routing table lists a route's destinations.
Definition: routing.c:47
int n_endpoints
The number of destinations this route contains.
Definition: routing.c:52
pthread_rwlock_t rwlock
Protects access to the destinations so they cannot be changed while they are being read...
Definition: routing.c:50
static int alter_routing_table_entry(struct routing_table *table, int msu_id, uint32_t new_key)
Modifies the key associated with an existing destination in a routing table.
Definition: routing.c:209
static struct routing_table * get_routing_table(int route_id)
Gets the routing table associated with the route_id.
Definition: routing.c:267
uint32_t keys[128]
The keys associated with each of the destinations.
Definition: routing.c:53
int add_routing_table_entry(struct routing_table *table, struct msu_endpoint *dest, uint32_t key)
Adds a (copy of the) destination to a routing table.
Definition: routing.c:174
struct routing_table ** routes
Definition: routing.h:57
int get_route_endpoint(struct routing_table *table, uint32_t key, struct msu_endpoint *endpoint)
Gets the endpoint associated with the given key in the provided route.
Definition: routing.c:385
Logging of status messages to the terminal.
#define MAX_ROUTE_ID
The maximum ID that may be assigned to a route.
Definition: routing.c:40
static struct routing_table * all_routes[10000]
All of the created routes in this runtime.
Definition: routing.c:74
int rm_route_from_set(struct route_set *set, int route_id)
Removes a route from a set of routes.
Definition: routing.c:465
Interactions with global dfg from individual runtime.
static int unlock(struct routing_table *table)
Simple wrapper to unlock a locked routing table.
Definition: routing.c:99
#define MAX_MSU_Q_SIZE
Definition: routing.c:297
struct msg_queue * queue
if msu_endpoint::locality == MSU_IS_LOCAL, the queue for the msu endpoint
Definition: routing.h:42
int get_endpoints_by_runtime_id(struct routing_table *table, int runtime_id, struct msu_endpoint *endpoints, int n_endpoints)
Gets all of the endpoints in the provided routing table with the right runtime id.
Definition: routing.c:359
#define MAX_DESTINATIONS
The maximum number of destinations a route can have.
Definition: routing.c:37
int type_id
The type-id associated with the destinations in this table.
Definition: routing.c:49
#define log_error(fmt,...)
Definition: logging.h:101
Declares the structures and functions applicable to MSUs on the local machine.
static int write_lock(struct routing_table *table)
Simple wrapper to lock a routing table for writing.
Definition: routing.c:90
struct local_msu * get_local_msu(unsigned int id)
Gets the local MSU with the given ID, or NULL if N/A.
Definition: local_msu.c:134
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
int local_runtime_id()
Definition: runtime_dfg.c:91
int add_route_to_set(struct route_set *set, int route_id)
Adds a route to a set of routes.
Definition: routing.c:450
Functions for routing MSU messages between MSUs.
enum msu_locality locality
Whether the endpoint is on the local machine or remote.
Definition: routing.h:36
int get_n_endpoints(struct routing_table *table)
Definition: routing.c:378
struct routing_table * init_routing_table(int route_id, int type_id)
Initializes and returns a new routing table.
Definition: routing.c:250
struct msg_queue queue
Input queue to MSU.
Definition: local_msu.h:60
int remove_route_endpoint(int route_id, int msu_id)
Removes destination from route with given ID.
Definition: routing.c:419
#define print_routing_table(t)
Definition: routing.c:68
struct msu_endpoint endpoints[128]
The destinations themselves.
Definition: routing.c:54
#define UNUSED
int modify_route_endpoint(int route_id, int msu_id, uint32_t new_key)
Modifies key associated with MSU in route.
Definition: routing.c:434
uint32_t num_msgs
Number of messages currently in the queue.
Definition: message_queue.h:57
struct routing_table * get_type_from_route_set(struct route_set *set, int type_id)
Gets the route from the provided array of routes which has the correct type ID.
Definition: routing.c:399
Interfaces for the creation and modification of the data-flow-graph and and general description of th...
unsigned int route_id
The ID for the route used to get to the endpoint.
Definition: routing.h:38
int get_endpoint_by_index(struct routing_table *table, int index, struct msu_endpoint *endpoint)
Gets endpoint witih given index in route set.
Definition: routing.c:332
static int find_id_index(struct routing_table *table, int msu_id)
Finds the index of the entry in the routing table with the destination that has the provided msu id...
Definition: routing.c:130
static int runtime_id(int runtime_fd)
unsigned int uint32_t
Definition: uthash.h:96
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
static int read_lock(struct routing_table *table)
Simple wrapper to lock a routing table for reading.
Definition: routing.c:81
static int find_value_index(struct routing_table *table, uint32_t value)
Searches through the entries in the routing table, returning the index of the first key which surpass...
Definition: routing.c:110
An endpoint to which an msu_msg can be delivered.
Definition: routing.h:32
int get_shortest_queue_endpoint(struct routing_table *table, uint32_t key, struct msu_endpoint *endpoint)
Gets the local endpoint from a route with the shortest queue length.
Definition: routing.c:299
int n_routes
Definition: routing.h:56