My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
controller_mysql.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
20 #include "controller_mysql.h"
21 #include "dfg.h"
22 #include "logging.h"
23 #include "stats.h"
24 #include "stdlib.h"
25 #include "local_files.h"
26 #include "controller_dfg.h"
27 
28 #include <mysql.h>
29 #include <stdbool.h>
30 #include <fcntl.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33 
34 static pthread_mutex_t lock;
35 static MYSQL mysql;
36 bool mysql_initialized = false;
37 
38 #define CHECK_SQL_INIT \
39  if (!mysql_initialized) { \
40  log(LOG_SQL,"MYSQL not initialized"); \
41  return -1; \
42  }
43 
44 
45 #define SQL_LOCK \
46  pthread_mutex_lock(&lock)
47 
48 #define SQL_UNLOCK \
49  pthread_mutex_unlock(&lock)
50 
51 #define MAX_REQ_LEN 1024
52 
53 #define MAX_INIT_CONTENTS_SIZE 8192
54 
55 int db_register_msu_type(int msu_type_id, char *name);
56 int db_register_statistic(int stat_id, char *name);
57 int db_register_thread_stats(int thread_id, int runtime_id);
58 
59 static int split_exec_cmd(char *cmd) {
60  char req[MAX_REQ_LEN];
61 
62  char *delimed = strtok(cmd, ";");
63  while (delimed != NULL && strlen(delimed) > 1) {
64  strncpy(req, delimed, MAX_REQ_LEN);
65  int len = strlen(delimed);
66  req[len] = ';';
67  req[len+1] = '\0';
68 
69  int status = mysql_query(&mysql, req);
70  if (status) {
71  log_error("Could not execute mysql query %s. Error: %s",
72  req, mysql_error(&mysql));
73  return -1;
74  }
75  delimed = strtok(NULL, ";");
76  }
77  return 0;
78 }
79 
80 
81 static int db_clear() {
82  char db_init_file[256];
83  get_local_file(db_init_file, "schema.sql");
84  char contents[MAX_INIT_CONTENTS_SIZE];
85 
86  int fd = open(db_init_file, O_RDONLY);
87  if (fd == -1) {
88  log_error("Error opening schema file %s", db_init_file);
89  return -1;
90  }
91 
92  ssize_t bytes_read = read(fd, contents, MAX_INIT_CONTENTS_SIZE);
93  close(fd);
94  if (bytes_read == MAX_INIT_CONTENTS_SIZE) {
95  log_error("Contents of %s too large", db_init_file);
96  return -1;
97  }
98 
99  contents[bytes_read] = '\0';
100 
101  int status = split_exec_cmd(contents);
102  if (status) {
103  log_error("Could not execute mysql query in %s", db_init_file);
104  return -1;
105  }
106  log_info("Initialized cleared db");
107  return 0;
108 }
109 
116 int db_init(int clear) {
117  pthread_mutex_init(&lock, 0);
118  struct db_info *db = get_db_info();
119 
120  if (mysql_library_init(0, NULL, NULL)) {
121  log_error("Could not initialize MySQL library");
122  return -1;
123  }
124 
125  char db_ip[INET_ADDRSTRLEN];
126  inet_ntop(AF_INET, &db->ip, db_ip, INET_ADDRSTRLEN);
127 
128  if (!mysql_real_connect(&mysql, db_ip, db->user, db->pwd, db->name, 0, NULL,
129  CLIENT_MULTI_STATEMENTS)) {
130  log_error("Could not connect to MySQL DB %s", mysql_error(&mysql));
131  return -1;
132  }
133 
134  if (clear) {
135  int rtn = db_clear();
136  if (rtn < 0) {
137  log_error("Could not clear Database");
138  return -1;
139  }
140  }
141  mysql_initialized = true;
142 
143  for (int i = 0; i < N_REPORTED_STAT_TYPES; ++i) {
145  reported_stat_types[i].name) != 0) {
146  return -1;
147  }
148  }
149 
150  struct dedos_dfg *dfg = get_dfg();
151 
152  for (int i=0; i < dfg->n_msu_types; ++i) {
153  struct dfg_msu_type *type = dfg->msu_types[i];
154  if (db_register_msu_type(type->id, type->name) != 0) {
155  return -1;
156  }
157 
158  }
159 
160  int r;
161  for (r = 1; r <= get_dfg_n_runtimes(); ++r) {
162  struct dfg_runtime *rt = get_dfg_runtime(r);
163  if (db_register_runtime(rt->id) != 0) {
164  return -1;
165  }
166  }
167 
168  return 0;
169 }
170 
177  mysql_library_end();
178  mysql_initialized = false;
179  return 1;
180 }
181 
184  char check_query[MAX_REQ_LEN];
185  char insert_query[MAX_REQ_LEN];
186  const char *element = "stattype";
187 
188  snprintf(check_query, MAX_REQ_LEN,
189  "select * from Statistics where id = (%d)", stat_id);
190 
191  snprintf(insert_query, MAX_REQ_LEN,
192  "insert into Statistics (id, name) values (%d, '%s')",
193  stat_id, name);
194 
195  return db_check_and_register(check_query, insert_query, element, stat_id);
196 }
197 
198 int db_register_msu_type(int msu_type_id, char *name) {
200  char check_query[MAX_REQ_LEN];
201  char insert_query[MAX_REQ_LEN];
202  const char *element = "msutype";
203 
204  snprintf(check_query, MAX_REQ_LEN,
205  "select * from MsuTypes where id = (%d)", msu_type_id);
206 
207  snprintf(insert_query, MAX_REQ_LEN,
208  "insert into MsuTypes (id, name) values (%d, '%s')", msu_type_id, name);
209 
210  return db_check_and_register(check_query, insert_query, element, msu_type_id);
211 }
212 
220  char check_query[MAX_REQ_LEN];
221  char insert_query[MAX_REQ_LEN];
222  const char *element = "runtime";
223 
224  snprintf(check_query, MAX_REQ_LEN,
225  "select * from Runtimes where id = (%d)",
226  runtime_id);
227 
228  snprintf(insert_query, MAX_REQ_LEN,
229  "insert into Runtimes (id) values (%d)",
230  runtime_id);
231 
232  return db_check_and_register(check_query, insert_query, element, runtime_id);
233 }
234 
241 int db_register_thread(int thread_id, int runtime_id) {
243  char check_query[MAX_REQ_LEN];
244  char insert_query[MAX_REQ_LEN];
245  const char *element = "thread";
246 
247  snprintf(check_query, MAX_REQ_LEN,
248  "select * from Threads where thread_id = (%d) and runtime_id = (%d)",
249  thread_id, runtime_id);
250 
251  snprintf(insert_query, MAX_REQ_LEN,
252  "insert into Threads (thread_id, runtime_id) values (%d, %d)",
253  thread_id, runtime_id);
254 
255  return db_check_and_register(check_query, insert_query, element, thread_id);
256 }
257 
258 
266 int db_register_msu(int msu_id, int msu_type_id, int thread_id, int runtime_id) {
268  char check_query[MAX_REQ_LEN];
269  char insert_query[MAX_REQ_LEN];
270  char select_thread_id[MAX_REQ_LEN];
271  const char *element = "MSU";
272 
273  snprintf(check_query, MAX_REQ_LEN,
274  "select * from Msus where msu_id = (%d)",
275  msu_id);
276 
277  snprintf(select_thread_id, MAX_REQ_LEN,
278  "select pk from Threads where thread_id = (%d) and runtime_id = (%d)",
279  thread_id, runtime_id);
280 
281  snprintf(insert_query, MAX_REQ_LEN,
282  "insert into Msus (msu_id, msu_type_id, thread_pk) values (%d, %d, (%s))",
283  msu_id, msu_type_id, select_thread_id);
284 
285  return db_check_and_register(check_query, insert_query, element, msu_id);
286 }
287 
292 int db_register_msu_timeseries(int msu_id) {
294  int i;
295  for (i = 0; i < N_REPORTED_MSU_STAT_TYPES; ++i) {
296  char check_query[MAX_REQ_LEN];
297  char insert_query[MAX_REQ_LEN];
298  char select_msu_pk[MAX_REQ_LEN];
299  const char *element = "timeserie";
300 
301  snprintf(select_msu_pk, MAX_REQ_LEN,
302  "select pk from Msus where msu_id = (%d)", msu_id);
303 
304  snprintf(check_query, MAX_REQ_LEN,
305  "select * from Timeseries where "
306  "msu_pk = (%s) "
307  "and statistic_id = (%d)",
308  select_msu_pk, reported_msu_stat_types[i].id);
309 
310 
311  snprintf(insert_query, MAX_REQ_LEN,
312  "insert into Timeseries (statistic_id, msu_pk) "
313  "values ((%d), (%s))",
314  reported_msu_stat_types[i].id, select_msu_pk);
315 
316  if (db_check_and_register(check_query, insert_query, element, msu_id) != 0) {
317  return -1;
318  }
319  }
320  return 0;
321 }
322 
323 int db_register_thread_timeseries(int thread_id, int runtime_id) {
325  char check_query[MAX_REQ_LEN];
326  char insert_query[MAX_REQ_LEN];
327  char select_thread_pk[MAX_REQ_LEN];
328  const char *element = "thread_timeseries";
329  for (int i=0; i < N_REPORTED_THREAD_STAT_TYPES; ++i) {
330  snprintf(select_thread_pk, MAX_REQ_LEN,
331  "select pk from Threads where thread_id = (%d) and runtime_id = (%d)",
332  thread_id, runtime_id);
333 
334  snprintf(check_query, MAX_REQ_LEN,
335  "select * from Timeseries where "
336  "thread_pk = (%s) "
337  "and statistic_id = (%d)",
338  select_thread_pk, reported_thread_stat_types[i].id);
339 
340  snprintf(insert_query, MAX_REQ_LEN,
341  "insert into Timeseries (statistic_id, thread_pk) "
342  "values ((%d), (%s))",
343  reported_thread_stat_types[i].id, select_thread_pk);
344 
345  if (db_check_and_register(check_query, insert_query, element, thread_id) != 0) {
346  return -1;
347  }
348  }
349  return 0;
350 }
351 
352 
353 
354 int db_register_thread_stats(int thread_id, int runtime_id) {
356  if (db_register_thread(thread_id, runtime_id) != 0) {
357  return -1;
358  }
359  if (db_register_thread_timeseries(thread_id, runtime_id) != 0) {
360  return -1;
361  }
362  return 0;
363 }
364 
365 int db_register_msu_stats(int msu_id, int msu_type_id, int thread_id, int runtime_id) {
367 
368  if (db_register_msu(msu_id, msu_type_id, thread_id, runtime_id) != 0) {
369  return -1;
370  }
371  if (db_register_msu_timeseries(msu_id) != 0) {
372  return -1;
373  }
374  return 0;
375 }
376 
385 int db_check_and_register(const char *check_query, const char *insert_query,
386  const char *element, int element_id) {
388  SQL_LOCK;
389  int query_len;
390 
391  query_len = strlen(check_query);
392  if (mysql_real_query(&mysql, check_query, query_len)) {
393  log_error("MySQL query failed: %s\n %s", mysql_error(&mysql), check_query);
394  SQL_UNLOCK;
395  return -1;
396  } else {
397  MYSQL_RES *result = mysql_store_result(&mysql);
398  if (!result) {
399  log_error("Could not get result from MySQL query %s", mysql_error(&mysql));
400  SQL_UNLOCK;
401  return -1;
402  } else {
403  if (mysql_num_rows(result) == 0) {
404  mysql_free_result(result);
405 
406  query_len = strlen(insert_query);
407  if (mysql_real_query(&mysql, insert_query, query_len)) {
408  log_error("MySQL query (%s) failed: %s", insert_query, mysql_error(&mysql));
409  SQL_UNLOCK;
410  return -1;
411  } else {
412  log(LOG_SQL, "registered element %s (id: %d) in DB", element, element_id);
413  SQL_UNLOCK;
414  return 0;
415  }
416  } else {
417  log(LOG_SQL, "element %s (id: %d) is already registered in DB", element, element_id);
418  mysql_free_result(result);
419  SQL_UNLOCK;
420  return 0;
421  }
422  }
423  }
424 }
425 
426 static int get_ts_query(char query[MAX_REQ_LEN], enum stat_id stat_id, int item_id, int runtime_id) {
427  char select_pk[MAX_REQ_LEN];
428  if (is_thread_stat(stat_id)) {
429  snprintf(select_pk, MAX_REQ_LEN,
430  "select pk from Threads where thread_id = %d and runtime_id = %d",
431  item_id, runtime_id);
432  snprintf(query, MAX_REQ_LEN,
433  "select pk from Timeseries where thread_pk = (%s) and statistic_id = (%d)",
434  select_pk, stat_id);
435  return 0;
436  } else if (is_msu_stat(stat_id)) {
437  snprintf(select_pk, MAX_REQ_LEN,
438  "select pk from Msus where msu_id = %d", item_id);
439  snprintf(query, MAX_REQ_LEN,
440  "select pk from Timeseries where msu_pk = (%s) and statistic_id = (%d)",
441  select_pk, stat_id);
442  return 0;
443  } else {
444  log_error("Cannot get timestamp query for stat type %d", stat_id);
445  return -1;
446  }
447 }
448 
455 int db_insert_sample(struct timed_stat *input, struct stat_sample_hdr *input_hdr, int runtime_id) {
457  SQL_LOCK;
458  /* Find timeserie to insert data point first */
459  char ts_query[MAX_REQ_LEN];
460  if (get_ts_query(ts_query, input_hdr->stat_id, input_hdr->item_id, runtime_id) != 0) {
461  SQL_UNLOCK;
462  return -1;
463  }
464  int i;
465  for (i = 0; i < input_hdr->n_stats; ++i) {
466  if (input[i].time.tv_sec < 0) {
467  continue;
468  }
469  int query_len;
470  char insert_query[MAX_REQ_LEN];
471  query_len = snprintf(insert_query, MAX_REQ_LEN,
472  "insert into Points (timeseries_pk, ts, val) values "
473  "((%s), %lu, %Lf)",
474  ts_query,
475  (unsigned long) input[i].time.tv_sec * (unsigned long)1e9 + (unsigned long)input[i].time.tv_nsec,
476  input[i].value);
477 
478  if (mysql_real_query(&mysql, insert_query, query_len)) {
479  log_error("MySQL query (%s) failed: %s", insert_query, mysql_error(&mysql));
480  SQL_UNLOCK;
481  return -1;
482  } else {
483  log(LOG_SQL, "inserted data point for msu %d and stat %d",
484  input_hdr->item_id, input_hdr->stat_id);
485  SQL_UNLOCK;
486  return 0;
487  }
488  }
489 
490  return 0;
491 }
int db_register_thread(int thread_id, int runtime_id)
Register a runtime's thread in the DB.
bool mysql_initialized
int db_register_thread_timeseries(int thread_id, int runtime_id)
int db_register_msu_timeseries(int msu_id)
Register timseries for an MSU in the DB.
static int split_exec_cmd(char *cmd)
int db_register_runtime(int runtime_id)
Register a runtime in the DB.
#define log_info(fmt,...)
Definition: logging.h:88
int db_check_and_register(const char *check_query, const char *insert_query, const char *element, int element_id)
Register an element in the DB.
static struct stat_type_label reported_msu_stat_types[]
Definition: stats.h:111
uint32_t ip
Definition: dfg.h:231
int n_msu_types
The number of elements in dedos_dfg::msu_types.
Definition: dfg.h:250
stat_id
The identifiers with which stats can be logged.
Definition: stat_ids.h:32
struct dfg_runtime * get_dfg_runtime(unsigned int runtime_id)
Returns the runtime with the given ID.
Definition: dfg.c:64
char name[32]
A name describing the function of the MSU.
Definition: dfg.h:179
int id
Unique identifier for the runtime.
Definition: dfg.h:74
static pthread_mutex_t lock
static MYSQL mysql
Logging of status messages to the terminal.
static struct stat_type_label reported_stat_types[]
Static structure so the reported stat types can be referenced as an array.
Definition: stats.h:105
Access local files within the repo.
int db_terminate()
Destroy the MySQL client environment.
Functions for the sending and receiving of statistics between ctrl and runtime.
char pwd[16]
Definition: dfg.h:235
Representation of a runtime in the DFG.
Definition: dfg.h:73
int n_stats
The size of the sample (number of stats, not number of items)
Definition: stats.h:49
enum stat_id stat_id
The ID of the statistic being sampled.
Definition: stats.h:45
struct timespec time
Definition: stats.h:38
char name[16]
Definition: dfg.h:233
#define CHECK_SQL_INIT
const char * name
Definition: http_parser.c:485
#define N_REPORTED_MSU_STAT_TYPES
Definition: stats.h:114
#define log_error(fmt,...)
Definition: logging.h:101
#define SQL_LOCK
A type of MSU.
Definition: dfg.h:176
static int db_clear()
struct dfg_msu_type * msu_types[32]
MSU types which may be present in the application.
Definition: dfg.h:248
#define N_REPORTED_STAT_TYPES
Number of reported stat types.
Definition: stats.h:109
unsigned int item_id
The ID for the item being sampled.
Definition: stats.h:47
Header for a single stat sample for a single item.
Definition: stats.h:43
int db_init(int clear)
Initialize the MySQL client library, and connect to the server Also init tables for running system...
static struct stat_type_label reported_thread_stat_types[]
Definition: stats.h:117
struct db_info * get_db_info()
Returns DB info.
Definition: dfg.c:56
int get_local_file(char *out, char *file)
Gets a file relative to the path of the executable.
Definition: local_files.c:38
int id
A unique identifier for the MSU type.
Definition: dfg.h:177
char user[16]
Definition: dfg.h:234
Top-level structure holding the data-flow graph.
Definition: dfg.h:239
static struct dedos_dfg * dfg
Static local copy of the DFG, so each call doesn't have to pass a copy.
Definition: dfg.c:32
int get_dfg_n_runtimes()
Returns the number of registered runtime.
Definition: dfg.c:60
Interfaces for the creation and modification of the data-flow-graph and and general description of th...
Info to connect and use database.
Definition: dfg.h:230
int db_register_thread_stats(int thread_id, int runtime_id)
Holds a single timestamped value.
Definition: stats.h:37
static int get_ts_query(char query[1024], enum stat_id stat_id, int item_id, int runtime_id)
int db_register_msu_stats(int msu_id, int msu_type_id, int thread_id, int runtime_id)
static int runtime_id(int runtime_fd)
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
struct dedos_dfg * get_dfg()
Definition: runtime_dfg.c:115
int is_msu_stat(enum stat_id id)
Definition: stats.c:49
#define MAX_INIT_CONTENTS_SIZE
int db_register_statistic(int stat_id, char *name)
int db_register_msu_type(int msu_type_id, char *name)
#define N_REPORTED_THREAD_STAT_TYPES
Definition: stats.h:120
int is_thread_stat(enum stat_id id)
Definition: stats.c:40
int db_insert_sample(struct timed_stat *input, struct stat_sample_hdr *input_hdr, int runtime_id)
Insert datapoint for a timseries in the DB.
#define SQL_UNLOCK
#define MAX_REQ_LEN
int db_register_msu(int msu_id, int msu_type_id, int thread_id, int runtime_id)
Register an MSU in the DB.
long double value
Definition: stats.h:39