My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
dfg_instantiation.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
25 #include "logging.h"
26 #include "local_msu.h"
27 #include "dfg.h"
28 #include "routing.h"
29 
33 static int add_dfg_route_endpoints(int route_id,
34  struct dfg_route_endpoint **endpoints,
35  int n_endpoints) {
36  for (int i=0; i<n_endpoints; i++) {
37  struct msu_endpoint endpoint;
38  struct dfg_route_endpoint *dest = endpoints[i];
39 
40  int rtn = init_msu_endpoint(dest->msu->id,
41  dest->msu->scheduling.runtime->id,
42  &endpoint);
43  if (rtn < 0) {
44  log_error("Error instantiating MSU %d endpoint for route %d",
45  dest->msu->id, route_id);
46  return -1;
47  }
48  rtn = add_route_endpoint(route_id, endpoint, dest->key);
49  if (rtn < 0) {
50  log_error("Error adding endpoint %d to route %d",
51  dest->msu->id, route_id);
52  return -1;
53  }
54  }
55  log(LOG_DFG_INSTANTIATION, "Added %d endpoints to route %d",
56  n_endpoints, route_id);
57  return 0;
58 }
59 
63 static int spawn_dfg_routes(struct dfg_route **routes, int n_routes) {
64  for (int i=0; i<n_routes; i++) {
65  struct dfg_route *dfg_route = routes[i];
66  if (init_route(dfg_route->id, dfg_route->msu_type->id) != 0) {
67  log_error("Could not instantiate route %d (type: %d)",
68  dfg_route->id, dfg_route->msu_type->id);
69  return -1;
70  }
71  }
72  log(LOG_DFG_INSTANTIATION, "Spawned %d routes", n_routes);
73  return 0;
74 }
75 
79 static int add_all_dfg_route_endpoints(struct dfg_route **routes, int n_routes) {
80  for (int i=0; i<n_routes; i++) {
81  struct dfg_route *dfg_route = routes[i];
82  if (add_dfg_route_endpoints(dfg_route->id,
83  dfg_route->endpoints,
84  dfg_route->n_endpoints) != 0) {
85  log_error("Error adding endpoints");
86  return -1;
87  }
88  }
89  return 0;
90 }
91 
95 static int add_dfg_routes_to_msu(struct local_msu *msu, struct dfg_route **routes, int n_routes) {
96  for (int i=0; i<n_routes; i++) {
97  struct dfg_route *route = routes[i];
98  if (add_route_to_set(&msu->routes, route->id) != 0) {
99  log_error("Error adding route %d to msu %d", route->id, msu->id);
100  return -1;
101  }
102  }
103  log(LOG_DFG_INSTANTIATION, "Added %d routes to msu %d",
104  n_routes, msu->id);
105  return 0;
106 }
107 
111 static int spawn_dfg_msus(struct worker_thread *thread, struct dfg_msu **msus, int n_msus) {
112  for (int i=0; i<n_msus; i++) {
113  struct dfg_msu *dfg_msu = msus[i];
114  struct msu_type *type = get_msu_type(dfg_msu->type->id);
115  if (type == NULL) {
116  log_error("Could not retrieve MSU type %d", dfg_msu->type->id);
117  return -1;
118  }
119  struct local_msu *msu = init_msu(dfg_msu->id, type, thread, &dfg_msu->init_data);
120  if (msu == NULL) {
121  log_error("Error instantiating MSU %d", dfg_msu->type->id);
122  return -1;
123  }
124  log(LOG_DFG_INSTANTIATION, "Initialized MSU %d from dfg", dfg_msu->id);
125  struct dfg_scheduling *sched = &dfg_msu->scheduling;
126  if (add_dfg_routes_to_msu(msu, sched->routes, sched->n_routes) != 0) {
127  log_error("Error adding routes to MSU %d", msu->id);
128  return -1;
129  }
130  }
131  log(LOG_DFG_INSTANTIATION, "Initialized %d MSUs", n_msus);
132  return 0;
133 }
134 
138 static int spawn_dfg_threads(struct dfg_thread **threads, int n_threads) {
139  for (int i=0; i<n_threads; i++) {
140  struct dfg_thread *dfg_thread = threads[i];
141  int rtn = create_worker_thread(dfg_thread->id, dfg_thread->mode);
142  if (rtn < 0) {
143  log_error("Error instantiating thread %d! Can not continue!", dfg_thread->id);
144  return -1;
145  }
146  log(LOG_DFG_INSTANTIATION, "Created worker thread %d, mode = %s",
147  dfg_thread->id, dfg_thread->mode == PINNED_THREAD ? "Pinned" : "Unpinned");
148  struct worker_thread *thread = get_worker_thread(dfg_thread->id);
149  if (spawn_dfg_msus(thread, dfg_thread->msus, dfg_thread->n_msus) != 0) {
150  log_error("Error instantiating thread %d MSUs", dfg_thread->id);
151  return -1;
152  }
153  log(LOG_DFG_INSTANTIATION, "Spawned %d MSUs on thread %d",
154  dfg_thread->n_msus, dfg_thread->id);
155  }
156  log(LOG_DFG_INSTANTIATION, "Initialized %d threads", n_threads);
157  return 0;
158 }
159 
160 int init_dfg_msu_types(struct dfg_msu_type **msu_types, int n_msu_types) {
161  for (int i=0; i<n_msu_types; i++) {
162  struct dfg_msu_type *type = msu_types[i];
163  if (init_msu_type_id(type->id) != 0) {
164  log_error("Could not instantiate required type %d", type->id);
165  return -1;
166  }
167  log(LOG_DFG_INSTANTIATION, "Initialized MSU Type %d from dfg",
168  type->id);
169  }
170  log(LOG_DFG_INSTANTIATION, "Initialized %d MSU types", n_msu_types);
171  return 0;
172 }
173 
175  if (spawn_dfg_routes(rt->routes, rt->n_routes) != 0) {
176  log_error("Error spawning routes for runtime DFG instantiation");
177  return -1;
178  }
180  log_error("Error spawning threads for runtime DFG instantiation");
181  return -1;
182  }
183  if (add_all_dfg_route_endpoints(rt->routes, rt->n_routes) != 0) {
184  log_error("Error adding endpoints to routes");
185  return -1;
186  }
187  return 0;
188 }
static int spawn_dfg_msus(struct worker_thread *thread, struct dfg_msu **msus, int n_msus)
Creates all of the MSUs on the provided worker thread.
int init_msu_endpoint(int msu_id, int runtime_id, struct msu_endpoint *endpoint)
Initializes an endpoint structure to point to the relevant msu.
Definition: routing.c:483
static int add_all_dfg_route_endpoints(struct dfg_route **routes, int n_routes)
Adds the endpoints of each of the provided routes to that route.
int n_routes
The routes that an MSU can send to.
Definition: dfg.h:120
int add_route_endpoint(int route_id, struct msu_endpoint endpoint, uint32_t key)
Adds an endpoint to the route with the given ID.
Definition: routing.c:409
int init_route(int route_id, int type_id)
Initializes a new route with the given route_id and type_id.
Definition: routing.c:276
enum thread_mode mode
Pinned/unpinned mode for the thread.
Definition: dfg.h:106
int n_msus
Number of MSUs placed on the thread.
Definition: dfg.h:108
struct dfg_scheduling scheduling
Information about where an MSU is scheduled.
Definition: dfg.h:225
struct dfg_route * routes[64]
Routes located on this runtime.
Definition: dfg.h:83
static int add_dfg_route_endpoints(int route_id, struct dfg_route_endpoint **endpoints, int n_endpoints)
Adds the provided endpoints to the route with the provided route_id
Representation of a thread that holds MSUs, messages, and waits on a semaphore.
Definition: worker_thread.h:40
struct dfg_msu * msus[8]
MSUs placed on that thread.
Definition: dfg.h:107
static int route(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
Definition: baremetal_msu.c:30
int id
Unique identifier for the runtime.
Definition: dfg.h:74
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Definition: msu_type.c:80
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
Definition: local_msu.h:54
struct dfg_msu_type * type
The type of the MSU and meta-routing information.
Definition: dfg.h:221
Representation of a runtime in the DFG.
Definition: dfg.h:73
struct dfg_route_endpoint * endpoints[256]
The endpoints of the route.
Definition: dfg.h:156
struct route_set routes
Routing table set, containing all destination MSUs (see routing.h for details)
Definition: local_msu.h:51
int n_endpoints
The number of endpoints in dfg_route::endpoints.
Definition: dfg.h:157
struct dfg_runtime * runtime
The runtime on which an MSU is running.
Definition: dfg.h:117
#define log_error(fmt,...)
Definition: logging.h:101
Declares the structures and functions applicable to MSUs on the local machine.
struct local_msu * init_msu(unsigned int id, struct msu_type *type, struct worker_thread *thread, struct msu_init_data *data)
Allocates and creates a new MSU of the specified type and ID on the given thread. ...
Definition: local_msu.c:198
struct dfg_msu * msu
The MSU at this endpoint to which a message would be delivered.
Definition: dfg.h:143
A type of MSU.
Definition: dfg.h:176
int id
A unique identifier for the MSU.
Definition: dfg.h:217
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
int n_unpinned_threads
Number of the above-threads which are unpinned.
Definition: dfg.h:81
int add_route_to_set(struct route_set *set, int route_id)
Adds a route to a set of routes.
Definition: routing.c:450
int id
Unique identifier for the thread.
Definition: dfg.h:105
Representation of a single MSU in the dfg.
Definition: dfg.h:216
Functions for routing MSU messages between MSUs.
int init_msu_type_id(unsigned int type_id)
Initializes the MSU type with the given ID, calling the custom constructor if appropriate.
Definition: msu_type.c:132
int n_pinned_threads
Number of the above-threads which are pinned.
Definition: dfg.h:80
int id
A unique identifier for the MSU type.
Definition: dfg.h:177
static int spawn_dfg_routes(struct dfg_route **routes, int n_routes)
Creates the given routes on the runtime.
int id
A unique identifier for the route.
Definition: dfg.h:153
struct worker_thread * get_worker_thread(int id)
Definition: worker_thread.c:79
Defines a type of MSU.
Definition: msu_type.h:46
struct dfg_route * routes[32]
Definition: dfg.h:119
A route through which MSU messages can be passed.
Definition: dfg.h:152
struct msu_init_data init_data
Initial data passed to the MSU.
Definition: dfg.h:219
Interfaces for the creation and modification of the data-flow-graph and general description of th...
A single endpoint for an MSU route.
Definition: dfg.h:139
struct dfg_thread * threads[32]
Threads located on the runtime.
Definition: dfg.h:79
uint32_t key
The key associated with this endpoint.
Definition: dfg.h:140
int instantiate_dfg_runtime(struct dfg_runtime *rt)
Instantiates the MSUs, routes, and threads on the specified runtime.
Structure representing the scheduling of an MSU on a runtime.
Definition: dfg.h:116
struct dedos_thread * thread
The underlying dedos_thread.
Definition: worker_thread.h:42
int init_dfg_msu_types(struct dfg_msu_type **msu_types, int n_msu_types)
Runs the runtime initialization function for the given MSU types.
Representation of a thread on a runtime in the DFG.
Definition: dfg.h:104
int n_routes
Number of routes above.
Definition: dfg.h:84
static int add_dfg_routes_to_msu(struct local_msu *msu, struct dfg_route **routes, int n_routes)
Subscribes the MSU to the provided routes.
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
int create_worker_thread(unsigned int thread_id, enum blocking_mode mode)
Starts a new worker thread with the given thread ID and pinned/unpinned status.
static int spawn_dfg_threads(struct dfg_thread **threads, int n_threads)
Creates all of the provided threads (including MSUs) on the current runtime.
An endpoint to which an msu_msg can be delivered.
Definition: routing.h:32
static struct msu_type * msu_types[1000]
Pointers to MSU Types, indexed by ID.
Definition: msu_type.c:51
struct dfg_msu_type * msu_type
The type of MSU to which this route delivers.
Definition: dfg.h:155