LCOV - code coverage report
Current view: top level - runtime - dfg_instantiation.c (source / functions) Hit Total Coverage
Test: unnamed Lines: 22 91 24.2 %
Date: 2018-01-11 Functions: 3 8 37.5 %

          Line data    Source code
       1             : /*
       2             : START OF LICENSE STUB
       3             :     DeDOS: Declarative Dispersion-Oriented Software
       4             :     Copyright (C) 2017 University of Pennsylvania, Georgetown University
       5             : 
       6             :     This program is free software: you can redistribute it and/or modify
       7             :     it under the terms of the GNU General Public License as published by
       8             :     the Free Software Foundation, either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     This program is distributed in the hope that it will be useful,
      12             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :     GNU General Public License for more details.
      15             : 
      16             :     You should have received a copy of the GNU General Public License
      17             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             : END OF LICENSE STUB
      19             : */
      20             : /**
      21             :  * @file dfg_instantiation.c
      22             :  *
      23             :  * Instantiation of a dfg on a runtime
      24             :  */
      25             : #include "logging.h"
      26             : #include "local_msu.h"
      27             : #include "dfg.h"
      28             : #include "routing.h"
      29             : 
      30             : /**
      31             :  * Adds the provided endpoints to the route with the provided `route_id`
      32             :  */
      33           1 : static int add_dfg_route_endpoints(int route_id,
      34             :                                       struct dfg_route_endpoint **endpoints,
      35             :                                       int n_endpoints) {
      36           8 :     for (int i=0; i<n_endpoints; i++) {
      37             :         struct msu_endpoint endpoint;
      38           3 :         struct dfg_route_endpoint *dest = endpoints[i];
      39             : 
      40           3 :         int rtn = init_msu_endpoint(dest->msu->id,
      41           3 :                                     dest->msu->scheduling.runtime->id,
      42             :                                     &endpoint);
      43           3 :         if (rtn < 0) {
      44           0 :             log_error("Error instantiating MSU %d endpoint for route %d",
      45             :                       dest->msu->id, route_id);
      46           0 :             return -1;
      47             :         }
      48           3 :         rtn = add_route_endpoint(route_id, endpoint, dest->key);
      49           3 :         if (rtn < 0) {
      50           0 :             log_error("Error adding endpoint %d to route %d",
      51             :                       dest->msu->id, route_id);
      52           0 :             return -1;
      53             :         }
      54             :     }
      55           1 :     log(LOG_DFG_INSTANTIATION, "Added %d endpoints to route %d",
      56             :                n_endpoints, route_id);
      57           1 :     return 0;
      58             : }
      59             : 
      60             : /**
      61             :  * Creates the given routes on the runtime
      62             :  */
      63           2 : static int spawn_dfg_routes(struct dfg_route **routes, int n_routes) {
      64           6 :     for (int i=0; i<n_routes; i++) {
      65           4 :         struct dfg_route *dfg_route = routes[i];
      66           4 :         if (init_route(dfg_route->id, dfg_route->msu_type->id) != 0) {
      67           0 :             log_error("Could not instantiate route %d (type: %d)",
      68             :                       dfg_route->id, dfg_route->msu_type->id);
      69           0 :             return -1;
      70             :         }
      71             :     }
      72           2 :     log(LOG_DFG_INSTANTIATION, "Spawned %d routes", n_routes);
      73           2 :     return 0;
      74             : }
      75             : 
      76             : /**
      77             :  * Adds all of the endpoints for the provided routes to the provided routes
      78             :  */
      79           0 : static int add_all_dfg_route_endpoints(struct dfg_route **routes, int n_routes) {
      80           0 :     for (int i=0; i<n_routes; i++) {
      81           0 :         struct dfg_route *dfg_route = routes[i];
      82           0 :         if (add_dfg_route_endpoints(dfg_route->id,
      83           0 :                                     dfg_route->endpoints,
      84             :                                     dfg_route->n_endpoints) != 0) {
      85           0 :             log_error("Error adding endpoints");
      86           0 :             return -1;
      87             :         }
      88             :     }
      89           0 :     return 0;
      90             : }
      91             : 
      92             : /**
      93             :  * Subscribes the MSU to the provided routes
      94             :  */
      95           1 : static int add_dfg_routes_to_msu(struct local_msu *msu, struct dfg_route **routes, int n_routes) {
      96           3 :     for (int i=0; i<n_routes; i++) {
      97           2 :         struct dfg_route *route = routes[i];
      98           2 :         if (add_route_to_set(&msu->routes,  route->id) != 0) {
      99           0 :             log_error("Error adding route %d to msu %d", route->id, msu->id);
     100           0 :             return -1;
     101             :         }
     102             :     }
     103           1 :     log(LOG_DFG_INSTANTIATION, "Added %d routes to msu %d",
     104             :                n_routes, msu->id);
     105           1 :     return 0;
     106             : }
     107             : 
     108             : /**
     109             :  * Creates all of the MSUs on the provided worker thread
     110             :  */
     111           0 : static int spawn_dfg_msus(struct worker_thread *thread, struct dfg_msu **msus, int n_msus) {
     112           0 :     for (int i=0; i<n_msus; i++) {
     113           0 :         struct dfg_msu *dfg_msu = msus[i];
     114           0 :         struct msu_type *type = get_msu_type(dfg_msu->type->id);
     115           0 :         if (type == NULL) {
     116           0 :             log_error("Could not retrieve MSU type %d", dfg_msu->type->id);
     117           0 :             return -1;
     118             :         }
     119           0 :         struct local_msu *msu = init_msu(dfg_msu->id, type, thread, &dfg_msu->init_data);
     120           0 :         if (msu == NULL) {
     121           0 :             log_error("Error instantiating MSU %d", dfg_msu->type->id);
     122           0 :             return -1;
     123             :         }
     124           0 :         log(LOG_DFG_INSTANTIATION, "Initialized MSU %d from dfg", dfg_msu->id);
     125           0 :         struct dfg_scheduling *sched = &dfg_msu->scheduling;
     126           0 :         if (add_dfg_routes_to_msu(msu, sched->routes, sched->n_routes) != 0) {
     127           0 :             log_error("Error adding routes to MSU %d", msu->id);
     128           0 :             return -1;
     129             :         }
     130             :     }
     131           0 :     log(LOG_DFG_INSTANTIATION, "Initialized %d MSUs", n_msus);
     132           0 :     return 0;
     133             : }
     134             : 
     135             : /**
     136             :  * Creates all of the provided threads (including MSUs) on the current runtime
     137             :  */
     138           0 : static int spawn_dfg_threads(struct dfg_thread **threads, int n_threads) {
     139           0 :     for (int i=0; i<n_threads; i++) {
     140           0 :         struct dfg_thread *dfg_thread = threads[i];
     141           0 :         int rtn = create_worker_thread(dfg_thread->id, dfg_thread->mode);
     142           0 :         if (rtn < 0) {
     143           0 :             log_error("Error instantiating thread %d! Can not continue!", dfg_thread->id);
     144           0 :             return -1;
     145             :         }
     146           0 :         log(LOG_DFG_INSTANTIATION, "Created worker thread %d, mode = %s",
     147             :                    dfg_thread->id, dfg_thread->mode == PINNED_THREAD ? "Pinned" : "Unpinned");
     148           0 :         struct worker_thread *thread = get_worker_thread(dfg_thread->id);
     149           0 :         if (spawn_dfg_msus(thread, dfg_thread->msus, dfg_thread->n_msus) != 0) {
     150           0 :             log_error("Error instantiating thread %d MSUs", dfg_thread->id);
     151           0 :             return -1;
     152             :         }
     153           0 :         log(LOG_DFG_INSTANTIATION, "Spawned %d MSUs on thread %d",
     154             :                    dfg_thread->n_msus, dfg_thread->id);
     155             :     }
     156           0 :     log(LOG_DFG_INSTANTIATION, "Initialized %d threads", n_threads);
     157           0 :     return 0;
     158             : }
     159             : 
     160           0 : int init_dfg_msu_types(struct dfg_msu_type **msu_types, int n_msu_types) {
     161           0 :     for (int i=0; i<n_msu_types; i++) {
     162           0 :         struct dfg_msu_type *type = msu_types[i];
     163           0 :         if (init_msu_type_id(type->id) != 0) {
     164           0 :             log_error("Could not instantiate required type %d", type->id);
     165           0 :             return -1;
     166             :         }
     167           0 :         log(LOG_DFG_INSTANTIATION, "Initialized MSU Type %d from dfg",
     168             :                    type->id);
     169             :     }
     170           0 :     log(LOG_DFG_INSTANTIATION, "Initialized %d MSU types", n_msu_types);
     171           0 :     return 0;
     172             : }
     173             : 
     174           0 : int instantiate_dfg_runtime(struct dfg_runtime *rt) {
     175           0 :     if (spawn_dfg_routes(rt->routes, rt->n_routes) != 0) {
     176           0 :         log_error("Error spawning routes for runtime DFG instantiation");
     177           0 :         return -1;
     178             :     }
     179           0 :     if (spawn_dfg_threads(rt->threads, rt->n_pinned_threads + rt->n_unpinned_threads) != 0) {
     180           0 :         log_error("Error spawning threads for runtime DFG instantiation");
     181           0 :         return -1;
     182             :     }
     183           0 :     if (add_all_dfg_route_endpoints(rt->routes, rt->n_routes) != 0) {
     184           0 :         log_error("Error adding endpoints to routes");
     185           0 :         return -1;
     186             :     }
     187           0 :     return 0;
     188             : }

Generated by: LCOV version 1.10