34 static pthread_mutex_t
lock;
38 #define CHECK_SQL_INIT \
39 if (!mysql_initialized) { \
40 log(LOG_SQL,"MYSQL not initialized"); \
46 pthread_mutex_lock(&lock)
49 pthread_mutex_unlock(&lock)
51 #define MAX_REQ_LEN 1024
53 #define MAX_INIT_CONTENTS_SIZE 8192
62 char *delimed = strtok(cmd,
";");
63 while (delimed != NULL && strlen(delimed) > 1) {
65 int len = strlen(delimed);
69 int status = mysql_query(&
mysql, req);
71 log_error(
"Could not execute mysql query %s. Error: %s",
72 req, mysql_error(&
mysql));
75 delimed = strtok(NULL,
";");
82 char db_init_file[256];
86 int fd = open(db_init_file, O_RDONLY);
88 log_error(
"Error opening schema file %s", db_init_file);
95 log_error(
"Contents of %s too large", db_init_file);
99 contents[bytes_read] =
'\0';
103 log_error(
"Could not execute mysql query in %s", db_init_file);
117 pthread_mutex_init(&
lock, 0);
120 if (mysql_library_init(0, NULL, NULL)) {
121 log_error(
"Could not initialize MySQL library");
125 char db_ip[INET_ADDRSTRLEN];
126 inet_ntop(AF_INET, &db->
ip, db_ip, INET_ADDRSTRLEN);
128 if (!mysql_real_connect(&
mysql, db_ip, db->
user, db->
pwd, db->
name, 0, NULL,
129 CLIENT_MULTI_STATEMENTS)) {
130 log_error(
"Could not connect to MySQL DB %s", mysql_error(&
mysql));
186 const char *element =
"stattype";
189 "select * from Statistics where id = (%d)", stat_id);
192 "insert into Statistics (id, name) values (%d, '%s')",
202 const char *element =
"msutype";
205 "select * from MsuTypes where id = (%d)", msu_type_id);
208 "insert into MsuTypes (id, name) values (%d, '%s')", msu_type_id, name);
222 const char *element =
"runtime";
225 "select * from Runtimes where id = (%d)",
229 "insert into Runtimes (id) values (%d)",
245 const char *element =
"thread";
248 "select * from Threads where thread_id = (%d) and runtime_id = (%d)",
249 thread_id, runtime_id);
252 "insert into Threads (thread_id, runtime_id) values (%d, %d)",
253 thread_id, runtime_id);
271 const char *element =
"MSU";
274 "select * from Msus where msu_id = (%d)",
278 "select pk from Threads where thread_id = (%d) and runtime_id = (%d)",
279 thread_id, runtime_id);
282 "insert into Msus (msu_id, msu_type_id, thread_pk) values (%d, %d, (%s))",
283 msu_id, msu_type_id, select_thread_id);
299 const char *element =
"timeserie";
302 "select pk from Msus where msu_id = (%d)", msu_id);
305 "select * from Timeseries where "
307 "and statistic_id = (%d)",
312 "insert into Timeseries (statistic_id, msu_pk) "
313 "values ((%d), (%s))",
328 const char *element =
"thread_timeseries";
331 "select pk from Threads where thread_id = (%d) and runtime_id = (%d)",
332 thread_id, runtime_id);
335 "select * from Timeseries where "
337 "and statistic_id = (%d)",
341 "insert into Timeseries (statistic_id, thread_pk) "
342 "values ((%d), (%s))",
368 if (
db_register_msu(msu_id, msu_type_id, thread_id, runtime_id) != 0) {
386 const char *element,
int element_id) {
391 query_len = strlen(check_query);
392 if (mysql_real_query(&
mysql, check_query, query_len)) {
393 log_error(
"MySQL query failed: %s\n %s", mysql_error(&
mysql), check_query);
397 MYSQL_RES *result = mysql_store_result(&
mysql);
399 log_error(
"Could not get result from MySQL query %s", mysql_error(&
mysql));
403 if (mysql_num_rows(result) == 0) {
404 mysql_free_result(result);
406 query_len = strlen(insert_query);
407 if (mysql_real_query(&
mysql, insert_query, query_len)) {
408 log_error(
"MySQL query (%s) failed: %s", insert_query, mysql_error(&
mysql));
412 log(LOG_SQL,
"registered element %s (id: %d) in DB", element, element_id);
417 log(LOG_SQL,
"element %s (id: %d) is already registered in DB", element, element_id);
418 mysql_free_result(result);
429 snprintf(select_pk, MAX_REQ_LEN,
430 "select pk from Threads where thread_id = %d and runtime_id = %d",
431 item_id, runtime_id);
432 snprintf(query, MAX_REQ_LEN,
433 "select pk from Timeseries where thread_pk = (%s) and statistic_id = (%d)",
437 snprintf(select_pk, MAX_REQ_LEN,
438 "select pk from Msus where msu_id = %d", item_id);
439 snprintf(query, MAX_REQ_LEN,
440 "select pk from Timeseries where msu_pk = (%s) and statistic_id = (%d)",
444 log_error(
"Cannot get timestamp query for stat type %d", stat_id);
465 for (i = 0; i < input_hdr->
n_stats; ++i) {
466 if (input[i].time.tv_sec < 0) {
472 "insert into Points (timeseries_pk, ts, val) values "
475 (
unsigned long) input[i].time.tv_sec * (
unsigned long)1e9 + (
unsigned long)input[i].
time.tv_nsec,
478 if (mysql_real_query(&
mysql, insert_query, query_len)) {
479 log_error(
"MySQL query (%s) failed: %s", insert_query, mysql_error(&
mysql));
483 log(LOG_SQL,
"inserted data point for msu %d and stat %d",
int db_register_thread(int thread_id, int runtime_id)
Register a runtime's thread in the DB.
int db_register_thread_timeseries(int thread_id, int runtime_id)
int db_register_msu_timeseries(int msu_id)
Register timseries for an MSU in the DB.
static int split_exec_cmd(char *cmd)
int db_register_runtime(int runtime_id)
Register a runtime in the DB.
#define log_info(fmt,...)
int db_check_and_register(const char *check_query, const char *insert_query, const char *element, int element_id)
Register an element in the DB.
static struct stat_type_label reported_msu_stat_types[]
int n_msu_types
The number of elements in dedos_dfg::msu_types.
stat_id
The identifiers with which stats can be logged.
struct dfg_runtime * get_dfg_runtime(unsigned int runtime_id)
Returns the runtime with the given ID.
char name[32]
A name describing the function of the MSU.
int id
Unique identifier for the runtime.
static pthread_mutex_t lock
Logging of status messages to the terminal.
static struct stat_type_label reported_stat_types[]
Static structure so the reported stat types can be referenced as an array.
Access local files within the repo.
int db_terminate()
Destroy the MySQL client environment.
Functions for the sending and receiving of statistics between ctrl and runtime.
Representation of a runtime in the DFG.
int n_stats
The size of the sample (number of stats, not number of items)
enum stat_id stat_id
The ID of the statistic being sampled.
#define N_REPORTED_MSU_STAT_TYPES
#define log_error(fmt,...)
struct dfg_msu_type * msu_types[32]
MSU types which may be present in the application.
#define N_REPORTED_STAT_TYPES
Number of reported stat types.
unsigned int item_id
The ID for the item being sampled.
Header for a single stat sample for a single item.
int db_init(int clear)
Initialize the MySQL client library, and connect to the server Also init tables for running system...
static struct stat_type_label reported_thread_stat_types[]
struct db_info * get_db_info()
Returns DB info.
int get_local_file(char *out, char *file)
Gets a file relative to the path of the executable.
int id
A unique identifier for the MSU type.
Top-level structure holding the data-flow graph.
static struct dedos_dfg * dfg
Static local copy of the DFG, so each call doesn't have to pass a copy.
int get_dfg_n_runtimes()
Returns the number of registered runtime.
Interfaces for the creation and modification of the data-flow-graph and and general description of th...
Info to connect and use database.
int db_register_thread_stats(int thread_id, int runtime_id)
Holds a single timestamped value.
static int get_ts_query(char query[1024], enum stat_id stat_id, int item_id, int runtime_id)
int db_register_msu_stats(int msu_id, int msu_type_id, int thread_id, int runtime_id)
static int runtime_id(int runtime_fd)
#define log(level, fmt,...)
Log at a custom level.
struct dedos_dfg * get_dfg()
int is_msu_stat(enum stat_id id)
#define MAX_INIT_CONTENTS_SIZE
int db_register_statistic(int stat_id, char *name)
int db_register_msu_type(int msu_type_id, char *name)
#define N_REPORTED_THREAD_STAT_TYPES
int is_thread_stat(enum stat_id id)
int db_insert_sample(struct timed_stat *input, struct stat_sample_hdr *input_hdr, int runtime_id)
Insert datapoint for a timseries in the DB.
int db_register_msu(int msu_id, int msu_type_id, int thread_id, int runtime_id)
Register an MSU in the DB.