/** Number of bytes added to a JSON buffer each time CHECK_JSON_LEN reallocates it. */
32 #define JSON_LEN_INCREMENT 1024
/**
 * Grows the buffer of `json` until it has at least `len` unused bytes.
 *
 * While (allocated_size - length), cast to int, is smaller than `len`, the
 * macro logs the new size, reallocates `string` to be JSON_LEN_INCREMENT
 * bytes larger and adds JSON_LEN_INCREMENT to `allocated_size`.
 *
 * @param json lvalue with `string`, `length` and `allocated_size` members
 * @param len  number of free bytes required
 *
 * NOTE(review): the realloc() result is stored directly in (json).string
 * with no NULL check; if realloc() fails the previous pointer is overwritten.
 * NOTE(review): the log() argument uses `json.allocated_size` without
 * parentheses around `json`, unlike every other use in this macro.
 * NOTE(review): the (int) cast of the size difference presumably exists to
 * keep the comparison signed; confirm the member types.
 * NOTE(review): the closing of the loop is not visible in this listing.
 */
34 #define CHECK_JSON_LEN(json, len)\
35 while ( (int)((json).allocated_size - (json).length) < (len)) { \
36 log(LOG_DFG_WRITER, "Reallocating to %d", (int)(json.allocated_size + JSON_LEN_INCREMENT)); \
37 (json).string = realloc((json).string, (json).allocated_size + JSON_LEN_INCREMENT); \
38 (json).allocated_size += JSON_LEN_INCREMENT; \
41 #define START_JSON(json) \
/**
 * Appends "[ " (open bracket followed by a space) at the current end of
 * `json` and advances `length` by the number of characters sprintf() wrote.
 *
 * @param json lvalue with `string`, `length` and `allocated_size` members
 *
 * NOTE(review): only 2 free bytes are requested, but sprintf() stores the two
 * characters plus a terminating NUL (3 bytes); confirm the buffer always has
 * at least one byte of slack.
 */
44 #define START_LIST(json) \
46 CHECK_JSON_LEN(json, 2); \
47 (json).length += sprintf((json).string + (json).length, "[ "); \
/**
 * Closes a list: writes "]," starting one character BEFORE the current end,
 * so the last character already in the buffer is overwritten (presumably the
 * trailing comma of the last element, or the space written by START_LIST).
 *
 * sprintf() returns 2 and 1 is subtracted, so `length` grows by exactly one.
 *
 * @param json lvalue with `string`, `length` and `allocated_size` members
 */
53 #define END_LIST(json) \
55 CHECK_JSON_LEN(json, 2); \
56 (json).length += sprintf((json).string + (json).length - 1, "],") - 1; \
/**
 * Appends "{ " (open brace followed by a space) at the current end of `json`
 * and advances `length` by the number of characters sprintf() wrote.
 *
 * @param json lvalue with `string`, `length` and `allocated_size` members
 *
 * NOTE(review): only 2 free bytes are requested, but sprintf() stores the two
 * characters plus a terminating NUL (3 bytes); confirm the buffer always has
 * at least one byte of slack.
 */
59 #define START_OBJ(json)\
61 CHECK_JSON_LEN(json, 2); \
62 (json).length += sprintf((json).string + (json).length, "{ " ); \
/**
 * Closes an object: writes "}," starting one character BEFORE the current
 * end, so the last character already in the buffer is overwritten (presumably
 * the trailing comma of the last member, or the space written by START_OBJ).
 *
 * sprintf() returns 2 and 1 is subtracted, so `length` grows by exactly one.
 *
 * @param json lvalue with `string`, `length` and `allocated_size` members
 */
65 #define END_OBJ(json) \
67 CHECK_JSON_LEN(json, 2); \
68 (json).length += sprintf((json).string + (json).length - 1 , "},") - 1; \
/**
 * Appends a quoted key, a colon, the value formatted with `fmt`, and a
 * trailing comma to `json`.
 *
 * `key` and `fmt` are pasted next to other string literals to build the
 * sprintf() format, so both must be string literals.
 *
 * @param json      lvalue with `string`, `length` and `allocated_size` members
 * @param key       string literal used as the member name
 * @param fmt       string literal printf conversion for `value`
 * @param value     the value passed to sprintf()
 * @param value_len caller-supplied size used when reserving space; presumably
 *                  an upper bound on the formatted length of `value`
 *
 * NOTE(review): `value_len` is not parenthesised in the size expression, so
 * an argument containing a low-precedence operator would be mis-grouped.
 */
71 #define KEY_VAL(json, key, fmt, value, value_len) \
73 CHECK_JSON_LEN(json, value_len + strlen(key) + 8); \
74 (json).length += sprintf((json).string + (json).length, "\"" key "\":" fmt ",", value); \
/**
 * Appends a key with an integer value: forwards to KEY_VAL with the "%d"
 * conversion and a fixed size allowance of 128 bytes for the value.
 *
 * @param json  lvalue with `string`, `length` and `allocated_size` members
 * @param key   string literal used as the member name
 * @param value value printed with "%d"
 */
77 #define KEY_INTVAL(json, key, value) \
78 KEY_VAL(json, key, "%d", value, 128)
/**
 * Appends a key with a double-quoted string value: forwards to KEY_VAL with
 * the format "\"%s\"" and strlen(value) as the size allowance.
 *
 * @param json  lvalue with `string`, `length` and `allocated_size` members
 * @param key   string literal used as the member name
 * @param value NUL-terminated string printed between double quotes
 *
 * NOTE(review): `value` appears twice in the expansion (once inside strlen(),
 * once as the sprintf() argument), so an argument with side effects is
 * evaluated more than once.
 * NOTE(review): the string is inserted as-is; no JSON escaping is applied.
 */
80 #define KEY_STRVAL(json, key, value) \
81 KEY_VAL(json, key, "\"%s\"", value, strlen(value))
/**
 * Appends a member whose key AND value are both produced by printf
 * conversions. The key is formatted with `key_fmt`, the value with `val_fmt`;
 * both are wrapped in double quotes and a trailing comma is written.
 *
 * `key_fmt` and `val_fmt` are pasted next to other string literals, so both
 * must be string literals.
 *
 * @param json      lvalue with `string`, `length` and `allocated_size` members
 * @param key_fmt   string literal printf conversion for `key`
 * @param key       value formatted as the member name
 * @param key_len   caller-supplied size allowance for the formatted key
 * @param val_fmt   string literal printf conversion for `value`
 * @param value     value formatted as the (quoted) member value
 * @param value_len caller-supplied size allowance for the formatted value
 *
 * NOTE(review): `key_len` and `value_len` are not parenthesised in the size
 * expression.
 */
83 #define FMT_KEY_VAL(json, key_fmt, key, key_len, val_fmt, value, value_len) \
85 CHECK_JSON_LEN(json, key_len + value_len + 8); \
86 (json).length += sprintf((json).string + (json).length, "\"" key_fmt "\":\"" val_fmt "\",", key, value); \
/**
 * Appends only a quoted key followed by a colon (no value, no trailing
 * comma), after reserving strlen(key) + 4 bytes.
 *
 * `key` is pasted between string literals to form the text passed to
 * sprintf(), so it must be a string literal.
 *
 * @param json lvalue with `string`, `length` and `allocated_size` members
 * @param key  string literal used as the member name
 */
89 #define KEY(json, key)\
91 CHECK_JSON_LEN(json, strlen(key) + 4); \
92 (json).length += sprintf((json).string + (json).length, "\"" key "\":"); \
/**
 * Appends a bare value formatted with `fmt`, followed by a comma, after
 * reserving value_len + 4 bytes.
 *
 * `fmt` is pasted next to the "," literal, so it must be a string literal.
 *
 * @param json      lvalue with `string`, `length` and `allocated_size` members
 * @param fmt       string literal printf conversion for `value`
 * @param value     the value passed to sprintf()
 * @param value_len caller-supplied size allowance for the formatted value
 *
 * NOTE(review): `value_len` is not parenthesised in the size expression.
 */
95 #define VALUE(json, fmt, value, value_len) \
97 CHECK_JSON_LEN(json, value_len + 4); \
98 (json).length += sprintf((json).string + (json).length, fmt ",", value); \
/**
 * Terminates the JSON text by overwriting the LAST written character with a
 * NUL byte (presumably the trailing comma left by the preceding END_OBJ,
 * END_LIST or VALUE). `length` itself is not modified.
 *
 * @param json lvalue with `string` and `length` members
 *
 * NOTE(review): when `length` is 0 the index `length-1` is out of bounds.
 */
101 #define END_JSON(json) \
102 (json).string[(json).length-1] = '\0'
119 for (
int i= write_index - n_stats - 1; i<write_index; i++) {
121 if (timeseries->
time[index].tv_sec == 0 && timeseries->
time[index].tv_nsec == 0) {
126 sprintf(ts,
"%ld.%09ld", timeseries->
time[index].tv_sec,
127 timeseries->
time[index].tv_nsec);
128 KEY_VAL(json,
"time",
"%s", ts, 32);
129 KEY_VAL(json,
"value",
"%.3f", timeseries->
data[index], 16);
144 KEY(json,
"src_types");
151 KEY(json,
"dst_types");
186 KEY(json,
"meta_routing");
187 VALUE(json,
"%s", meta_routing, strlen(meta_routing));
189 KEY(json,
"dependencies");
193 VALUE(json,
"%s", dependency, strlen(dependency));
216 for (
int i=0; i<sched->
n_routes; i++) {
239 log_error(
"Cannot get MSU stat %d (idx %d) msu %d",
244 VALUE(json,
"%s", stat_json, strlen(stat_json));
262 KEY_STRVAL(json,
"vertex_type",
"entry/exit");
277 KEY_VAL(json,
"scheduling",
"%s", scheduling, strlen(scheduling));
281 KEY_VAL(json,
"stats",
"%s", stats, strlen(stats));
312 KEY(json,
"endpoints");
316 VALUE(json,
"%s", ep, strlen(ep));
334 struct in_addr addr = {rt->
ip};
335 inet_ntop(AF_INET, &addr, ip, 32);
348 for (
int i=0; i<rt->
n_routes; i++) {
350 VALUE(json,
"%s", route, strlen(route));
376 inet_ntop(AF_INET, &addr, ip, 32);
377 log(LOG_TEST,
"IP IS %s", ip);
382 KEY(json,
"MSU_types");
386 VALUE(json,
"%s", type, strlen(type));
392 for (
int i=0; i<dfg->
n_msus; i++) {
394 VALUE(json,
"%s", msu, strlen(msu));
398 KEY(json,
"runtimes");
402 VALUE(json,
"%s", rt, strlen(rt));
418 int json_size = strlen(dfg_json);
419 FILE *file = fopen(filename,
"w");
421 log_perror(
"Cannot write DFG to %s", filename);
424 fwrite(dfg_json,
sizeof(
char), json_size, file);
425 fwrite(
"\n",
sizeof(
char), 1, file);
434 size_t json_size = strlen(dfg_json);
437 while (written < json_size) {
438 ssize_t rtn = write(fd, dfg_json + written, json_size - written);
440 log_error(
"error writing dfg to fd %d", fd);
static char * msu_stats_to_json(int msu_id, int n_stats)
#define KEY_INTVAL(json, key, value)
int n_routes
The routes that an MSU can send to.
MSUs which must be present for another MSU to be cloned.
#define EXIT_VERTEX_TYPE
Bitmask representing an MSU through which messages exit DeDOS.
int n_msus
The number of MSUs in dedos_dfg::msus.
uint32_t ip
IP of the node on which the runtime is running.
Round-robin database (circular buffer) for storing timeseries data.
static char * stat_to_json(struct timed_rrdb *timeseries, int n_stats)
struct dfg_msu_type * type
The MSU type which must be present.
static char * scheduling_to_json(struct dfg_scheduling *sched)
int global_ctl_port
Port of the global controller.
struct dfg_scheduling scheduling
Information about where an MSU is scheduled.
struct dfg_route * routes[64]
Routes located on this runtime.
uint32_t global_ctl_ip
IP address of the global controller.
int n_cores
Number of cores on the runtime node.
double data[240]
The statistics.
static char * dependency_to_json(struct dfg_dependency *dep)
#define VALUE(json, fmt, value, value_len)
uint8_t vertex_type
Whether the MSU is #ENTRY, #EXIT, or possibly #ENTRY | #EXIT.
static struct stat_type_label reported_msu_stat_types[]
int n_msu_types
The number of elements in dedos_dfg::msu_types.
static char * msu_type_to_json(struct dfg_msu_type *type)
#define ENTRY_VERTEX_TYPE
Bitmask representing an MSU through which messages enter DeDOS.
char name[32]
A name describing the function of the MSU.
#define log_perror(fmt,...)
int id
Unique identifier for the runtime.
Logging of status messages to the terminal.
struct timed_rrdb * get_msu_stat(enum stat_id id, unsigned int msu_id)
struct dfg_msu_type * type
The type of the MSU and meta-routing information.
int write_index
Offset into the rrdb at which writing has occurred.
static char * route_to_json(struct dfg_route *route)
char application_name[64]
Description of the whole application.
char * dfg_to_json(struct dedos_dfg *dfg, int n_stats)
Representation of a runtime in the DFG.
static pthread_mutex_t json_lock
struct dfg_dependency * dependencies[32]
These MSU types must be present in order for this MSU type to be cloned.
struct dfg_route_endpoint * endpoints[256]
The endpoints of the route.
int n_endpoints
The number of endpoints in dfg_route::endpoints.
struct dfg_runtime * runtime
The runtime on which an MSU is running.
int n_runtimes
The number of elements in dedos_dfg::runtimes.
static char * msu_to_json(struct dfg_msu *msu, int n_stats)
#define N_REPORTED_MSU_STAT_TYPES
static char * runtime_to_json(struct dfg_runtime *rt)
#define log_error(fmt,...)
static char * meta_routing_to_json(struct dfg_meta_routing *meta_routing)
#define KEY_VAL(json, key, fmt, value, value_len)
struct dfg_msu * msu
The MSU at this endpoint to which a message would be delivered.
struct dfg_msu_type * msu_types[32]
MSU types which may be present in the application.
int id
A unique identifier for the MSU.
int cloneable
If cloneable == N, this MSU can be cloned on runtimes numbered up to and including N...
static char * endpoint_to_json(struct dfg_route_endpoint *ep)
int n_unpinned_threads
Number of the above-threads which are unpinned.
#define RRDB_ENTRIES
timeseries.h
struct dfg_thread * thread
The thread on which an MSU is running.
int id
Unique identifier for the thread.
Representation of a single MSU in the dfg.
void dfg_to_file(char *filename)
#define STAT_SAMPLE_SIZE
Number of statistics sampled in each send from runtime to controller.
int n_pinned_threads
Number of the above-threads which are pinned.
int id
A unique identifier for the MSU type.
int runtime_fd(unsigned int runtime_id)
int id
A unique identifier for the route.
enum msu_locality locality
Whether it must be present on the same machine.
struct dfg_route * routes[32]
Top-level structure holding the data-flow graph.
A route through which MSU messages can be passed.
static struct dedos_dfg * dfg
Static local copy of the DFG, so each call doesn't have to pass a copy.
struct msu_init_data init_data
Initial data passed to the MSU.
Interfaces for the creation and modification of the data-flow-graph and general description of th...
struct dfg_runtime * runtimes[16]
The runtimes present in the application.
A single endpoint for an MSU route.
uint32_t key
The key associated with this endpoint.
Structure representing the scheduling of an MSU on a runtime.
struct timespec time[240]
The time at which the stats were gathered.
int n_routes
Number of routes above.
#define log(level, fmt,...)
Log at a custom level.
struct dedos_dfg * get_dfg()
int n_dependencies
The number of elements in dfg_msu_type::dependencies.
int port
Port on which the runtime is listening for controller/inter-runtime.
struct dfg_meta_routing meta_routing
Which types of msus route to/from this MSU.
#define KEY_STRVAL(json, key, value)
enum blocking_mode blocking_mode
Whether the MSU is blocking or not.
struct dfg_msu_type * msu_type
The type of MSU to which this route delivers.
struct dfg_msu * msus[512]
The MSUs present in the application.