Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file routing.c
22 : *
23 : * Functions for routing MSU messages between MSUs
24 : */
25 :
26 : #include "dfg.h"
27 : #include "routing.h"
28 : #include "logging.h"
29 : #include "runtime_dfg.h"
30 : #include "local_msu.h"
31 : #include "unused_def.h"
32 : #include <stdlib.h>
33 : #include <limits.h>
34 : #include <pthread.h>
35 :
36 : /** The maximum number of destinations a route can have (size of each table's keys/endpoints arrays) */
37 : #define MAX_DESTINATIONS 128
38 :
39 : /** The number of slots in the table of all routes; route IDs are used as indices into it */
40 : #define MAX_ROUTE_ID 10000
41 :
42 : /**
43 : * The core of the routing system, the routing table lists a route's destinations.
44 : * The routing_table is kept private so the rwlock can be enforced.
45 : * All destinations in a routing table must have the same type_id.
46 : */
47 : struct routing_table{
48 : int id; /**< The ID of this route (copied into each endpoint's route_id when it is added) */
49 : int type_id; /**< The type-id associated with the destinations in this table */
50 : pthread_rwlock_t rwlock; /**< Protects access to the destinations so they cannot be changed
51 : while they are being read */
52 : int n_endpoints; /**< The number of destinations this route contains */
53 : uint32_t keys[MAX_DESTINATIONS]; /**< The keys associated with each of the destinations, kept in ascending order on insertion */
54 : struct msu_endpoint endpoints[MAX_DESTINATIONS]; /**< The destinations themselves; endpoints[i] pairs with keys[i] */
55 : };
56 :
#ifdef LOG_ROUTING_CHANGES
/**
 * Logs the contents of a routing table: one line per destination, showing the
 * destination's MSU id and its key.
 * The text is assembled in a fixed-size buffer; destinations that do not fit
 * are omitted from the output instead of overflowing the buffer.
 * This function does not lock the table itself.
 * @param table the routing table to print
 */
static void print_routing_table(struct routing_table *table) {
    char output[1024];
    size_t offset = 0;
    output[0] = '\0';

    int written = snprintf(output, sizeof(output),
                           "\n---------- ROUTE TYPE: %d ----------\n", table->type_id);
    if (written > 0) {
        // snprintf reports the length it *wanted* to write; clamp to what fits
        offset = (size_t)written < sizeof(output) ? (size_t)written : sizeof(output) - 1;
    }

    for (int i = 0; i < table->n_endpoints; i++) {
        struct msu_endpoint *destination = &table->endpoints[i];
        size_t remaining = sizeof(output) - offset;
        written = snprintf(output + offset, remaining, "- %4d: %3d\n",
                           destination->id, (int)table->keys[i]);
        if (written < 0 || (size_t)written >= remaining) {
            // Entry did not fit: drop the partial line and stop appending
            output[offset] = '\0';
            break;
        }
        offset += (size_t)written;
    }
    log(LOG_ROUTING_CHANGES, "%s", output);
}
#else
/** Printing the routing table compiles away when LOG_ROUTING_CHANGES is not defined */
#define print_routing_table(t)
#endif
70 :
71 : /**
72 : * All of the created routes in this runtime, indexed by route ID (NULL where no route has been created)
73 : */
74 : static struct routing_table *all_routes[MAX_ROUTE_ID];
75 :
76 : /**
77 : * Simple wrapper to lock a routing table for reading
78 : * @param table the routing table to be locked for reading
79 : * @return 0 on success, -1 on error
80 : */
81 8 : static int read_lock(struct routing_table *table) {
82 8 : return pthread_rwlock_rdlock(&table->rwlock);
83 : }
84 :
85 : /**
86 : * Simple wrapper to lock a routing table for writing
87 : * @param table the routing table to be locked for writing
88 : * @return 0 on success, -1 on error
89 : */
90 6 : static int write_lock(struct routing_table *table) {
91 6 : return pthread_rwlock_wrlock(&table->rwlock);
92 : }
93 :
94 : /**
95 : * Simple wrapper to unlock a locked routing table
96 : * @param table the routing table to be unlocked
97 : * @return 0 on success, -1 on error
98 : */
99 14 : static int unlock(struct routing_table *table) {
100 14 : return pthread_rwlock_unlock(&table->rwlock);
101 : }
102 :
103 : /**
104 : * Searches through the entries in the routing table, returning the index of the first key
105 : * which surpasses the provided value
106 : * @param table the table to search
107 : * @param value the value to use in the search
108 : * @return the index of the destination with the appropriate value
109 : */
110 10 : static int find_value_index(struct routing_table *table, uint32_t value) {
111 : //TODO: Binary search?
112 10 : int i = -1;
113 10 : if (table->n_endpoints == 0)
114 0 : return -1;
115 10 : value = value % table->keys[table->n_endpoints-1];
116 21 : for (i=0; i<table->n_endpoints; i++) {
117 21 : if (table->keys[i] > value)
118 10 : return i;
119 : }
120 0 : return i;
121 : }
122 :
123 : /**
124 : * Finds the index of the entry in the routing table with the destination that has the provided
125 : * msu id
126 : * @param table the table to search
127 : * @param msu_id the ID to search for in the table
128 : * @return the index of the destination with the appropriate msu id
129 : */
130 6 : static int find_id_index(struct routing_table *table, int msu_id) {
131 13 : for (int i=0; i<table->n_endpoints; i++) {
132 12 : if (table->endpoints[i].id == msu_id)
133 5 : return i;
134 : }
135 1 : return -1;
136 : }
137 :
138 : /**
139 : * Removes a destination from the routing table
140 : * @param table The table from which to remove the entry
141 : * @param msu_id The ID of the msu to remove from the table
142 : * @return 0 on success, -1 on error
143 : */
144 1 : static int rm_routing_table_entry(struct routing_table *table, int msu_id) {
145 1 : write_lock(table);
146 1 : int index = find_id_index(table, msu_id);
147 1 : if (index == -1) {
148 0 : log_error("MSU %d does not exist in route %d", msu_id, table->id);
149 0 : unlock(table);
150 0 : return -1;
151 : }
152 :
153 : // Shift destinations after removed index back by one
154 2 : for (int i=index; i<table->n_endpoints-1; ++i) {
155 1 : table->keys[i] = table->keys[i+1];
156 1 : table->endpoints[i] = table->endpoints[i+1];
157 : }
158 1 : table->n_endpoints--;
159 1 : unlock(table);
160 1 : log(LOG_ROUTING_CHANGES, "Removed destination %d from table %d (type %d)",
161 : msu_id, table->id, table->type_id);
162 : print_routing_table(table);
163 1 : return 0;
164 : }
165 :
166 : /**
167 : * Adds a (copy of the) destination to a routing table.
168 : * Note: Kept non-static but excluded from header so it can be used for testing
169 : * @param table The table to which the destination is to be added
170 : * @param destination The endpoint to add (a copy is made)
171 : * @param key The key to be associated with this destination
172 : * @return 0 on success, -1 on error
173 : */
174 4 : int add_routing_table_entry(struct routing_table *table,
175 : struct msu_endpoint *dest, uint32_t key) {
176 4 : write_lock(table);
177 :
178 : int i;
179 4 : if (table->n_endpoints >= MAX_DESTINATIONS) {
180 0 : log_error("Too many endpoints to add to route. Max: %d", MAX_DESTINATIONS);
181 0 : unlock(table);
182 0 : return -1;
183 : }
184 :
185 : // Shift all endpoints greater than the provided endpoint up by one
186 5 : for (i = table->n_endpoints; i > 0 && key < table->keys[i-1]; --i) {
187 1 : table->keys[i] = table->keys[i-1];
188 1 : table->endpoints[i] = table->endpoints[i-1];
189 : }
190 4 : table->keys[i] = key;
191 4 : table->endpoints[i] = *dest;
192 4 : table->endpoints[i].route_id = table->id;
193 4 : table->n_endpoints++;
194 4 : unlock(table);
195 :
196 4 : log(LOG_ROUTING_CHANGES, "Added destination %d to table %d (type: %d)",
197 : dest->id, table->id, table->type_id);
198 : print_routing_table(table);
199 4 : return 0;
200 : }
201 :
202 : /**
203 : * Modifies the key associated with an existing destination in a routing table
204 : * @param table The routing table to modify
205 : * @param msu_id The ID of the destination to modify
206 : * @param new_key The new key that should be associated with this destination
207 : * @return 0 on success, -1 on error
208 : */
209 1 : static int alter_routing_table_entry(struct routing_table *table,
210 : int msu_id, uint32_t new_key) {
211 1 : write_lock(table);
212 1 : int index = find_id_index(table, msu_id);
213 1 : if (index == -1) {
214 0 : log_error("MSU %d does not exist in route %d", msu_id, table->id);
215 0 : unlock(table);
216 0 : return -1;
217 : }
218 : // TODO: Next two steps could be done in one operation
219 :
220 1 : int UNUSED orig_key = table->keys[index];
221 :
222 1 : struct msu_endpoint endpoint = table->endpoints[index];
223 : // Shift indices after removed back
224 3 : for (int i=index; i<table->n_endpoints-1; i++) {
225 2 : table->keys[i] = table->keys[i+1];
226 2 : table->endpoints[i] = table->endpoints[i+1];
227 : }
228 :
229 : // Shift indices after inserted forward
230 : int i;
231 2 : for (i = table->n_endpoints-1; i > 0 && new_key < table->keys[i-1]; i--) {
232 1 : table->keys[i] = table->keys[i-1];
233 1 : table->endpoints[i] = table->endpoints[i-1];
234 : }
235 1 : table->keys[i] = new_key;
236 1 : table->endpoints[i] = endpoint;
237 1 : unlock(table);
238 1 : log(LOG_ROUTING_CHANGES, "Changed key of msu %d in table %d from %d to %d",
239 : msu_id, table->id, orig_key, new_key);
240 : print_routing_table(table);
241 1 : return 0;
242 : }
243 :
244 : /**
245 : * Initializes and returns a new routing table.
246 : * Note: Not for external access! Use init_route()
247 : * Note: Kept non-static (but exclued from header) so it can be accessed during testing
248 : * @returns The new routing table
249 : */
250 28 : struct routing_table *init_routing_table(int route_id, int type_id) {
251 28 : struct routing_table *table = calloc(1, sizeof(*table));
252 28 : if (table == NULL) {
253 0 : log_error("Error allocating routing table");
254 0 : return NULL;
255 : }
256 28 : table->id = route_id;
257 28 : table->type_id = type_id;
258 28 : pthread_rwlock_init(&table->rwlock, NULL);
259 :
260 28 : return table;
261 : }
262 :
263 : /**
264 : * Gets the routing table associated with the route_id
265 : * @returns A pointer to the routing table if it exists, or NULL
266 : */
267 24 : static struct routing_table *get_routing_table(int route_id) {
268 24 : if (route_id > MAX_ROUTE_ID) {
269 0 : log_error("Cannot get route with ID > %d", MAX_ROUTE_ID);
270 0 : return NULL;
271 : }
272 24 : return all_routes[route_id];
273 : }
274 :
275 :
276 11 : int init_route(int route_id, int type_id) {
277 11 : if (route_id > MAX_ROUTE_ID) {
278 0 : log_error("Cannot initialize route with ID > %d", MAX_ROUTE_ID);
279 0 : return -1;
280 : }
281 11 : struct routing_table *table = get_routing_table(route_id);
282 11 : if (table != NULL) {
283 1 : log_error("Route with id %d already exists on runtime", route_id);
284 1 : return -1;
285 : }
286 10 : table = init_routing_table(route_id, type_id);
287 10 : if (table == NULL) {
288 0 : log_error("Error initializing routing table %d", route_id);
289 0 : return -1;
290 : }
291 10 : all_routes[route_id] = table;
292 10 : log(LOG_ROUTING_CHANGES, "Initialized route %d (type %d)", route_id, type_id);
293 10 : return 0;
294 : }
295 :
296 : // TODO: Redefine, move MAX_MSU_Q_SIZE!!
297 : #define MAX_MSU_Q_SIZE INT_MAX
298 :
299 0 : int get_shortest_queue_endpoint(struct routing_table *table,
300 : uint32_t key, struct msu_endpoint *endpoint) {
301 0 : read_lock(table);
302 :
303 0 : unsigned int shortest_queue = MAX_MSU_Q_SIZE;
304 0 : int n_shortest = 0;
305 0 : struct msu_endpoint *endpoints[table->n_endpoints];
306 :
307 0 : for (int i=0; i<table->n_endpoints; i++) {
308 0 : if (table->endpoints[i].locality == MSU_IS_REMOTE) {
309 0 : continue;
310 : }
311 0 : int length = table->endpoints[i].queue->num_msgs;
312 0 : if (length < shortest_queue) {
313 0 : endpoints[0] = &table->endpoints[i];
314 0 : shortest_queue = length;
315 0 : n_shortest = 1;
316 0 : } else if (length == shortest_queue) {
317 0 : endpoints[n_shortest] = &table->endpoints[i];
318 0 : n_shortest++;
319 : }
320 : }
321 :
322 0 : if (n_shortest == 0) {
323 0 : unlock(table);
324 0 : return -1;
325 : }
326 :
327 0 : *endpoint = *endpoints[key % (int)n_shortest];
328 0 : unlock(table);
329 0 : return 0;
330 : }
331 :
332 3 : int get_endpoint_by_index(struct routing_table *table, int index,
333 : struct msu_endpoint *endpoint) {
334 3 : int rtn = -1;
335 3 : read_lock(table);
336 3 : if (table->n_endpoints > index) {
337 2 : *endpoint = table->endpoints[index];
338 2 : rtn = 0;
339 : }
340 3 : unlock(table);
341 3 : return rtn;
342 : }
343 :
344 4 : int get_endpoint_by_id(struct routing_table *table, int msu_id,
345 : struct msu_endpoint *endpoint) {
346 4 : int rtn = -1;
347 4 : read_lock(table);
348 12 : for (int i=0; i < table->n_endpoints; i++) {
349 10 : if (table->endpoints[i].id == msu_id) {
350 2 : *endpoint = table->endpoints[i];
351 2 : rtn = 0;
352 2 : break;
353 : }
354 : }
355 4 : unlock(table);
356 4 : return rtn;
357 : }
358 :
359 0 : int get_endpoints_by_runtime_id(struct routing_table *table, int runtime_id,
360 : struct msu_endpoint *endpoints, int n_endpoints) {
361 0 : int found_endpoints = 0;
362 0 : read_lock(table);
363 0 : for (int i=0; i < table->n_endpoints; i++) {
364 0 : if (table->endpoints[i].runtime_id == runtime_id) {
365 0 : if (n_endpoints <= found_endpoints) {
366 0 : log_error("Not enough endpoints passed in to hold results");
367 0 : unlock(table);
368 0 : return -1;
369 : }
370 0 : endpoints[found_endpoints] = table->endpoints[i];
371 0 : found_endpoints++;
372 : }
373 : }
374 0 : unlock(table);
375 0 : return found_endpoints;
376 : }
377 :
378 0 : int get_n_endpoints(struct routing_table *table) {
379 0 : read_lock(table);
380 0 : int n_endpoints = table->n_endpoints;
381 0 : unlock(table);
382 0 : return n_endpoints;
383 : }
384 :
385 1 : int get_route_endpoint(struct routing_table *table, uint32_t key, struct msu_endpoint *endpoint) {
386 1 : read_lock(table);
387 1 : int index = find_value_index(table, key);
388 1 : if (index < 0) {
389 0 : log_error("Could not find index for key %d", key);
390 0 : unlock(table);
391 0 : return -1;
392 : }
393 1 : *endpoint = table->endpoints[index];
394 1 : log(LOG_ROUTING_DECISIONS, "Endpoint for key %u is %d", key, endpoint->id);
395 1 : unlock(table);
396 1 : return 0;
397 : }
398 :
399 3 : struct routing_table *get_type_from_route_set(struct route_set *set, int type_id) {
400 6 : for (int i=0; i<set->n_routes; i++) {
401 5 : if (set->routes[i]->type_id == type_id) {
402 2 : return set->routes[i];
403 : }
404 : }
405 1 : log_error("No route available of type %d", type_id);
406 1 : return NULL;
407 : }
408 :
409 3 : int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key) {
410 3 : log(LOG_ROUTING_CHANGES, "Adding endpoint %d to route %d", endpoint.id, route_id);
411 3 : struct routing_table *table = get_routing_table(route_id);
412 3 : if (table == NULL) {
413 0 : log_error("Route %d does not exist", route_id);
414 0 : return -1;
415 : }
416 3 : return add_routing_table_entry(table, &endpoint, key);
417 : }
418 :
419 0 : int remove_route_endpoint(int route_id, int msu_id) {
420 0 : struct routing_table *table = get_routing_table(route_id);
421 0 : if (table == NULL) {
422 0 : log_error("Route %d does not exist", route_id);
423 0 : return -1;
424 : }
425 0 : int rtn = rm_routing_table_entry(table, msu_id);
426 0 : if (rtn == -1) {
427 0 : log_error("Error removing msu %d from route %d", msu_id, route_id);
428 0 : return -1;
429 : }
430 0 : log(LOG_ROUTING_CHANGES, "Removed destination %d from route %d", msu_id, route_id);
431 0 : return 0;
432 : }
433 :
434 0 : int modify_route_endpoint(int route_id, int msu_id, uint32_t new_key) {
435 0 : struct routing_table *table = get_routing_table(route_id);
436 0 : if (table == NULL) {
437 0 : log_error("Route %d does not exist", route_id);
438 0 : return -1;
439 : }
440 0 : int rtn = alter_routing_table_entry(table, msu_id, new_key);
441 0 : if (rtn < 0) {
442 0 : log_error("Error altering routing for msu %d on route %d", msu_id, route_id);
443 0 : return -1;
444 : }
445 0 : log(LOG_ROUTING_CHANGES, "Altered key %d for msu %d in route %d",
446 : new_key, msu_id, route_id);
447 0 : return 0;
448 : }
449 :
450 5 : int add_route_to_set(struct route_set *set, int route_id) {
451 5 : struct routing_table *table = get_routing_table(route_id);
452 5 : if (table == NULL) {
453 1 : log_error("Route %d does not exist", route_id);
454 1 : return -1;
455 : }
456 4 : set->routes = realloc(set->routes, sizeof(*set->routes) * (set->n_routes + 1));
457 4 : if (set->routes == NULL) {
458 0 : log_error("Error reallocating routes");
459 0 : return -1;
460 : }
461 4 : set->routes[set->n_routes] = table;
462 4 : set->n_routes++;
463 4 : return 0;
464 : }
465 5 : int rm_route_from_set(struct route_set *set, int route_id) {
466 : int i;
467 10 : for (i=0; i<set->n_routes; i++) {
468 8 : if (set->routes[i]->id == route_id) {
469 3 : break;
470 : }
471 : }
472 5 : if (i == set->n_routes) {
473 2 : log_error("route %d does not exist in set", route_id);
474 2 : return -1;
475 : }
476 3 : for (; i<set->n_routes - 1; i++) {
477 0 : set->routes[i] = set->routes[i+1];
478 : }
479 3 : set->n_routes--;
480 3 : return 0;
481 : }
482 :
483 3 : int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint) {
484 3 : endpoint->id = msu_id;
485 3 : int local_id = local_runtime_id();
486 3 : if (local_id < 0) {
487 0 : log_error("Cannot get local runtime ID");
488 0 : return -1;
489 : }
490 3 : if (runtime_id == local_id) {
491 0 : endpoint->locality = MSU_IS_LOCAL;
492 0 : struct local_msu *msu = get_local_msu(msu_id);
493 0 : if (msu == NULL) {
494 0 : log_error("Cannot find local MSU with id %d for initializing endpoint",
495 : msu_id);
496 0 : return -1;
497 : }
498 0 : endpoint->queue = &msu->queue;
499 0 : endpoint->runtime_id = runtime_id;
500 : } else {
501 3 : endpoint->locality = MSU_IS_REMOTE;
502 3 : endpoint->runtime_id = runtime_id;
503 : }
504 3 : return 0;
505 : }
506 :
507 :
|