My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
scheduling.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
20 #include "dfg.h"
21 #include "controller_dfg.h"
22 #include "scheduling.h"
23 #include "api.h"
24 #include "logging.h"
25 #include "runtime_messages.h"
26 #include "controller_stats.h"
27 #include "msu_ids.h"
28 #include "haproxy.h"
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <stdbool.h>
33 static int UNUSED n_downstream_msus(struct dfg_msu * msu) {
34 
35  int n = 0;
36  struct dfg_route **routes = msu->scheduling.routes;
37  for (int r_i = 0; r_i < msu->scheduling.n_routes; ++r_i) {
38  struct dfg_route *r = routes[r_i];
39  n+= r->n_endpoints;
40  }
41  return n;
42 }
43 
44 #define QLEN_ROUTING
45 
46 #ifdef QLEN_ROUTING
47 
48 static double get_q_len(struct dfg_msu *msu) {
49  struct timed_rrdb *q_len = get_msu_stat(MSU_QUEUE_LEN, msu->id);
50  double avg = average_n(q_len, 5);
51  if (avg < 0) {
52  return 0;
53  }
54  return avg;
55 }
56 
57 static int downstream_q_len(struct dfg_msu *msu) {
58  double qlen = get_q_len(msu);
59  struct dfg_route **routes = msu->scheduling.routes;
60  for (int r_i = 0; r_i < msu->scheduling.n_routes; ++r_i) {
61  struct dfg_route *r = routes[r_i];
62  for (int i=0; i<r->n_endpoints; i++) {
63  qlen += get_q_len(r->endpoints[i]->msu);
64  }
65  }
66  return qlen;
67 }
68 #endif
69 
70 #ifdef QLEN_ROUTING
71 static int fix_route_ranges(struct dfg_route *route) {
72 
73  if (route->msu_type->id == WEBSERVER_HTTP_MSU_TYPE_ID) {
74  return 0;
75  }
76 
77  if (route->n_endpoints <= 1) {
78  return 0;
79  }
80 
81  double total_q_len = 0;
82  double q_lens[route->n_endpoints];
83  for (int i=0; i < route->n_endpoints; i++) {
84  q_lens[i] = downstream_q_len(route->endpoints[i]->msu);
85  if (q_lens[i] < .001) {
86  q_lens[i] = .001;
87  }
88  total_q_len += q_lens[i];
89  }
90  int last = 0;
91  int keys[route->n_endpoints];
92  int old_keys[route->n_endpoints];
93  int ids[route->n_endpoints];
94  for (int i=0; i < route->n_endpoints; i++) {
95  double pct = (1.0 - q_lens[i] / total_q_len) * 10000;
96  if (pct < 1) {
97  pct = 1;
98  }
99  keys[i] = pct + last;
100  last = keys[i];
101  old_keys[i] = route->endpoints[i]->key;
102  ids[i] = route->endpoints[i]->msu->id;
103  }
104 
105  for (int i=0; i < route->n_endpoints; i++) {
106  if (old_keys[i] != keys[i]) {
107  int rtn = mod_endpoint(ids[i], keys[i], route->id);
108  if (rtn < 0) {
109  log_error("Error modifying endpoint");
110  return -1;
111  } else {
112  log(LOG_ROUTING_CHANGES,
113  "Modified endpoint %d (idx: %d) in route %d to have key %d (old: %d)",
114  ids[i], i, route->id, keys[i], old_keys[i]);
115  }
116  }
117  }
118  return 0;
119 }
120 
121 #else
122 
123 static int fix_route_ranges(struct dfg_route *route) {
124 
125  if (route->n_endpoints <= 1) {
126  return 0;
127  }
128 
129  // Calculate what the new keys will be based on the number of downstream MSUs
130  int old_ids[route->n_endpoints];
131  int old_keys[route->n_endpoints];
132  int new_keys[route->n_endpoints];
133  int last = 0;
134  for (int i=0; i<route->n_endpoints; i++) {
135 
136  old_keys[i] = route->endpoints[i]->key;
137  old_ids[i] = route->endpoints[i]->msu->id;
138 
139  double key = n_downstream_msus(route->endpoints[i]->msu);
140  new_keys[i] = last + (key > 0 ? key : 1);
141  last = new_keys[i];
142  }
143 
144  int old_diff = old_keys[0];
145  int new_diff = new_keys[0];
146 
147  // If the differences between the keys is the same across all endpoints, no need to change
148  int change_in_diffs = 0;
149  for (int i=1; i<route->n_endpoints; i++) {
150  if (old_keys[i] - old_keys[i-1] != old_diff ||
151  new_keys[i] - new_keys[i-1] != new_diff) {
152  change_in_diffs = 1;
153  break;
154  }
155  }
156  if (!change_in_diffs) {
157  return 0;
158  }
159 
160 
161  for (int i=0; i<route->n_endpoints; i++) {
162  if (old_keys[i] != new_keys[i]) {
163  int rtn = mod_endpoint(old_ids[i], new_keys[i], route->id);
164  if (rtn < 0) {
165  log_error("Error modifying endpoint");
166  return -1;
167  } else {
168  log(LOG_ROUTING_CHANGES,
169  "Modified endpoint %d (idx: %d) in route %d to have key %d (old: %d)",
170  old_ids[i], i, route->id, new_keys[i], old_keys[i]);
171  }
172  }
173  }
174 
175  return 0;
176 }
177 #endif
178 
180  for (int i=0; i<dfg->n_runtimes; i++) {
181  struct dfg_runtime *rt = dfg->runtimes[i];
182  for (int j=0; j<rt->n_routes; j++) {
183  int rtn = fix_route_ranges(rt->routes[j]);
184  if (rtn < 0) {
185  log_error("Error fixing route ranges");
186  return -1;
187  }
188  }
189  }
190  return 0;
191 }
192 
198 int msu_hierarchical_sort(struct dfg_msu **msus) {
199 
200  int n_msus;
201  for (n_msus=0; msus[n_msus] != NULL; n_msus++);
202 
203  if (n_msus == 1) {
204  return 0;
205  }
206 
207  int i, j;
208  //First find any "exit" vertex and swap them with the first entry
209  //FIXME: atm assumes there is only 1 new exit node
210  for (i = 0; i < n_msus; ++i) {
211  struct dfg_msu *msu = msus[i];
212  if (msu->vertex_type & EXIT_VERTEX_TYPE) {
213  for (j = 0; j != i && j < n_msus; ++j) {
214  if (!(msu->vertex_type & EXIT_VERTEX_TYPE)) {
215  struct dfg_msu *tmp = msus[j];
216  msus[j] = msus[i];
217  msus[i] = tmp;
218  }
219  }
220  }
221  }
222 
223  //Awful linear search to sort
224  for (i = 0; i < n_msus; ++i) {
225  int up;
226  for (j = i+1; j < n_msus; ++j) {
227  for (up = 0; up < msus[i]->type->meta_routing.n_dst_types; ++up) {
228  struct dfg_msu_type *upt = msus[i]->type->meta_routing.dst_types[up];
229  if (msus[j]->type == msus[i]->type->meta_routing.dst_types[up] && j > i) {
230  struct dfg_msu *tmp = msus[j];
231  msus[j] = msus[i];
232  msus[i] = tmp;
233  i = 0;
234  j = 0;
235  break;
236  }
237  for (int upup = 0; upup < upt->meta_routing.n_dst_types; ++upup) {
238  if (msus[j]->type == upt->meta_routing.dst_types[upup] && j > i) {
239  struct dfg_msu *tmp = msus[j];
240  msus[j] = msus[i];
241  msus[i] = tmp;
242  i = 0;
243  j = 0;
244  break;
245  }
246  }
247  }
248  }
249  }
250 
251  return 0;
252 }
253 
259 //FIXME: add error handling
260 void prepare_clone(struct dfg_msu *msu) {
261  msu->id = generate_msu_id();
262  msu->scheduling.n_routes = 0;
263 }
264 
272 struct dfg_thread *find_unused_thread(struct dfg_runtime *runtime,
273  struct dfg_msu_type *type,
274  int is_pinned) {
275  for (int i=0; i<runtime->n_pinned_threads + runtime->n_unpinned_threads; i++) {
276  struct dfg_thread *thread = runtime->threads[i];
277  if ( (!is_pinned && thread->mode == PINNED_THREAD) || (is_pinned && thread->mode != PINNED_THREAD)) {
278  continue;
279  }
280 
281  if (type->colocation_group == 0 && thread->n_msus > 0) {
282  continue;
283  }
284 
285  if (thread->n_msus == 0) {
286  log(LOG_THREAD_DECISIONS, "Placing type %d on thread %d",
287  type->id, thread->id);
288  return thread;
289  }
290 
291  int cannot_place = 0;
292  for (int j=0; j<thread->n_msus; ++j) {
293  if (thread->msus[j]->type->colocation_group != type->colocation_group ||
294  thread->msus[j]->type == type) {
295  cannot_place = 1;
296  break;
297  }
298  }
299  if (cannot_place) {
300  continue;
301  }
302 
303  log(LOG_THREAD_DECISIONS, "Placing type %d on thread %d",
304  type->id, thread->id);
305  return thread;
306  }
307  return NULL;
308 }
309 
310 
317 int place_on_runtime(struct dfg_runtime *rt, struct dfg_msu *msu) {
318  int ret = 0;
319 
320  struct dfg_thread *free_thread = find_unused_thread(rt, msu->type,
321  msu->blocking_mode == NONBLOCK_MSU);
322  //struct dfg_thread *free_thread = find_unused_pinned_thread(rt, msu);
323  if (free_thread == NULL) {
324  log(LOG_SCHEDULING, "There are no free worker threads on runtime %d", rt->id);
325  return -1;
326  }
327 
328  free_thread->msus[free_thread->n_msus] = msu;
329  free_thread->n_msus++;
330  //update local view
331  msu->scheduling.thread = free_thread;
332  msu->scheduling.runtime = rt;
333 
334  register_msu_stats(msu->id, msu->type->id, msu->scheduling.thread->id, msu->scheduling.runtime->id);
335  ret = send_create_msu_msg(msu);
336  if (ret == -1) {
337  log_error("Could not send addmsu command to runtime %d", msu->scheduling.runtime->id);
338  return ret;
339  }
340 
341  //TODO: update rt->current_alloc
342 
343  return ret;
344 }
345 
346 struct dfg_dependency *get_dependency(struct dfg_msu_type *type, struct dfg_msu_type *dep_type) {
347  for (int i=0; i<type->n_dependencies; i++) {
348  if (type->dependencies[i]->type == dep_type) {
349  return type->dependencies[i];
350  }
351  }
352  return NULL;
353 }
354 
355 int wire_msu(struct dfg_msu *msu) {
356  struct dfg_runtime *rt = msu->scheduling.runtime;
357  int ret;
358 
359  //update routes
360  int i, j;
361  struct dfg_msu_type *type = msu->type;
362  for (i = 0; i < type->meta_routing.n_dst_types; ++i) {
363 
364  struct dfg_msu_type *dst_type = type->meta_routing.dst_types[i];
365  if (dst_type->n_instances < 1) {
366  continue;
367  }
368 
369  struct dfg_dependency *dependency = get_dependency(type, dst_type);
370  int need_remote_dep = 0, need_local_dep = 0;
371 
372  if (dependency != NULL) {
373  if (dependency->locality == MSU_IS_LOCAL ) {
374  need_local_dep = 1;
375  } else if (dependency->locality == MSU_IS_REMOTE ) {
376  need_remote_dep = 1;
377  }
378  }
379 
380  for (j = 0; j < dst_type->n_instances; ++j) {
381  struct dfg_msu *dst = dst_type->instances[j];
382 
383  if ((need_local_dep && dst->scheduling.runtime != msu->scheduling.runtime)
384  ||
385  (need_remote_dep && dst->scheduling.runtime == msu->scheduling.runtime)) {
386  continue;
387  } else {
388  //Does the new MSU's runtime has a route toward destination MSU's type?
390  dst->type);
391  if (route == NULL) {
392  log(LOG_SCHEDULING, "Route of type %d doesn't exist on rt %d. Creating",
393  dst->type->id, msu->scheduling.runtime->id);
394  int route_id = generate_route_id();
395 
396  route = create_dfg_route(route_id, dst->type, rt->id);
397  if (route == NULL) {
398  log_error("Could not add new route on runtime %d toward type %d",
399  rt->id, dst->type->id);
400  return -1;
401  }
402  if (send_create_route_msg(route) != 0) {
403  log_error("Could not send create route message");
404  return -1;
405  }
406  }
407 
408  //Is the destination MSU already an endpoint of that route?
409  if (!get_dfg_route_endpoint(route, dst->id)) {
410  uint32_t key = generate_endpoint_key(route);
411  ret = add_endpoint(dst->id, key, route->id);
412  if (ret != 0) {
413  log_error("Could not add endpoint %d to route %d",
414  dst->id, route->id);
415  return -1;
416  }
417  }
418  //Is the route already attached to the new MSU?
419  if (!msu_has_route(msu, route)) {
420  ret = add_route_to_msu(msu->id, route->id);
421  if (ret != 0) {
422  log_error("Could not add route %d to msu %d", route->id, msu->id);
423  return -1;
424  }
425  }
426 
427  }
428  }
429  }
430 
431  for (i = 0; i < type->meta_routing.n_src_types; ++i) {
432 
433  struct dfg_msu_type *src_type = type->meta_routing.src_types[i];
434  if (src_type->n_instances < 1) {
435  continue;
436  }
437 
438  struct dfg_dependency *dependency = get_dependency(type, src_type);
439  int need_remote_dep = 0, need_local_dep = 0;
440 
441  if (dependency != NULL) {
442  if (dependency->locality == MSU_IS_LOCAL) {
443  need_local_dep = 1;
444  } else if (dependency->locality == MSU_IS_REMOTE) {
445  need_remote_dep = 1;
446  }
447  }
448 
449  for (j = 0; j < src_type->n_instances; ++j) {
450  struct dfg_msu *source = src_type->instances[j];
451 
452  if ((need_local_dep && source->scheduling.runtime != msu->scheduling.runtime)
453  ||
454  (need_remote_dep && source->scheduling.runtime == msu->scheduling.runtime)) {
455  continue;
456  } else {
457  struct dfg_runtime *src_rt = source->scheduling.runtime;
458 
459  //Does the source's runtime has a route toward new MSU's type?
460  struct dfg_route *route = get_dfg_rt_route_by_type(src_rt, msu->type);
461  if (route == NULL) {
462  log(LOG_SCHEDULING, "Route of type %d doesn't exist from rt %d",
463  msu->type->id, src_rt->id);
464  int route_id = generate_route_id();
465  int ret = create_route(route_id, msu->type->id, src_rt->id);
466  if (ret != 0) {
467  log_error("Could not add new route on runtime %d toward type %d",
468  src_rt->id, msu->type->id);
469  return -1;
470  }
471  route = get_dfg_rt_route_by_type(src_rt, msu->type);
472  }
473 
474  struct dfg_route *msu_route = get_dfg_msu_route_by_type(source, msu->type);
475  //Is the route attached to that source msu?
476  if (msu_route == NULL) {
477  log(LOG_SCHEDULING, "Route %d doesn't exist from source %d",
478  route->id, source->id);
479  ret = add_route_to_msu(source->id, route->id);
480  if (ret != 0) {
481  log_error("Could not attach route %d to msu %d",
482  source->id, route->id);
483  return -1;
484  }
485  } else {
486  route = msu_route;
487  }
488 
489  //Is the new MSU already an endpoint of that route?
490  if (!get_dfg_route_endpoint(route, msu->id)) {
491  uint32_t key = generate_endpoint_key(route);
492  ret = add_endpoint(msu->id, key, route->id);
493  if (ret != 0) {
494  log_error("Could not add endpoint %d to route %d",
495  msu->id, route->id);
496  return -1;
497  }
498  }
499  }
500  }
501  }
502 
503  return 0;
504 }
505 
506 static int remove_routes_to_msu(struct dfg_msu *msu) {
507  struct dedos_dfg *dfg = get_dfg();
508  for (int i=0; i<dfg->n_runtimes; i++) {
509  struct dfg_runtime *rt = dfg->runtimes[i];
510  for (int j=0; j<rt->n_routes; j++) {
511  struct dfg_route *route = rt->routes[j];
512  struct dfg_route_endpoint *ep = get_dfg_route_endpoint(route, msu->id);
513 
514  if (ep == NULL) {
515  continue;
516  }
517 
518  int rtn = del_endpoint(msu->id, route->id);
519  if (rtn < 0) {
520  log_error("Error deleting endpoint from route %d for removal of %d",
521  route->id, msu->id);
522  return -1;
523  }
524  }
525  }
526  return 0;
527 }
528 
529 static int get_dependencies(struct dfg_msu *msu, struct dfg_msu **output, int out_size) {
530  output[0] = msu;
531  int n_on_runtime = 0;
532  struct dfg_runtime *rt = msu->scheduling.runtime;
533  for (int i=0; i<msu->type->n_instances; i++) {
534  if (msu->type->instances[i]->scheduling.runtime == rt) {
535  n_on_runtime++;
536  if (n_on_runtime > 1) {
537  // More than one on this runtime, just return the passed-in MSU
538  return 1;
539  }
540  }
541  }
542 
543  int n_out = 1;
544  for (int i=0; i<msu->type->n_dependencies; i++) {
545  struct dfg_dependency *dep = msu->type->dependencies[i];
546  // TODO: Check dep->locality
547  struct dfg_msu_type *dep_type = dep->type;
548  for (int j=0; j<dep_type->n_instances; j++) {
549  struct dfg_msu *dep_msu = dep_type->instances[j];
550  if (dep_msu->scheduling.runtime == rt) {
551  if (n_out >= out_size - 1) {
552  log_error("Cannot get dependencies -- output too small");
553  return -1;
554  }
555  output[n_out] = dep_msu;
556  n_out++;
557  }
558  }
559  }
560  output[n_out] = NULL;
561  msu_hierarchical_sort(output);
562  return n_out;
563 }
564 
565 
566 int unclone_msu(int msu_id) {
567 
568  struct dfg_msu *msu = get_dfg_msu(msu_id);
569 
570  if (msu == NULL) {
571  log_error("Cannot unclone MSU %d. Does not exist!", msu_id);
572  return -1;
573  }
574 
575  if (msu->type->n_instances <= 1) {
576  log(LOG_SCHEDULING, "Cannot remove last instance of msu type %s", msu->type->name);
577  return -1;
578  }
579 
580 
581  struct dfg_msu *dependencies[MAX_MSU];
582  int n_deps = get_dependencies(msu, dependencies, MAX_MSU);
583 
584  for (int i=n_deps-1; i>=0; i--) {
585  if (dependencies[i]->type->id == WEBSERVER_READ_MSU_TYPE_ID) {
586  set_haproxy_weights(dependencies[i]->scheduling.runtime->id, 1);
587  }
588  int rtn = remove_routes_to_msu(dependencies[i]);
589  if (rtn < 0) {
590  log_error("Error removing routes to msu %d", dependencies[i]->id);
591  return -1;
592  }
593  int id = dependencies[i]->id;
594  char *name = dependencies[i]->type->name;
595  rtn = remove_msu(id);
596  if (rtn < 0) {
597  log_error("Error removing MSU %d", id);
598  return -1;
599  }
600  log(LOG_SCHEDULING, "Removed msu %d (type: %s)", id, name);
601 
602  //TODO: This should really wait for confirmation of deletion of MSU
603  if (i > 0)
604  usleep(2000e3); // 2 seconds
605  }
606 
607  return 0;
608 }
609 
610 
611 
612 
613 
619 struct dfg_msu *clone_msu(int msu_id) {
620  int ret;
621 
622  struct dfg_msu *clone = calloc(1, sizeof(struct dfg_msu));
623  if (clone == NULL) {
624  debug("Could not allocate memory for clone msu");
625  return NULL;
626  }
627 
628  struct dfg_msu *msu = get_dfg_msu(msu_id);
629 
630  if (msu->type->cloneable == 0) {
631  debug("Cannot clone msu %d", clone->msu_id);
632  unlock_dfg();
633  return NULL;
634  }
635 
636  memcpy(clone, msu, sizeof(struct dfg_msu));
637 
638  prepare_clone(clone);
639  clone->type = msu->type;
640 
641 
642  lock_dfg();
643  struct dedos_dfg *dfg = get_dfg();
644  struct dfg_msu *msus[MAX_MSU];
645  bzero(msus, MAX_MSU * sizeof(struct dfg_msu *));
646 
647  int i;
648  for (i = 0; i < dfg->n_runtimes; ++i) {
649  if (msu->type->cloneable < dfg->runtimes[i]->id) {
650  log_warn("Could not clone msu %d on runtime %d",
651  clone->id, dfg->runtimes[i]->id);
652  continue;
653  }
654  ret = schedule_msu(clone, dfg->runtimes[i], msus);
655  if (ret == 0) {
656  break;
657  } else {
658  debug("Could not schedule msu %d on runtime %d",
659  clone->msu_id, dfg->runtimes[i]->id);
660  }
661  }
662 
663  ret = msu_hierarchical_sort(msus);
664  if (ret == -1) {
665  debug("Could not sort MSUs");
666  }
667 
668  usleep(50000);
669 
670  int n = 0;
671  while (msus[n] != NULL) {
672  ret = wire_msu(msus[n]);
673  if (ret == -1) {
674  debug("Could not update routes for msu %d", msus[n]->msu_id);
675  unlock_dfg();
676  return NULL;
677  }
678 
679  n++;
680  }
681 
682  if (clone->scheduling.runtime == NULL) {
683  debug("Unable to clone msu %d of type %d", msu->msu_id, msu->msu_type);
684  free(clone);
685  unlock_dfg();
686  return NULL;
687  } else {
688  log_info("Cloned msu %d of type %d into msu %d on runtime %d",
689  msu->id, msu->type->id, clone->id, clone->scheduling.runtime->id);
690 
691  int rtn = fix_all_route_ranges(dfg);
692  if (rtn < 0) {
693  log_error("Unable to properly modify route ranges");
694  unlock_dfg();
695  return NULL;
696  }
697  log_debug("properly changed route ranges");
698  unlock_dfg();
699  return clone;
700  }
701 }
702 
710 int schedule_msu(struct dfg_msu *msu, struct dfg_runtime *rt, struct dfg_msu **new_msus) {
711  int ret;
712  // First of all, find a thread on the runtime
713  ret = place_on_runtime(rt, msu);
714  if (ret == -1) {
715  log(SCHEDULING, "Could not spawn a new worker thread on runtime %d", rt->id);
716  return -1;
717  }
718 
719  struct dedos_dfg *dfg = get_dfg();
720 
721 
722  //Add the new MSU to the list of new additions
723  memcpy(new_msus, &msu, sizeof(struct dfg_msu *));
724 
725  //Handle dependencies before wiring
726  int l;
727  int skipped = 0;
728  dfg->msus[dfg->n_msus] = msu;
729  dfg->n_msus++;
730  msu->type->instances[msu->type->n_instances] = msu;
731  msu->type->n_instances++;
732  for (l = 0; l < msu->type->n_dependencies; ++l) {
733  struct dfg_msu_type *dep_type = msu->type->dependencies[l]->type;
734  if (msu->type->dependencies[l]->locality == MSU_IS_LOCAL) {
735  struct dfg_msu *existing = msu_type_on_runtime(msu->scheduling.runtime, dep_type);
736  if (existing== NULL) {
737  if (dep_type->n_instances == 0) {
738  log_error("No instances to spawn from!");
739  return -1;
740  }
741  struct dfg_msu *template = dep_type->instances[0];
742  //spawn missing local dependency
743  struct dfg_msu *dep = copy_dfg_msu(template);
744 
745  if (dep == NULL) {
746  log_error("Could not allocate memory for missing dependency");
747  return -1;
748  }
749 
750 
751  prepare_clone(dep);
752  dep->type = msu->type->dependencies[l]->type;
753 
754  ret = schedule_msu(dep, rt, new_msus + l + 1 - skipped);
755  if (ret == -1) {
756  // FIXME: This doesn't properly remove the original MSUs
757  log_info("Could not schedule dependency %d on runtime %d",
758  dep->id, rt->id);
759  dfg->n_msus--;
760  free(dep);
761  return -1;
762  } else {
763  debug("Scheduled dependency %d on runtime %d (l=%d)" , dep->msu_id, rt->id, l);
764  }
765  } else {
766  log_debug("Already has dependency %d", dep_type->id);
767  skipped++;
768  }
769  } else {
770  log_warn("non-local dependency %d", dep_type->id);
771  skipped++;
772  //TODO: check for remote dependency
773  }
774  }
775 
776  log_debug("Processed %d dependencies", l);
777  return 0;
778 }
struct dfg_route * get_dfg_msu_route_by_type(struct dfg_msu *msu, struct dfg_msu_type *route_type)
Returns the route which the given MSU sends to of the specified MSU type.
Definition: dfg.c:99
int n_routes
The routes that an MSU can send to.
Definition: dfg.h:120
MSUs which must be present for another MSU to be cloned.
Definition: dfg.h:203
#define EXIT_VERTEX_TYPE
Bitmask representing an MSU through which messages exit DeDOS.
Definition: dfg.h:211
int n_msus
The number of MSUs in dedos_dfg::msus.
Definition: dfg.h:255
enum thread_mode mode
Pinned/unpinned mode for the thread.
Definition: dfg.h:106
double average_n(struct timed_rrdb *timeseries, int n_samples)
timeseries.c
Definition: timeseries.c:31
Round-robin database (circular buffer) for storing timeseries data.
Definition: timeseries.h:36
struct dfg_msu_type * type
The MSU type which must be present.
Definition: dfg.h:204
struct dfg_msu * copy_dfg_msu(struct dfg_msu *input)
Allocates a new MSU with the same fields as the input MSU (though unscheduled)
Definition: dfg.c:190
int lock_dfg()
int n_msus
Number of MSUs placed on the thread.
Definition: dfg.h:108
#define log_info(fmt,...)
Definition: logging.h:88
#define debug(fmt,...)
Definition: logging.h:78
struct dfg_scheduling scheduling
Information about where an MSU is scheduled.
Definition: dfg.h:225
struct dfg_route * routes[64]
Routes located on this runtime.
Definition: dfg.h:83
int wire_msu(struct dfg_msu *msu)
Definition: scheduling.c:355
static int downstream_q_len(struct dfg_msu *msu)
Definition: scheduling.c:57
struct dfg_msu * instances[512]
Each instance of this MSU type.
Definition: dfg.h:186
int place_on_runtime(struct dfg_runtime *rt, struct dfg_msu *msu)
Find a core on which to spawn a new thread for an MSU.
Definition: scheduling.c:317
#define log_debug(...)
Use of log_debug(fmt, ...) is not recommended.
Definition: logging.h:76
uint8_t vertex_type
Whether the MSU is #ENTRY, #EXIT, or possible #ENTRY | #EXIT.
Definition: dfg.h:218
int n_src_types
The number of types that should route to this MSU.
Definition: dfg.h:131
struct dfg_msu * msus[8]
MSUs placed on that thread.
Definition: dfg.h:107
struct dfg_msu * clone_msu(int msu_id)
Clone a msu of given ID.
Definition: scheduling.c:619
static int route(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
Definition: baremetal_msu.c:30
int n_dst_types
The number of types that this msu should route to.
Definition: dfg.h:133
struct dfg_thread * find_unused_thread(struct dfg_runtime *runtime, struct dfg_msu_type *type, int is_pinned)
Find a suitable thread for an MSU.
Definition: scheduling.c:272
int generate_route_id()
char name[32]
A name describing the function of the MSU.
Definition: dfg.h:179
static int get_dependencies(struct dfg_msu *msu, struct dfg_msu **output, int out_size)
Definition: scheduling.c:529
int id
Unique identifier for the runtime.
Definition: dfg.h:74
int add_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id)
Definition: api.c:177
Logging of status messages to the terminal.
struct timed_rrdb * get_msu_stat(enum stat_id id, unsigned int msu_id)
struct dfg_route * create_dfg_route(unsigned int id, struct dfg_msu_type *type, unsigned int runtime_id)
Creates a route with the specified parameters.
Definition: dfg.c:366
int colocation_group
Definition: dfg.h:191
struct dfg_msu_type * type
The type of the MSU and meta-routing information.
Definition: dfg.h:221
Representation of a runtime in the DFG.
Definition: dfg.h:73
struct dfg_dependency * dependencies[32]
These MSU types must be present in order for this MSU type to be cloned.
Definition: dfg.h:183
static int remove_routes_to_msu(struct dfg_msu *msu)
Definition: scheduling.c:506
struct dfg_route_endpoint * endpoints[256]
The endpoints of the route.
Definition: dfg.h:156
int send_create_msu_msg(struct dfg_msu *msu)
int n_endpoints
The number of endpoints in dfg_route::endpoints.
Definition: dfg.h:157
const char * name
Definition: http_parser.c:485
struct dfg_runtime * runtime
The runtime on which an MSU is running.
Definition: dfg.h:117
int unlock_dfg()
int register_msu_stats(unsigned int msu_id, int msu_type_id, int thread_id, int runtime_id)
int n_runtimes
The number of elements in dedos_dfg::runtimes.
Definition: dfg.h:260
int remove_msu(unsigned int id)
Definition: api.c:79
struct dfg_route * get_dfg_rt_route_by_type(struct dfg_runtime *rt, struct dfg_msu_type *type)
Returns the route on the given runtime with the specified MSU type.
Definition: dfg.c:90
#define log_error(fmt,...)
Definition: logging.h:101
struct dfg_msu * msu
The MSU at this endpoint to which a message would be delivered.
Definition: dfg.h:143
struct dfg_msu_type * src_types[512]
The types that should route to this MSU.
Definition: dfg.h:130
A type of MSU.
Definition: dfg.h:176
static int fix_route_ranges(struct dfg_route *route)
Definition: scheduling.c:71
int id
A unique identifier for the MSU.
Definition: dfg.h:217
static int UNUSED n_downstream_msus(struct dfg_msu *msu)
Definition: scheduling.c:33
int cloneable
If cloneable == N, this MSU can be cloned on runtimes numbered up to and including N...
Definition: dfg.h:189
int n_unpinned_threads
Number of the above-threads which are unpinned.
Definition: dfg.h:81
static double get_q_len(struct dfg_msu *msu)
Definition: scheduling.c:48
void prepare_clone(struct dfg_msu *msu)
Find an ID and clean up data structures for an MSU.
Definition: scheduling.c:260
struct dfg_thread * thread
The thread on which an MSU is running.
Definition: dfg.h:118
int fix_all_route_ranges(struct dedos_dfg *dfg)
Definition: scheduling.c:179
struct dfg_msu * msu_type_on_runtime(struct dfg_runtime *rt, struct dfg_msu_type *type)
Returns 1 if the given MSU type is present on the provided runtime.
Definition: dfg.c:130
int id
Unique identifier for the thread.
Definition: dfg.h:105
Representation of a single MSU in the dfg.
Definition: dfg.h:216
int del_endpoint(unsigned int msu_id, unsigned int route_id)
Definition: api.c:206
struct dfg_msu_type * dst_types[512]
The types that this msu should route to.
Definition: dfg.h:132
int add_route_to_msu(unsigned int msu_id, unsigned int route_id)
Definition: api.c:147
#define MAX_MSU
The maximum number of MSUs which can be present in the system at a time.
Definition: dfg.h:39
int n_pinned_threads
Number of the above-threads which are pinned.
Definition: dfg.h:80
int id
A unique identifier for the MSU type.
Definition: dfg.h:177
int id
A unique identifier for the route.
Definition: dfg.h:153
#define UNUSED
enum msu_locality locality
Whether it must be present on the same machine.
Definition: dfg.h:205
int msu_hierarchical_sort(struct dfg_msu **msus)
Based on the meta routing, sort msus in a list in ascending order (from leaf to root) ...
Definition: scheduling.c:198
struct dfg_route * routes[32]
Definition: dfg.h:119
Top-level structure holding the data-flow graph.
Definition: dfg.h:239
#define WEBSERVER_READ_MSU_TYPE_ID
Definition: msu_ids.h:25
A route through which MSU messages can be passed.
Definition: dfg.h:152
static struct dedos_dfg * dfg
Static local copy of the DFG, so each call doesn't have to pass a copy.
Definition: dfg.c:32
void set_haproxy_weights(int rt_id, int offset)
Definition: haproxy.c:102
Interfaces for the creation and modification of the data-flow-graph and and general description of th...
struct dfg_msu * get_dfg_msu(unsigned int id)
Returns the MSU with the given ID.
Definition: dfg.c:86
struct dfg_runtime * runtimes[16]
The runtimes present in the application.
Definition: dfg.h:258
A single endpoint for an MSU route.
Definition: dfg.h:139
struct dfg_thread * threads[32]
Threads located on the runtime.
Definition: dfg.h:79
uint32_t generate_endpoint_key(struct dfg_route *route)
uint32_t key
The key associated with this endpoint.
Definition: dfg.h:140
struct dfg_route_endpoint * get_dfg_route_endpoint(struct dfg_route *route, unsigned int msu_id)
Returns the endpoint within the given route which has the specified MSU ID.
Definition: dfg.c:112
Representation of a thread on a runtime in the DFG.
Definition: dfg.h:104
#define WEBSERVER_HTTP_MSU_TYPE_ID
Definition: msu_ids.h:26
int n_routes
Number of routes above.
Definition: dfg.h:84
struct dfg_dependency * get_dependency(struct dfg_msu_type *type, struct dfg_msu_type *dep_type)
Definition: scheduling.c:346
int create_route(unsigned int route_id, unsigned int type_id, unsigned int runtime_id)
Definition: api.c:104
unsigned int uint32_t
Definition: uthash.h:96
int generate_msu_id()
int n_instances
The number of instances of this MSU type.
Definition: dfg.h:187
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
struct dedos_dfg * get_dfg()
Definition: runtime_dfg.c:115
int n_dependencies
The number of elements in dfg_msu_type::dependencies.
Definition: dfg.h:184
int schedule_msu(struct dfg_msu *msu, struct dfg_runtime *rt, struct dfg_msu **new_msus)
Tries to place an MSU on a given runtime.
Definition: scheduling.c:710
int send_create_route_msg(struct dfg_route *route)
int mod_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id)
Definition: api.c:234
struct dfg_meta_routing meta_routing
Which types of msus route to/from this MSU.
Definition: dfg.h:180
#define log_warn(fmt,...)
Definition: logging.h:113
int msu_has_route(struct dfg_msu *msu, struct dfg_route *route)
Returns 1 if the given MSU has the route as an endpoint.
Definition: dfg.c:121
enum blocking_mode blocking_mode
Whether the MSU is blocking or not.
Definition: dfg.h:223
int unclone_msu(int msu_id)
Definition: scheduling.c:566
struct dfg_msu_type * msu_type
The type of MSU to which this route delivers.
Definition: dfg.h:155
struct dfg_msu * msus[512]
The MSUs present in the application.
Definition: dfg.h:253