My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
api.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
20 #include <stdlib.h>
21 #include <string.h>
22 #include <arpa/inet.h>
23 
24 
25 #include "controller_stats.h"
26 #include "controller_mysql.h"
27 #include "logging.h"
28 #include "api.h"
29 #include "dfg.h"
30 #include "communication.h"
31 #include "msu_ids.h"
32 #include "controller_dfg.h"
33 #include "runtime_messages.h"
34 
35 int add_msu(unsigned int msu_id, unsigned int type_id,
36  char *init_data_c, char *msu_mode, char *vertex_type_c,
37  unsigned int thread_id, unsigned int runtime_id) {
38 
39  struct dfg_msu_type *type = get_dfg_msu_type(type_id);
40  if (type == NULL) {
41  log_error("Could not get MSU type %d", type_id);
42  return -1;
43  }
44 
45  enum blocking_mode mode = str_to_blocking_mode(msu_mode);
46  if (mode == UNKNOWN_BLOCKING_MODE) {
47  log_error("Could not get blocking mode from %s", msu_mode);
48  return -1;
49  }
50 
51  uint8_t vertex_type = str_to_vertex_type(vertex_type_c);
52 
53  struct msu_init_data init_data;
54  strcpy(init_data.init_data, init_data_c);
55 
56  struct dfg_msu *msu = init_dfg_msu(msu_id, type, vertex_type, mode, &init_data);
57 
58  if (msu == NULL) {
59  log_error("Error innitializing dfg MSU %d", msu_id);
60  return -1;
61  }
62 
63  int rtn = schedule_dfg_msu(msu, runtime_id, thread_id);
64  if (rtn < 0) {
65  log_error("Error scheduling MSU on DFG");
66  return -1;
67  }
68 
69  if (send_create_msu_msg(msu) == -1) {
70  log_error("Could not send create-msu-msg to runtime %u", runtime_id);
71  unschedule_dfg_msu(msu);
72  return -1;
73  } else {
74  register_msu_stats(msu_id, type_id, thread_id, runtime_id);
75  return 0;
76  }
77 }
78 
/**
 * Removes an MSU: sends the delete-MSU message for it, then unschedules it
 * from the DFG.
 *
 * @param id ID of the MSU to remove
 * @return 0 on success, -1 if the MSU is not in the DFG, the message could not
 *         be sent, or the MSU could not be unscheduled
 */
int remove_msu(unsigned int id) {
    struct dfg_msu *msu = get_dfg_msu(id);

    if (msu == NULL) {
        log_error("MSU %u not scheduled!", id);
        return -1;
    }

    // The message is sent first; the DFG is only changed once the send succeeded.
    int rtn = send_delete_msu_msg(msu);

    if (rtn < 0) {
        log_error("Error sending delete MSU msg for MSU %u", id);
        return -1;
    }

    rtn = unschedule_dfg_msu(msu);
    if (rtn < 0) {
        log_error("Error unscheduling DFG msu");
        return -1;
    }

    // NOTE(review): add_msu() registers stats for the MSU; no matching
    // unregistration is visible in this function -- confirm whether one is needed.

    return 0;
}
103 
/**
 * Creates a route in the DFG and sends the create-route message for it.
 *
 * If the message cannot be sent, the newly created route is deleted from the
 * DFG again so that the DFG does not keep a route that was never announced.
 *
 * @param route_id   ID to give the new route
 * @param type_id    ID of the MSU type passed to create_dfg_route()
 * @param runtime_id ID of the runtime on which the route is created
 * @return 0 on success, -1 on error
 */
int create_route(unsigned int route_id, unsigned int type_id, unsigned int runtime_id) {
    struct dfg_msu_type *type = get_dfg_msu_type(type_id);
    if (type == NULL) {
        log_error("Could not find MSU type %u in dfg", type_id);
        return -1;
    }

    struct dfg_route *route = create_dfg_route(route_id, type, runtime_id);
    if (route == NULL) {
        log_error("Could not create route %u in dfg", route_id);
        return -1;
    }

    int rtn = send_create_route_msg(route);
    if (rtn < 0) {
        log_error("Error sending create route message");
        // Roll back the DFG change made above.
        if (delete_dfg_route(route) < 0) {
            log_error("Could not remove route %u from dfg after failed send", route_id);
        }
        return -1;
    }
    return 0;
}
125 
/**
 * Deletes a route: sends the delete-route message, then removes the route
 * from the DFG.
 *
 * @param route_id ID of the route to delete
 * @return 0 on success, -1 if the route does not exist, the message could not
 *         be sent, or the DFG deletion failed
 */
int delete_route(unsigned int route_id) {
    struct dfg_route *target = get_dfg_route(route_id);
    if (!target) {
        log_error("Route %u does not exist", route_id);
        return -1;
    }

    // Send the message first; the DFG is only modified after a successful send.
    if (send_delete_route_msg(target) < 0) {
        log_error("Error sending delete route message");
        return -1;
    }

    if (delete_dfg_route(target) < 0) {
        log_error("Error deleting DFG route");
        return -1;
    }

    return 0;
}
146 
/**
 * Adds a route to an MSU in the DFG and sends the corresponding
 * add-route-to-MSU message.
 *
 * @param msu_id   ID of the MSU that receives the route
 * @param route_id ID of the route to add
 * @return 0 on success, -1 if the route or MSU is missing from the DFG, the
 *         DFG update fails, or the message could not be sent
 */
int add_route_to_msu(unsigned int msu_id, unsigned int route_id) {
    struct dfg_route *dfg_rt = get_dfg_route(route_id);
    if (!dfg_rt) {
        log_error("Could not retrieve route %u from dfg", route_id);
        return -1;
    }

    struct dfg_msu *target_msu = get_dfg_msu(msu_id);
    if (!target_msu) {
        log_error("Could not retrieve MSU %u from DFG", msu_id);
        return -1;
    }

    // The DFG is updated before the message is sent.
    if (add_dfg_route_to_msu(dfg_rt, target_msu) < 0) {
        log_error("Error adding route to MSU");
        return -1;
    }

    if (send_add_route_to_msu_msg(dfg_rt, target_msu) < 0) {
        log_error("Error sending add route to MSU msg");
        // TODO: Remove route from MSU
        return -1;
    }

    return 0;
}
174 
175 // TODO: del_route_from_msu()
176 
177 int add_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id) {
178  struct dfg_route *route = get_dfg_route(route_id);
179  if (route == NULL) {
180  log_error("Could not retrieve route %u from dfg", route_id);
181  return -1;
182  }
183  struct dfg_msu *msu = get_dfg_msu(msu_id);
184  if (msu == NULL) {
185  log_error("Could not retrieve MSU %u from DFG", msu_id);
186  return -1;
187  }
188 
189  struct dfg_route_endpoint *endpoint = add_dfg_route_endpoint(msu, key, route);
190  if (endpoint == NULL) {
191  log_error("Error adding MSU to route");
192  return -1;
193  }
194 
195  int rtn = send_add_endpoint_msg(route, endpoint);
196  if (rtn < 0) {
197  log_error("Error sending add endpoint message");
198  // TODO: Remove endpoint
199  return -1;
200  }
201  log(LOG_API_CALLS, "Added endpoint %u with key %d to route %u",
202  msu_id, (int)key, route_id);
203  return 0;
204 }
205 
/**
 * Removes an MSU endpoint from a route: sends the delete-endpoint message,
 * then deletes the endpoint from the DFG route.
 *
 * @param msu_id   ID of the MSU whose endpoint is removed
 * @param route_id ID of the route that holds the endpoint
 * @return 0 on success, -1 if the route or endpoint cannot be found, the
 *         message could not be sent, or the DFG deletion failed
 */
int del_endpoint(unsigned int msu_id, unsigned int route_id) {
    struct dfg_route *owner = get_dfg_route(route_id);
    if (!owner) {
        log_error("Could not retrieve route %u from dfg", route_id);
        return -1;
    }

    struct dfg_route_endpoint *victim = get_dfg_route_endpoint(owner, msu_id);
    if (!victim) {
        log_error("Could not get endpoint %u from dfg route", msu_id);
        return -1;
    }

    // Send the message first; the DFG is only modified after a successful send.
    if (send_del_endpoint_msg(owner, victim) < 0) {
        log_error("Error sending delete endpoint message");
        return -1;
    }

    if (del_dfg_route_endpoint(owner, victim) < 0) {
        log_error("Error deleting endpoint from DFG route");
        return -1;
    }

    return 0;
}
233 
/**
 * Changes the key of an existing route endpoint in the DFG and sends the
 * modify-endpoint message.
 *
 * @param msu_id   ID of the MSU whose endpoint is modified
 * @param key      New key for the endpoint
 * @param route_id ID of the route that holds the endpoint
 * @return 0 on success, -1 if the route or endpoint cannot be found, the DFG
 *         update fails, or the message could not be sent
 */
int mod_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id) {
    struct dfg_route *owner = get_dfg_route(route_id);
    if (!owner) {
        log_error("Could not retrieve route %u from dfg", route_id);
        return -1;
    }

    struct dfg_route_endpoint *endpoint = get_dfg_route_endpoint(owner, msu_id);
    if (!endpoint) {
        log_error("Could not get endpoint %u from dfg route", msu_id);
        return -1;
    }

    // The DFG is updated before the message is sent.
    if (mod_dfg_route_endpoint(owner, endpoint, key) < 0) {
        log_error("Could not modify DFG route endpoint");
        return -1;
    }

    if (send_mod_endpoint_msg(owner, endpoint) < 0) {
        log_error("Error sending message to modify route endpoint");
        return -1;
    }

    return 0;
}
260 
261 int create_worker_thread(unsigned int thread_id, unsigned int runtime_id, char *mode) {
262  struct dfg_runtime *rt = get_dfg_runtime(runtime_id);
263  if (rt == NULL) {
264  log_error("could not retrieve runtime %u from dfg", runtime_id);
265  return -1;
266  }
267 
268  struct dfg_thread *thread = get_dfg_thread(rt, thread_id);
269  if (thread != NULL) {
270  log_error("Thread %u already exists on runtime %u", thread_id, runtime_id);
271  return -1;
272  }
273 
274  enum thread_mode mode_e = str_to_thread_mode(mode);
275  if (mode_e == 0) {
276  log_error("Could not get mode from %s", mode);
277  return -1;
278  }
279 
280  thread = create_dfg_thread(rt, thread_id, mode_e);
281  if (thread == NULL) {
282  log_error("Error creating dfg thread");
283  return -1;
284  };
285 
286  int rtn = send_create_thread_msg(thread, rt);
287  if (rtn < 0) {
288  log_error("Error sending create_thread_msg");
289  return -1;
290  }
291  register_thread_stats(thread_id, runtime_id);
292  return 0;
293 }
294 
Interface for general-purpose socket communication.
int delete_dfg_route(struct dfg_route *route)
Deletes the provided route from the DFG.
Definition: dfg.c:399
uint8_t str_to_vertex_type(char *str_type)
Converts a string containing exit and/or entry to the correct bitmask.
Definition: dfg.c:162
thread_mode
Identifies if a thread is pinned to a core or able to be scheduled on any core.
Definition: dfg.h:91
int send_delete_msu_msg(struct dfg_msu *msu)
struct dfg_route * get_dfg_route(unsigned int id)
Returns the route with the given ID.
Definition: dfg.c:76
int del_dfg_route_endpoint(struct dfg_route *route, struct dfg_route_endpoint *ep)
Removes an MSU as a route endpoint.
Definition: dfg.c:487
int send_add_endpoint_msg(struct dfg_route *route, struct dfg_route_endpoint *endpoint)
int unschedule_dfg_msu(struct dfg_msu *msu)
Removes the given MSU from its runtime and thread.
Definition: dfg.c:339
int send_mod_endpoint_msg(struct dfg_route *route, struct dfg_route_endpoint *endpoint)
static int route(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
Definition: baremetal_msu.c:30
struct dfg_runtime * get_dfg_runtime(unsigned int runtime_id)
Returns the runtime with the given ID.
Definition: dfg.c:64
unsigned char uint8_t
Definition: uthash.h:97
struct dfg_route_endpoint * add_dfg_route_endpoint(struct dfg_msu *msu, uint32_t key, struct dfg_route *route)
Adds an MSU as an endpoint to a route.
Definition: dfg.c:455
int add_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id)
Definition: api.c:177
Logging of status messages to the terminal.
struct dfg_route * create_dfg_route(unsigned int id, struct dfg_msu_type *type, unsigned int runtime_id)
Creates a route with the specified parameters.
Definition: dfg.c:366
struct dfg_thread * create_dfg_thread(struct dfg_runtime *rt, int thread_id, enum thread_mode mode)
Creates a new thread on the given runtime.
Definition: dfg.c:539
Representation of a runtime in the DFG.
Definition: dfg.h:73
struct dfg_msu * init_dfg_msu(unsigned int id, struct dfg_msu_type *type, uint8_t vertex_type, enum blocking_mode mode, struct msu_init_data *init_data)
Allocates a new MSU with the given parameters.
Definition: dfg.c:200
int delete_route(unsigned int route_id)
Definition: api.c:126
int send_create_msu_msg(struct dfg_msu *msu)
int register_msu_stats(unsigned int msu_id, int msu_type_id, int thread_id, int runtime_id)
int remove_msu(unsigned int id)
Definition: api.c:79
#define log_error(fmt,...)
Definition: logging.h:101
int send_add_route_to_msu_msg(struct dfg_route *route, struct dfg_msu *msu)
A type of MSU.
Definition: dfg.h:176
static void unregister_msu_stats(int msu_id)
Unregisters the stat IDS that are relevant to an MSU.
Definition: local_msu.c:189
blocking_mode
Whether an MSU is blocking or non-blocking.
Definition: dfg.h:161
Data with which an MSU is initialized, and the payload for messages of type CTRL_CREATE_MSU.
Definition: dfg.h:66
Representation of a single MSU in the dfg.
Definition: dfg.h:216
int del_endpoint(unsigned int msu_id, unsigned int route_id)
Definition: api.c:206
int add_route_to_msu(unsigned int msu_id, unsigned int route_id)
Definition: api.c:147
int create_worker_thread(unsigned int thread_id, unsigned int runtime_id, char *mode)
Definition: api.c:261
A route through which MSU messages can be passed.
Definition: dfg.h:152
Interfaces for the creation and modification of the data-flow-graph and general description of th...
struct dfg_thread * get_dfg_thread(struct dfg_runtime *rt, unsigned int id)
Returns the thread on the given runtime with the specified ID.
Definition: dfg.c:108
struct dfg_msu * get_dfg_msu(unsigned int id)
Returns the MSU with the given ID.
Definition: dfg.c:86
int add_msu(unsigned int msu_id, unsigned int type_id, char *init_data_c, char *msu_mode, char *vertex_type_c, unsigned int thread_id, unsigned int runtime_id)
Definition: api.c:35
int register_thread_stats(unsigned int thread_id, unsigned int runtime_id)
int schedule_dfg_msu(struct dfg_msu *msu, unsigned int runtime_id, unsigned int thread_id)
Places the given MSU on the correct runtime and thread.
Definition: dfg.c:261
A single endpoint for an MSU route.
Definition: dfg.h:139
uint32_t key
The key associated with this endpoint.
Definition: dfg.h:140
struct dfg_route_endpoint * get_dfg_route_endpoint(struct dfg_route *route, unsigned int msu_id)
Returns the endpoint within the given route which has the specified MSU ID.
Definition: dfg.c:112
Representation of a thread on a runtime in the DFG.
Definition: dfg.h:104
char init_data[64]
Definition: dfg.h:67
static int runtime_id(int runtime_fd)
int create_route(unsigned int route_id, unsigned int type_id, unsigned int runtime_id)
Definition: api.c:104
unsigned int uint32_t
Definition: uthash.h:96
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
enum blocking_mode str_to_blocking_mode(char *mode_str)
Converts a string of blocking/non-blocking to the correct enumerator.
Definition: dfg.c:150
int send_delete_route_msg(struct dfg_route *route)
int send_create_route_msg(struct dfg_route *route)
int mod_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id)
Definition: api.c:234
int send_del_endpoint_msg(struct dfg_route *route, struct dfg_route_endpoint *endpoint)
int send_create_thread_msg(struct dfg_thread *thread, struct dfg_runtime *rt)
struct dfg_msu_type * get_dfg_msu_type(unsigned int id)
Returns the MSU type with the given ID.
Definition: dfg.c:68
int add_dfg_route_to_msu(struct dfg_route *route, struct dfg_msu *msu)
Subscribes an MSU to a route, so it can send to the route's endpoints.
Definition: dfg.c:428
int mod_dfg_route_endpoint(struct dfg_route *route, struct dfg_route_endpoint *ep, uint32_t new_key)
Modifies the key associated with an MSU endpoint.
Definition: dfg.c:505
enum thread_mode str_to_thread_mode(char *mode_str)
Converts a string of pinned/unpinned to the corresponding enumerator.
Definition: dfg.c:139
Default value – used when unset, should never be specified.
Definition: dfg.h:162