LCOV - code coverage report
Current view: top level - runtime - routing.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 145 243 59.7 %
Date: 2018-01-11 Functions: 19 24 79.2 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file routing.c
      22             :  *
      23             :  * Functions for routing MSU messages between MSUs
      24             :  */
      25             : 
      26             : #include "dfg.h"
      27             : #include "routing.h"
      28             : #include "logging.h"
      29             : #include "runtime_dfg.h"
      30             : #include "local_msu.h"
      31             : #include "unused_def.h"
      32             : #include <stdlib.h>
      33             : #include <limits.h>
      34             : #include <pthread.h>
      35             : 
      36             : /** The maximum number of destinations a route can have */
      37             : #define MAX_DESTINATIONS 128
      38             : 
      39             : /** The maximum ID that may be assigned to a route */
      40             : #define MAX_ROUTE_ID 10000
      41             : 
      42             : /**
      43             :  * The core of the routing system, the routing table lists a route's destinations.
      44             :  * The routing_table is kept private so the rwlock can be enfoced.
      45             :  * All destinations in a routing table must have the same type_id.
      46             :  */
      47             : struct routing_table{
      48             :     int id;
      49             :     int type_id; /**< The type-id associated with the destinations in this table */
      50             :     pthread_rwlock_t rwlock; /**< Protects access to the destinations so they cannot be changed
      51             :                                   while they are being read */
      52             :     int n_endpoints;      /**< The number of destinations this route contains */
      53             :     uint32_t keys[MAX_DESTINATIONS]; /**< The keys associated with each of the destinations */
      54             :     struct msu_endpoint endpoints[MAX_DESTINATIONS]; /**< The destinations themselves */
      55             : };
      56             : 
      57             : #ifdef LOG_ROUTING_CHANGES
      58             : static void print_routing_table(struct routing_table *table) {
      59             :     char output[1024];
      60             :     int offset = sprintf(output,"\n---------- ROUTE TYPE: %d ----------\n", table->type_id);
      61             :     for (int i=0; i<table->n_endpoints; i++) {
      62             :         struct msu_endpoint *destination = &table->endpoints[i];
      63             :         offset += sprintf(output+offset, "- %4d: %3d\n", destination->id, (int)table->keys[i]);
      64             :     }
      65             :     log(LOG_ROUTING_CHANGES, "%s", output);
      66             : }
      67             : #else
      68             : #define print_routing_table(t)
      69             : #endif
      70             : 
      71             : /**
      72             :  * All of the created routes in this runtime
      73             :  */
      74             : static struct routing_table *all_routes[MAX_ROUTE_ID];
      75             : 
      76             : /**
      77             :  * Simple wrapper to lock a routing table for reading
      78             :  * @param table the routing table to be locked for reading
      79             :  * @return 0 on success, -1 on error
      80             :  */
      81           8 : static int read_lock(struct routing_table *table) {
      82           8 :     return pthread_rwlock_rdlock(&table->rwlock);
      83             : }
      84             : 
      85             : /**
      86             :  * Simple wrapper to lock a routing table for writing
      87             :  * @param table the routing table to be locked for writing
      88             :  * @return 0 on success, -1 on error
      89             :  */
      90           6 : static int write_lock(struct routing_table *table) {
      91           6 :     return pthread_rwlock_wrlock(&table->rwlock);
      92             : }
      93             : 
      94             : /**
      95             :  * Simple wrapper to unlock a locked routing table
      96             :  * @param table the routing table to be unlocked
      97             :  * @return 0 on success, -1 on error
      98             :  */
      99          14 : static int unlock(struct routing_table *table) {
     100          14 :     return pthread_rwlock_unlock(&table->rwlock);
     101             : }
     102             : 
     103             : /**
     104             :  * Searches through the entries in the routing table, returning the index of the first key
     105             :  * which surpasses the provided value
     106             :  * @param table the table to search
     107             :  * @param value the value to use in the search
     108             :  * @return the index of the destination with the appropriate value
     109             :  */
     110          10 : static int find_value_index(struct routing_table *table, uint32_t value) {
     111             :     //TODO: Binary search?
     112          10 :     int i = -1;
     113          10 :     if (table->n_endpoints == 0)
     114           0 :         return -1;
     115          10 :     value = value % table->keys[table->n_endpoints-1];
     116          21 :     for (i=0; i<table->n_endpoints; i++) {
     117          21 :         if (table->keys[i] > value)
     118          10 :             return i;
     119             :     }
     120           0 :     return i;
     121             : }
     122             : 
     123             : /**
     124             :  * Finds the index of the entry in the routing table with the destination that has the provided
     125             :  * msu id
     126             :  * @param table the table to search
     127             :  * @param msu_id the ID to search for in the table
     128             :  * @return the index of the destination with the appropriate msu id
     129             :  */
     130           6 : static int find_id_index(struct routing_table *table, int msu_id) {
     131          13 :     for (int i=0; i<table->n_endpoints; i++) {
     132          12 :         if (table->endpoints[i].id == msu_id)
     133           5 :             return i;
     134             :     }
     135           1 :     return -1;
     136             : }
     137             : 
     138             : /**
     139             :  * Removes a destination from the routing table
     140             :  * @param table The table from which to remove the entry
     141             :  * @param msu_id The ID of the msu to remove from the table
     142             :  * @return 0 on success, -1 on error
     143             :  */
     144           1 : static int rm_routing_table_entry(struct routing_table *table, int msu_id) {
     145           1 :     write_lock(table);
     146           1 :     int index = find_id_index(table, msu_id);
     147           1 :     if (index == -1) {
     148           0 :         log_error("MSU %d does not exist in route %d", msu_id, table->id);
     149           0 :         unlock(table);
     150           0 :         return -1;
     151             :     }
     152             : 
     153             :     // Shift destinations after removed index back by one
     154           2 :     for (int i=index; i<table->n_endpoints-1; ++i) {
     155           1 :         table->keys[i] = table->keys[i+1];
     156           1 :         table->endpoints[i] = table->endpoints[i+1];
     157             :     }
     158           1 :     table->n_endpoints--;
     159           1 :     unlock(table);
     160           1 :     log(LOG_ROUTING_CHANGES, "Removed destination %d from table %d (type %d)",
     161             :                msu_id, table->id, table->type_id);
     162             :     print_routing_table(table);
     163           1 :     return 0;
     164             : }
     165             : 
     166             : /**
     167             :  * Adds a (copy of the) destination to a routing table.
     168             :  * Note: Kept non-static but excluded from header so it can be used for testing
     169             :  * @param table The table to which the destination is to be added
     170             :  * @param destination The endpoint to add (a copy is made)
     171             :  * @param key The key to be associated with this destination
     172             :  * @return 0 on success, -1 on error
     173             :  */
     174           4 : int add_routing_table_entry(struct routing_table *table,
     175             :                             struct msu_endpoint *dest, uint32_t key) {
     176           4 :     write_lock(table);
     177             : 
     178             :     int i;
     179           4 :     if (table->n_endpoints >= MAX_DESTINATIONS) {
     180           0 :         log_error("Too many endpoints to add to route. Max: %d", MAX_DESTINATIONS);
     181           0 :         unlock(table);
     182           0 :         return -1;
     183             :     }
     184             : 
     185             :     // Shift all endpoints greater than the provided endpoint up by one
     186           5 :     for (i = table->n_endpoints; i > 0 && key < table->keys[i-1]; --i) {
     187           1 :         table->keys[i] = table->keys[i-1];
     188           1 :         table->endpoints[i] = table->endpoints[i-1];
     189             :     }
     190           4 :     table->keys[i] = key;
     191           4 :     table->endpoints[i] = *dest;
     192           4 :     table->endpoints[i].route_id = table->id;
     193           4 :     table->n_endpoints++;
     194           4 :     unlock(table);
     195             : 
     196           4 :     log(LOG_ROUTING_CHANGES, "Added destination %d to table %d (type: %d)",
     197             :                dest->id, table->id, table->type_id);
     198             :     print_routing_table(table);
     199           4 :     return 0;
     200             : }
     201             : 
     202             : /**
     203             :  * Modifies the key associated with an existing destination in a routing table
     204             :  * @param table The routing table to modify
     205             :  * @param msu_id The ID of the destination to modify
     206             :  * @param new_key The new key that should be associated with this destination
     207             :  * @return 0 on success, -1 on error
     208             :  */
     209           1 : static int alter_routing_table_entry(struct routing_table *table,
     210             :                                      int msu_id, uint32_t new_key) {
     211           1 :     write_lock(table);
     212           1 :     int index = find_id_index(table, msu_id);
     213           1 :     if (index == -1) {
     214           0 :         log_error("MSU %d does not exist in route %d", msu_id, table->id);
     215           0 :         unlock(table);
     216           0 :         return -1;
     217             :     }
     218             :     // TODO: Next two steps could be done in one operation
     219             : 
     220           1 :     int UNUSED orig_key = table->keys[index];
     221             : 
     222           1 :     struct msu_endpoint endpoint = table->endpoints[index];
     223             :     // Shift indices after removed back
     224           3 :     for (int i=index; i<table->n_endpoints-1; i++) {
     225           2 :         table->keys[i] = table->keys[i+1];
     226           2 :         table->endpoints[i] = table->endpoints[i+1];
     227             :     }
     228             : 
     229             :     // Shift indices after inserted forward
     230             :     int i;
     231           2 :     for (i = table->n_endpoints-1; i > 0 && new_key < table->keys[i-1]; i--) {
     232           1 :         table->keys[i] = table->keys[i-1];
     233           1 :         table->endpoints[i] = table->endpoints[i-1];
     234             :     }
     235           1 :     table->keys[i] = new_key;
     236           1 :     table->endpoints[i] = endpoint;
     237           1 :     unlock(table);
     238           1 :     log(LOG_ROUTING_CHANGES, "Changed key of msu %d in table %d from %d to %d",
     239             :                msu_id, table->id, orig_key, new_key);
     240             :     print_routing_table(table);
     241           1 :     return 0;
     242             : }
     243             : 
     244             : /**
     245             :  * Initializes and returns a new routing table.
     246             :  * Note: Not for external access! Use init_route()
     247             :  * Note: Kept non-static (but exclued from header) so it can be accessed during testing
     248             :  * @returns The new routing table
     249             :  */
     250          28 : struct routing_table *init_routing_table(int route_id, int type_id) {
     251          28 :     struct routing_table *table = calloc(1, sizeof(*table));
     252          28 :     if (table == NULL) {
     253           0 :         log_error("Error allocating routing table");
     254           0 :         return NULL;
     255             :     }
     256          28 :     table->id = route_id;
     257          28 :     table->type_id = type_id;
     258          28 :     pthread_rwlock_init(&table->rwlock, NULL);
     259             : 
     260          28 :     return table;
     261             : }
     262             : 
     263             : /**
     264             :  * Gets the routing table associated with the route_id
     265             :  * @returns A pointer to the routing table if it exists, or NULL
     266             :  */
     267          24 : static struct routing_table *get_routing_table(int route_id) {
     268          24 :     if (route_id > MAX_ROUTE_ID) {
     269           0 :         log_error("Cannot get route with ID > %d", MAX_ROUTE_ID);
     270           0 :         return NULL;
     271             :     }
     272          24 :     return all_routes[route_id];
     273             : }
     274             : 
     275             : 
     276          11 : int init_route(int route_id, int type_id) {
     277          11 :     if (route_id > MAX_ROUTE_ID) {
     278           0 :         log_error("Cannot initialize route with ID > %d", MAX_ROUTE_ID);
     279           0 :         return -1;
     280             :     }
     281          11 :     struct routing_table *table = get_routing_table(route_id);
     282          11 :     if (table != NULL) {
     283           1 :         log_error("Route with id %d already exists on runtime", route_id);
     284           1 :         return -1;
     285             :     }
     286          10 :     table = init_routing_table(route_id, type_id);
     287          10 :     if (table == NULL) {
     288           0 :         log_error("Error initializing routing table %d", route_id);
     289           0 :         return -1;
     290             :     }
     291          10 :     all_routes[route_id] = table;
     292          10 :     log(LOG_ROUTING_CHANGES, "Initialized route %d (type %d)", route_id, type_id);
     293          10 :     return 0;
     294             : }
     295             : 
     296             : // TODO: Redefine, move  MAX_MSU_Q_SIZE!!
     297             : #define MAX_MSU_Q_SIZE INT_MAX
     298             : 
     299           0 : int get_shortest_queue_endpoint(struct routing_table *table,
     300             :                                 uint32_t key, struct msu_endpoint *endpoint) {
     301           0 :     read_lock(table);
     302             : 
     303           0 :     unsigned int shortest_queue = MAX_MSU_Q_SIZE;
     304           0 :     int n_shortest = 0;
     305           0 :     struct msu_endpoint *endpoints[table->n_endpoints];
     306             : 
     307           0 :     for (int i=0; i<table->n_endpoints; i++) {
     308           0 :         if (table->endpoints[i].locality == MSU_IS_REMOTE) {
     309           0 :             continue;
     310             :         }
     311           0 :         int length = table->endpoints[i].queue->num_msgs;
     312           0 :         if (length < shortest_queue) {
     313           0 :             endpoints[0] = &table->endpoints[i];
     314           0 :             shortest_queue = length;
     315           0 :             n_shortest = 1;
     316           0 :         } else if (length == shortest_queue) {
     317           0 :             endpoints[n_shortest] = &table->endpoints[i];
     318           0 :             n_shortest++;
     319             :         }
     320             :     }
     321             : 
     322           0 :     if (n_shortest == 0) {
     323           0 :         unlock(table);
     324           0 :         return -1;
     325             :     }
     326             : 
     327           0 :     *endpoint = *endpoints[key % (int)n_shortest];
     328           0 :     unlock(table);
     329           0 :     return 0;
     330             : }
     331             : 
     332           3 : int get_endpoint_by_index(struct routing_table *table, int index, 
     333             :                           struct msu_endpoint *endpoint) {
     334           3 :     int rtn = -1;
     335           3 :     read_lock(table);
     336           3 :     if (table->n_endpoints > index) {
     337           2 :         *endpoint = table->endpoints[index];
     338           2 :         rtn = 0;
     339             :     }
     340           3 :     unlock(table);
     341           3 :     return rtn;
     342             : }
     343             : 
     344           4 : int get_endpoint_by_id(struct routing_table *table, int msu_id, 
     345             :                        struct msu_endpoint *endpoint) {
     346           4 :     int rtn = -1;
     347           4 :     read_lock(table);
     348          12 :     for (int i=0; i < table->n_endpoints; i++) {
     349          10 :         if (table->endpoints[i].id == msu_id) {
     350           2 :             *endpoint = table->endpoints[i];
     351           2 :             rtn = 0;
     352           2 :             break;
     353             :         }
     354             :     }
     355           4 :     unlock(table);
     356           4 :     return rtn;
     357             : }
     358             : 
     359           0 : int get_endpoints_by_runtime_id(struct routing_table *table, int runtime_id,
     360             :                                 struct msu_endpoint *endpoints, int n_endpoints) {
     361           0 :     int found_endpoints = 0;
     362           0 :     read_lock(table);
     363           0 :     for (int i=0; i < table->n_endpoints; i++) {
     364           0 :         if (table->endpoints[i].runtime_id == runtime_id) {
     365           0 :             if (n_endpoints <= found_endpoints) {
     366           0 :                 log_error("Not enough endpoints passed in to hold results");
     367           0 :                 unlock(table);
     368           0 :                 return -1;
     369             :             }
     370           0 :             endpoints[found_endpoints] = table->endpoints[i];
     371           0 :             found_endpoints++;
     372             :         }
     373             :     }
     374           0 :     unlock(table);
     375           0 :     return found_endpoints;
     376             : }
     377             : 
     378           0 : int get_n_endpoints(struct routing_table *table) {
     379           0 :     read_lock(table);
     380           0 :     int n_endpoints = table->n_endpoints;
     381           0 :     unlock(table);
     382           0 :     return n_endpoints;
     383             : }
     384             : 
     385           1 : int get_route_endpoint(struct routing_table *table, uint32_t key, struct msu_endpoint *endpoint) {
     386           1 :     read_lock(table);
     387           1 :     int index = find_value_index(table, key);
     388           1 :     if (index < 0) {
     389           0 :         log_error("Could not find index for key %d", key);
     390           0 :         unlock(table);
     391           0 :         return -1;
     392             :     }
     393           1 :     *endpoint = table->endpoints[index];
     394           1 :     log(LOG_ROUTING_DECISIONS, "Endpoint for key %u is %d", key, endpoint->id);
     395           1 :     unlock(table);
     396           1 :     return 0;
     397             : }
     398             : 
     399           3 : struct routing_table *get_type_from_route_set(struct route_set *set, int type_id) {
     400           6 :     for (int i=0; i<set->n_routes; i++) {
     401           5 :         if (set->routes[i]->type_id == type_id) {
     402           2 :             return set->routes[i];
     403             :         }
     404             :     }
     405           1 :     log_error("No route available of type %d", type_id);
     406           1 :     return NULL;
     407             : }
     408             : 
     409           3 : int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key) {
     410           3 :     log(LOG_ROUTING_CHANGES, "Adding endpoint %d to route %d", endpoint.id, route_id);
     411           3 :     struct routing_table *table = get_routing_table(route_id);
     412           3 :     if (table == NULL) {
     413           0 :         log_error("Route %d does not exist", route_id);
     414           0 :         return -1;
     415             :     }
     416           3 :     return add_routing_table_entry(table, &endpoint, key);
     417             : }
     418             : 
     419           0 : int remove_route_endpoint(int route_id, int msu_id) {
     420           0 :     struct routing_table *table = get_routing_table(route_id);
     421           0 :     if (table == NULL) {
     422           0 :         log_error("Route %d does not exist", route_id);
     423           0 :         return -1;
     424             :     }
     425           0 :     int rtn = rm_routing_table_entry(table, msu_id);
     426           0 :     if (rtn == -1) {
     427           0 :         log_error("Error removing msu %d from route %d", msu_id, route_id);
     428           0 :         return -1;
     429             :     }
     430           0 :     log(LOG_ROUTING_CHANGES, "Removed destination %d from route %d", msu_id, route_id);
     431           0 :     return 0;
     432             : }
     433             : 
     434           0 : int modify_route_endpoint(int route_id, int msu_id, uint32_t new_key) {
     435           0 :     struct routing_table *table = get_routing_table(route_id);
     436           0 :     if (table == NULL) {
     437           0 :         log_error("Route %d does not exist", route_id);
     438           0 :         return -1;
     439             :     }
     440           0 :     int rtn = alter_routing_table_entry(table, msu_id, new_key);
     441           0 :     if (rtn < 0) {
     442           0 :         log_error("Error altering routing for msu %d on route %d", msu_id, route_id);
     443           0 :         return -1;
     444             :     }
     445           0 :     log(LOG_ROUTING_CHANGES, "Altered key %d for msu %d in route %d",
     446             :                new_key, msu_id, route_id);
     447           0 :     return 0;
     448             : }
     449             : 
     450           5 : int add_route_to_set(struct route_set *set, int route_id) {
     451           5 :     struct routing_table *table = get_routing_table(route_id);
     452           5 :     if (table == NULL) {
     453           1 :         log_error("Route %d does not exist", route_id);
     454           1 :         return -1;
     455             :     }
     456           4 :     set->routes = realloc(set->routes, sizeof(*set->routes) * (set->n_routes + 1));
     457           4 :     if (set->routes == NULL) {
     458           0 :         log_error("Error reallocating routes");
     459           0 :         return -1;
     460             :     }
     461           4 :     set->routes[set->n_routes] = table;
     462           4 :     set->n_routes++;
     463           4 :     return 0;
     464             : }
     465           5 : int rm_route_from_set(struct route_set *set, int route_id) {
     466             :     int i;
     467          10 :     for (i=0; i<set->n_routes; i++) {
     468           8 :         if (set->routes[i]->id == route_id) {
     469           3 :             break;
     470             :         }
     471             :     }
     472           5 :     if (i == set->n_routes) {
     473           2 :         log_error("route %d does not exist in set", route_id);
     474           2 :         return -1;
     475             :     }
     476           3 :     for (; i<set->n_routes - 1; i++) {
     477           0 :         set->routes[i] = set->routes[i+1];
     478             :     }
     479           3 :     set->n_routes--;
     480           3 :     return 0;
     481             : }
     482             : 
     483           3 : int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint) {
     484           3 :     endpoint->id = msu_id;
     485           3 :     int local_id = local_runtime_id();
     486           3 :     if (local_id < 0) {
     487           0 :         log_error("Cannot get local runtime ID");
     488           0 :         return -1;
     489             :     }
     490           3 :     if (runtime_id == local_id) {
     491           0 :         endpoint->locality = MSU_IS_LOCAL;
     492           0 :         struct local_msu *msu = get_local_msu(msu_id);
     493           0 :         if (msu == NULL) {
     494           0 :             log_error("Cannot find local MSU with id %d for initializing endpoint",
     495             :                       msu_id);
     496           0 :             return -1;
     497             :         }
     498           0 :         endpoint->queue = &msu->queue;
     499           0 :         endpoint->runtime_id = runtime_id;
     500             :     } else {
     501           3 :         endpoint->locality = MSU_IS_REMOTE;
     502           3 :         endpoint->runtime_id = runtime_id;
     503             :     }
     504           3 :     return 0;
     505             : }
     506             : 
     507             : 

Generated by: LCOV version 1.10