81 double total_q_len = 0;
85 if (q_lens[i] < .001) {
88 total_q_len += q_lens[i];
95 double pct = (1.0 - q_lens[i] / total_q_len) * 10000;
106 if (old_keys[i] != keys[i]) {
112 log(LOG_ROUTING_CHANGES,
113 "Modified endpoint %d (idx: %d) in route %d to have key %d (old: %d)",
114 ids[i], i, route->
id, keys[i], old_keys[i]);
140 new_keys[i] = last + (key > 0 ? key : 1);
144 int old_diff = old_keys[0];
145 int new_diff = new_keys[0];
148 int change_in_diffs = 0;
150 if (old_keys[i] - old_keys[i-1] != old_diff ||
151 new_keys[i] - new_keys[i-1] != new_diff) {
156 if (!change_in_diffs) {
162 if (old_keys[i] != new_keys[i]) {
168 log(LOG_ROUTING_CHANGES,
169 "Modified endpoint %d (idx: %d) in route %d to have key %d (old: %d)",
170 old_ids[i], i, route->
id, new_keys[i], old_keys[i]);
182 for (
int j=0; j<rt->
n_routes; j++) {
201 for (n_msus=0; msus[n_msus] != NULL; n_msus++);
210 for (i = 0; i < n_msus; ++i) {
213 for (j = 0; j != i && j < n_msus; ++j) {
224 for (i = 0; i < n_msus; ++i) {
226 for (j = i+1; j < n_msus; ++j) {
229 if (msus[j]->type == msus[i]->type->meta_routing.dst_types[up] && j > i) {
285 if (thread->
n_msus == 0) {
286 log(LOG_THREAD_DECISIONS,
"Placing type %d on thread %d",
287 type->
id, thread->
id);
291 int cannot_place = 0;
292 for (
int j=0; j<thread->
n_msus; ++j) {
303 log(LOG_THREAD_DECISIONS,
"Placing type %d on thread %d",
304 type->
id, thread->
id);
323 if (free_thread == NULL) {
324 log(LOG_SCHEDULING,
"There are no free worker threads on runtime %d", rt->
id);
370 int need_remote_dep = 0, need_local_dep = 0;
372 if (dependency != NULL) {
392 log(LOG_SCHEDULING,
"Route of type %d doesn't exist on rt %d. Creating",
398 log_error(
"Could not add new route on runtime %d toward type %d",
403 log_error(
"Could not send create route message");
413 log_error(
"Could not add endpoint %d to route %d",
422 log_error(
"Could not add route %d to msu %d", route->
id, msu->
id);
439 int need_remote_dep = 0, need_local_dep = 0;
441 if (dependency != NULL) {
462 log(LOG_SCHEDULING,
"Route of type %d doesn't exist from rt %d",
467 log_error(
"Could not add new route on runtime %d toward type %d",
476 if (msu_route == NULL) {
477 log(LOG_SCHEDULING,
"Route %d doesn't exist from source %d",
478 route->
id, source->
id);
481 log_error(
"Could not attach route %d to msu %d",
482 source->
id, route->
id);
494 log_error(
"Could not add endpoint %d to route %d",
510 for (
int j=0; j<rt->
n_routes; j++) {
520 log_error(
"Error deleting endpoint from route %d for removal of %d",
531 int n_on_runtime = 0;
536 if (n_on_runtime > 1) {
551 if (n_out >= out_size - 1) {
552 log_error(
"Cannot get dependencies -- output too small");
555 output[n_out] = dep_msu;
560 output[n_out] = NULL;
571 log_error(
"Cannot unclone MSU %d. Does not exist!", msu_id);
576 log(LOG_SCHEDULING,
"Cannot remove last instance of msu type %s", msu->
type->
name);
584 for (
int i=n_deps-1; i>=0; i--) {
590 log_error(
"Error removing routes to msu %d", dependencies[i]->
id);
593 int id = dependencies[i]->
id;
600 log(LOG_SCHEDULING,
"Removed msu %d (type: %s)",
id, name);
624 debug(
"Could not allocate memory for clone msu");
631 debug(
"Cannot clone msu %d", clone->msu_id);
636 memcpy(clone, msu,
sizeof(
struct dfg_msu));
650 log_warn(
"Could not clone msu %d on runtime %d",
658 debug(
"Could not schedule msu %d on runtime %d",
665 debug(
"Could not sort MSUs");
671 while (msus[n] != NULL) {
674 debug(
"Could not update routes for msu %d", msus[n]->msu_id);
683 debug(
"Unable to clone msu %d of type %d", msu->msu_id, msu->msu_type);
688 log_info(
"Cloned msu %d of type %d into msu %d on runtime %d",
693 log_error(
"Unable to properly modify route ranges");
697 log_debug(
"properly changed route ranges");
715 log(
SCHEDULING,
"Could not spawn a new worker thread on runtime %d", rt->
id);
723 memcpy(new_msus, &msu,
sizeof(
struct dfg_msu *));
736 if (existing== NULL) {
738 log_error(
"No instances to spawn from!");
746 log_error(
"Could not allocate memory for missing dependency");
754 ret =
schedule_msu(dep, rt, new_msus + l + 1 - skipped);
757 log_info(
"Could not schedule dependency %d on runtime %d",
763 debug(
"Scheduled dependency %d on runtime %d (l=%d)" , dep->msu_id, rt->
id, l);
766 log_debug(
"Already has dependency %d", dep_type->
id);
770 log_warn(
"non-local dependency %d", dep_type->
id);
776 log_debug(
"Processed %d dependencies", l);
struct dfg_route * get_dfg_msu_route_by_type(struct dfg_msu *msu, struct dfg_msu_type *route_type)
Returns the route which the given MSU sends to of the specified MSU type.
int n_routes
The routes that an MSU can send to.
MSUs which must be present for another MSU to be cloned.
#define EXIT_VERTEX_TYPE
Bitmask representing an MSU through which messages exit DeDOS.
int n_msus
The number of MSUs in dedos_dfg::msus.
enum thread_mode mode
Pinned/unpinned mode for the thread.
double average_n(struct timed_rrdb *timeseries, int n_samples)
timeseries.c
Round-robin database (circular buffer) for storing timeseries data.
struct dfg_msu_type * type
The MSU type which must be present.
struct dfg_msu * copy_dfg_msu(struct dfg_msu *input)
Allocates a new MSU with the same fields as the input MSU (though unscheduled)
int n_msus
Number of MSUs placed on the thread.
#define log_info(fmt,...)
struct dfg_scheduling scheduling
Information about where an MSU is scheduled.
struct dfg_route * routes[64]
Routes located on this runtime.
int wire_msu(struct dfg_msu *msu)
static int downstream_q_len(struct dfg_msu *msu)
struct dfg_msu * instances[512]
Each instance of this MSU type.
int place_on_runtime(struct dfg_runtime *rt, struct dfg_msu *msu)
Find a core on which to spawn a new thread for an MSU.
#define log_debug(...)
Use of log_debug(fmt, ...) is not recommended.
uint8_t vertex_type
Whether the MSU is #ENTRY, #EXIT, or possibly #ENTRY | #EXIT.
struct dfg_msu * msus[8]
MSUs placed on that thread.
struct dfg_msu * clone_msu(int msu_id)
Clone an MSU with the given ID.
struct dfg_thread * find_unused_thread(struct dfg_runtime *runtime, struct dfg_msu_type *type, int is_pinned)
Find a suitable thread for an MSU.
char name[32]
A name describing the function of the MSU.
static int get_dependencies(struct dfg_msu *msu, struct dfg_msu **output, int out_size)
int id
Unique identifier for the runtime.
int add_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id)
Logging of status messages to the terminal.
struct timed_rrdb * get_msu_stat(enum stat_id id, unsigned int msu_id)
struct dfg_route * create_dfg_route(unsigned int id, struct dfg_msu_type *type, unsigned int runtime_id)
Creates a route with the specified parameters.
struct dfg_msu_type * type
The type of the MSU and meta-routing information.
Representation of a runtime in the DFG.
struct dfg_dependency * dependencies[32]
These MSU types must be present in order for this MSU type to be cloned.
static int remove_routes_to_msu(struct dfg_msu *msu)
struct dfg_route_endpoint * endpoints[256]
The endpoints of the route.
int send_create_msu_msg(struct dfg_msu *msu)
int n_endpoints
The number of endpoints in dfg_route::endpoints.
struct dfg_runtime * runtime
The runtime on which an MSU is running.
int register_msu_stats(unsigned int msu_id, int msu_type_id, int thread_id, int runtime_id)
int n_runtimes
The number of elements in dedos_dfg::runtimes.
int remove_msu(unsigned int id)
struct dfg_route * get_dfg_rt_route_by_type(struct dfg_runtime *rt, struct dfg_msu_type *type)
Returns the route on the given runtime with the specified MSU type.
#define log_error(fmt,...)
struct dfg_msu * msu
The MSU at this endpoint to which a message would be delivered.
static int fix_route_ranges(struct dfg_route *route)
int id
A unique identifier for the MSU.
static int UNUSED n_downstream_msus(struct dfg_msu *msu)
int cloneable
If cloneable == N, this MSU can be cloned on runtimes numbered up to and including N...
int n_unpinned_threads
Number of the above-threads which are unpinned.
static double get_q_len(struct dfg_msu *msu)
void prepare_clone(struct dfg_msu *msu)
Find an ID and clean up data structures for an MSU.
struct dfg_thread * thread
The thread on which an MSU is running.
int fix_all_route_ranges(struct dedos_dfg *dfg)
struct dfg_msu * msu_type_on_runtime(struct dfg_runtime *rt, struct dfg_msu_type *type)
Returns 1 if the given MSU type is present on the provided runtime.
int id
Unique identifier for the thread.
Representation of a single MSU in the dfg.
int del_endpoint(unsigned int msu_id, unsigned int route_id)
int add_route_to_msu(unsigned int msu_id, unsigned int route_id)
#define MAX_MSU
The maximum number of MSUs which can be present in the system at a time.
int n_pinned_threads
Number of the above-threads which are pinned.
int id
A unique identifier for the MSU type.
int id
A unique identifier for the route.
enum msu_locality locality
Whether it must be present on the same machine.
int msu_hierarchical_sort(struct dfg_msu **msus)
Based on the meta routing, sort msus in a list in ascending order (from leaf to root) ...
struct dfg_route * routes[32]
Top-level structure holding the data-flow graph.
#define WEBSERVER_READ_MSU_TYPE_ID
A route through which MSU messages can be passed.
static struct dedos_dfg * dfg
Static local copy of the DFG, so each call doesn't have to pass a copy.
void set_haproxy_weights(int rt_id, int offset)
Interfaces for the creation and modification of the data-flow-graph and general description of th...
struct dfg_msu * get_dfg_msu(unsigned int id)
Returns the MSU with the given ID.
struct dfg_runtime * runtimes[16]
The runtimes present in the application.
A single endpoint for an MSU route.
struct dfg_thread * threads[32]
Threads located on the runtime.
uint32_t generate_endpoint_key(struct dfg_route *route)
uint32_t key
The key associated with this endpoint.
struct dfg_route_endpoint * get_dfg_route_endpoint(struct dfg_route *route, unsigned int msu_id)
Returns the endpoint within the given route which has the specified MSU ID.
Representation of a thread on a runtime in the DFG.
#define WEBSERVER_HTTP_MSU_TYPE_ID
int n_routes
Number of routes above.
struct dfg_dependency * get_dependency(struct dfg_msu_type *type, struct dfg_msu_type *dep_type)
int create_route(unsigned int route_id, unsigned int type_id, unsigned int runtime_id)
int n_instances
The number of instances of this MSU type.
#define log(level, fmt,...)
Log at a custom level.
struct dedos_dfg * get_dfg()
int n_dependencies
The number of elements in dfg_msu_type::dependencies.
int schedule_msu(struct dfg_msu *msu, struct dfg_runtime *rt, struct dfg_msu **new_msus)
Tries to place an MSU on a given runtime.
int send_create_route_msg(struct dfg_route *route)
int mod_endpoint(unsigned int msu_id, uint32_t key, unsigned int route_id)
struct dfg_meta_routing meta_routing
Which types of msus route to/from this MSU.
#define log_warn(fmt,...)
int msu_has_route(struct dfg_msu *msu, struct dfg_route *route)
Returns 1 if the given MSU has the route as an endpoint.
enum blocking_mode blocking_mode
Whether the MSU is blocking or not.
int unclone_msu(int msu_id)
struct dfg_msu_type * msu_type
The type of MSU to which this route delivers.
struct dfg_msu * msus[512]
The MSUs present in the application.