Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file dfg_instantiation.c
22 : *
23 : * Instantiation of a dfg on a runtime
24 : */
25 : #include "logging.h"
26 : #include "local_msu.h"
27 : #include "dfg.h"
28 : #include "routing.h"
29 :
30 : /**
31 : * Adds the provided endpoints to the route with the provided `route_id`
32 : */
33 1 : static int add_dfg_route_endpoints(int route_id,
34 : struct dfg_route_endpoint **endpoints,
35 : int n_endpoints) {
36 8 : for (int i=0; i<n_endpoints; i++) {
37 : struct msu_endpoint endpoint;
38 3 : struct dfg_route_endpoint *dest = endpoints[i];
39 :
40 3 : int rtn = init_msu_endpoint(dest->msu->id,
41 3 : dest->msu->scheduling.runtime->id,
42 : &endpoint);
43 3 : if (rtn < 0) {
44 0 : log_error("Error instantiating MSU %d endpoint for route %d",
45 : dest->msu->id, route_id);
46 0 : return -1;
47 : }
48 3 : rtn = add_route_endpoint(route_id, endpoint, dest->key);
49 3 : if (rtn < 0) {
50 0 : log_error("Error adding endpoint %d to route %d",
51 : dest->msu->id, route_id);
52 0 : return -1;
53 : }
54 : }
55 1 : log(LOG_DFG_INSTANTIATION, "Added %d endpoints to route %d",
56 : n_endpoints, route_id);
57 1 : return 0;
58 : }
59 :
60 : /**
61 : * Creates the given routes on the runtime
62 : */
63 2 : static int spawn_dfg_routes(struct dfg_route **routes, int n_routes) {
64 6 : for (int i=0; i<n_routes; i++) {
65 4 : struct dfg_route *dfg_route = routes[i];
66 4 : if (init_route(dfg_route->id, dfg_route->msu_type->id) != 0) {
67 0 : log_error("Could not instantiate route %d (type: %d)",
68 : dfg_route->id, dfg_route->msu_type->id);
69 0 : return -1;
70 : }
71 : }
72 2 : log(LOG_DFG_INSTANTIATION, "Spawned %d routes", n_routes);
73 2 : return 0;
74 : }
75 :
76 : /**
77 : * Adds all of the endpoints for the provided routes to the provided routes
78 : */
79 0 : static int add_all_dfg_route_endpoints(struct dfg_route **routes, int n_routes) {
80 0 : for (int i=0; i<n_routes; i++) {
81 0 : struct dfg_route *dfg_route = routes[i];
82 0 : if (add_dfg_route_endpoints(dfg_route->id,
83 0 : dfg_route->endpoints,
84 : dfg_route->n_endpoints) != 0) {
85 0 : log_error("Error adding endpoints");
86 0 : return -1;
87 : }
88 : }
89 0 : return 0;
90 : }
91 :
92 : /**
93 : * Subscribes the MSU to the provided routes
94 : */
95 1 : static int add_dfg_routes_to_msu(struct local_msu *msu, struct dfg_route **routes, int n_routes) {
96 3 : for (int i=0; i<n_routes; i++) {
97 2 : struct dfg_route *route = routes[i];
98 2 : if (add_route_to_set(&msu->routes, route->id) != 0) {
99 0 : log_error("Error adding route %d to msu %d", route->id, msu->id);
100 0 : return -1;
101 : }
102 : }
103 1 : log(LOG_DFG_INSTANTIATION, "Added %d routes to msu %d",
104 : n_routes, msu->id);
105 1 : return 0;
106 : }
107 :
108 : /**
109 : * Creates all of the MSUs on the provided worker thread
110 : */
111 0 : static int spawn_dfg_msus(struct worker_thread *thread, struct dfg_msu **msus, int n_msus) {
112 0 : for (int i=0; i<n_msus; i++) {
113 0 : struct dfg_msu *dfg_msu = msus[i];
114 0 : struct msu_type *type = get_msu_type(dfg_msu->type->id);
115 0 : if (type == NULL) {
116 0 : log_error("Could not retrieve MSU type %d", dfg_msu->type->id);
117 0 : return -1;
118 : }
119 0 : struct local_msu *msu = init_msu(dfg_msu->id, type, thread, &dfg_msu->init_data);
120 0 : if (msu == NULL) {
121 0 : log_error("Error instantiating MSU %d", dfg_msu->type->id);
122 0 : return -1;
123 : }
124 0 : log(LOG_DFG_INSTANTIATION, "Initialized MSU %d from dfg", dfg_msu->id);
125 0 : struct dfg_scheduling *sched = &dfg_msu->scheduling;
126 0 : if (add_dfg_routes_to_msu(msu, sched->routes, sched->n_routes) != 0) {
127 0 : log_error("Error adding routes to MSU %d", msu->id);
128 0 : return -1;
129 : }
130 : }
131 0 : log(LOG_DFG_INSTANTIATION, "Initialized %d MSUs", n_msus);
132 0 : return 0;
133 : }
134 :
135 : /**
136 : * Creates all of the provided threads (including MSUs) on the current runtime
137 : */
138 0 : static int spawn_dfg_threads(struct dfg_thread **threads, int n_threads) {
139 0 : for (int i=0; i<n_threads; i++) {
140 0 : struct dfg_thread *dfg_thread = threads[i];
141 0 : int rtn = create_worker_thread(dfg_thread->id, dfg_thread->mode);
142 0 : if (rtn < 0) {
143 0 : log_error("Error instantiating thread %d! Can not continue!", dfg_thread->id);
144 0 : return -1;
145 : }
146 0 : log(LOG_DFG_INSTANTIATION, "Created worker thread %d, mode = %s",
147 : dfg_thread->id, dfg_thread->mode == PINNED_THREAD ? "Pinned" : "Unpinned");
148 0 : struct worker_thread *thread = get_worker_thread(dfg_thread->id);
149 0 : if (spawn_dfg_msus(thread, dfg_thread->msus, dfg_thread->n_msus) != 0) {
150 0 : log_error("Error instantiating thread %d MSUs", dfg_thread->id);
151 0 : return -1;
152 : }
153 0 : log(LOG_DFG_INSTANTIATION, "Spawned %d MSUs on thread %d",
154 : dfg_thread->n_msus, dfg_thread->id);
155 : }
156 0 : log(LOG_DFG_INSTANTIATION, "Initialized %d threads", n_threads);
157 0 : return 0;
158 : }
159 :
160 0 : int init_dfg_msu_types(struct dfg_msu_type **msu_types, int n_msu_types) {
161 0 : for (int i=0; i<n_msu_types; i++) {
162 0 : struct dfg_msu_type *type = msu_types[i];
163 0 : if (init_msu_type_id(type->id) != 0) {
164 0 : log_error("Could not instantiate required type %d", type->id);
165 0 : return -1;
166 : }
167 0 : log(LOG_DFG_INSTANTIATION, "Initialized MSU Type %d from dfg",
168 : type->id);
169 : }
170 0 : log(LOG_DFG_INSTANTIATION, "Initialized %d MSU types", n_msu_types);
171 0 : return 0;
172 : }
173 :
174 0 : int instantiate_dfg_runtime(struct dfg_runtime *rt) {
175 0 : if (spawn_dfg_routes(rt->routes, rt->n_routes) != 0) {
176 0 : log_error("Error spawning routes for runtime DFG instantiation");
177 0 : return -1;
178 : }
179 0 : if (spawn_dfg_threads(rt->threads, rt->n_pinned_threads + rt->n_unpinned_threads) != 0) {
180 0 : log_error("Error spawning threads for runtime DFG instantiation");
181 0 : return -1;
182 : }
183 0 : if (add_all_dfg_route_endpoints(rt->routes, rt->n_routes) != 0) {
184 0 : log_error("Error adding endpoints to routes");
185 0 : return -1;
186 : }
187 0 : return 0;
188 : }
|