My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
msu_message.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
25 #include "msu_message.h"
26 #include "logging.h"
27 #include "communication.h"
28 #include "local_msu.h"
29 #include "uthash.h"
30 #include "runtime_dfg.h"
31 #include "profiler.h"
32 
33 #include <stdlib.h>
34 
35 int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key) {
36  memset(hdr, '\0', sizeof(*hdr));
37  hdr->key = *key;
38  return 0;
39 }
40 
41 unsigned int msu_msg_sender_type(struct msg_provinance *prov) {
42  return prov->sender.type_id;
43 }
44 
46  for (int i=0; i < p->path_len && i < MAX_PATH_LEN; i++) {
47  if (p->path[i].type_id == type->id) {
48  return &p->path[i];
49  }
50  }
51  return NULL;
52 }
53 
54 int set_msg_key(int32_t id, struct msu_msg_key *key) {
55  memcpy(&key->key, &id, sizeof(id));
56  key->key_len = sizeof(id);
57  key->id = id;
58  key->group_id = -1;
59  return 0;
60 }
61 
62 int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key) {
63  HASH_VALUE(seed, seed_size, key->id);
64  if (seed_size > sizeof(key->key)) {
65  log_warn("Key length too large for composite key!");
66  seed_size = sizeof(key->key);
67  }
68  memcpy(&key->key, seed, seed_size);
69  key->key_len = seed_size;
70  key->group_id = -1;
71  return 0;
72 }
73 
74 int add_provinance(struct msg_provinance *prov, struct local_msu *sender) {
75  prov->sender.type_id = sender->type->id;
76  prov->sender.msu_id = sender->id;
78  if (prov->path_len == 0) {
79  prov->origin = prov->sender;
80  }
81 
82  int i;
83  for (i = 0; i < MAX_PATH_LEN && i < prov->path_len; i++) {
84  if (prov->path[i].type_id == sender->type->id) {
85  break;
86  }
87  }
88  if (i >= MAX_PATH_LEN) {
89  log_warn("Cannot record provinance in path: Path too short");
90  return 1;
91  }
92 
93  prov->path[i] = prov->sender;
94  if (i == prov->path_len) {
95  prov->path_len++;
96  }
97  log(LOG_ADD_PROVINANCE, "Path len of prov %p is now %d", prov, prov->path_len);
98  return 0;
99 }
100 
/**
 * Enqueues an MSU message with a zero delivery interval.
 *
 * Thin wrapper around schedule_msu_msg() that passes an all-zero timespec.
 *
 * @param q    Queue to place the message on
 * @param data MSU message to enqueue
 * @return Whatever schedule_msu_msg() returns (0 on success, -1 on error)
 */
int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data) {
    /* `{0}` rather than `{}`: the empty initializer is not ISO C before C23. */
    struct timespec no_delay = {0};
    return schedule_msu_msg(q, data, &no_delay);
}
105 
106 int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval) {
107  struct dedos_msg *msg = malloc(sizeof(*msg));
108  if (msg == NULL) {
109  log_perror("Error allocating dedos message");
110  return -1;
111  }
112  msg->type = MSU_MSG;
113  msg->data_size = sizeof(*data);
114  msg->data = data;
115 
117  if (schedule_msg(q, msg, interval) != 0) {
118  log_error("Error MSU message to MSU");
119  return -1;
120  }
121  return 0;
122 }
123 
124 struct msu_msg *dequeue_msu_msg(struct msg_queue *q) {
125  struct dedos_msg *msg = dequeue_msg(q);
126  if (msg == NULL) {
127  return NULL;
128  }
129 
130  if (msg->type != MSU_MSG) {
131  log_error("Received non-MSU message on msu queue!");
132  return NULL;
133  }
134 
135  if (msg->data_size != sizeof(struct msu_msg)) {
136  log_warn("Data size of dequeued MSU message (%d) does not meet expected (%d)",
137  (int)msg->data_size, (int)sizeof(struct msu_msg));
138  return NULL;
139  }
140 
141  struct msu_msg *msu_msg = msg->data;
142  free(msg);
143  PROFILE_EVENT(msu_msg->hdr, PROF_DEQUEUE);
144  return msu_msg;
145 }
146 
147 int read_msu_msg_hdr(int fd, struct msu_msg_hdr *hdr) {
148  if (read_payload(fd, sizeof(*hdr), hdr) != 0) {
149  log_error("Error reading msu msg header from fd %d", fd);
150  return -1;
151  }
152 
153  return 0;
154 }
155 
157  free(msg->data);
158  free(msg);
159 }
160 
162  size_t data_size,
163  void *data) {
164  struct msu_msg *msg = malloc(sizeof(*msg));
165  msg->hdr = *hdr;
166  msg->data_size = data_size;
167  msg->data = data;
168  return msg;
169 }
170 
171 struct msu_msg *read_msu_msg(struct local_msu *msu, int fd, size_t size) {
172  if (size < sizeof(struct msu_msg_hdr)) {
173  log_error("Size of incoming message is not big enough to fit header");
174  return NULL;
175  }
176  log(LOG_MSU_MSG_READ, "Reading header from %d", fd);
177  struct msu_msg *msg = malloc(sizeof(*msg));
178  if (msg == NULL) {
179  log_perror("Error allocating MSU message");
180  return NULL;
181  }
182  if (read_msu_msg_hdr(fd, &msg->hdr) != 0) {
183  log_error("Error reading msu message header");
184  free(msg);
185  return NULL;
186  }
188  size_t data_size = size - sizeof(msg->hdr);
189  void *data = NULL;
190  if (data_size > 0) {
191  data = malloc(data_size);
192  if (data == NULL) {
193  log_perror("Error allocating msu msg of size %d", (int)data_size);
194  free(msg);
195  return NULL;
196  }
197  log(LOG_MSU_MSG_READ, "Reading payload of size %d from %d", (int)data_size, fd);
198  if (read_payload(fd, data_size, data) != 0) {
199  log_perror("Error reading msu msg payload of size %d", (int)data_size);
200  free(msg);
201  free(data);
202  return NULL;
203  }
204  log(LOG_MSU_MSG_DESERIALIZE, "Deserialized MSU message of size %d", (int)data_size);
205  }
206  msg->data_size = data_size;
207  if (msu->type->deserialize) {
208  msg->data = msu->type->deserialize(msu, data_size, data, &msg->data_size);
209  free(data);
210  } else {
211  msg->data = data;
212  }
213  return msg;
214 }
215 
216 void *serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out) {
217 
218  serialization_fn serializer = (msg->hdr.error_flag == 0 ?
219  dst_type->serialize : dst_type->serialize_error);
220 
221  if (serializer != NULL) {
222  void *payload = NULL;
223  ssize_t payload_size = serializer(dst_type, msg, &payload);
224  log(LOG_SERIALIZE,
225  "Serialized message into payload of size %d using %s serialization",
226  (int)payload_size, dst_type->name);
227  if (payload_size < 0) {
228  log_error("Error running destination type %s's serialize function",
229  dst_type->name);
230  return NULL;
231  }
232  size_t serialized_size = sizeof(struct msu_msg_hdr) + payload_size;
233  void *output = malloc(serialized_size);
234  if (output == NULL) {
235  log_error("Could not allocate serialized MSU msg");
236  free(payload);
237  return NULL;
238  }
239  memcpy(output, &msg->hdr, sizeof(msg->hdr));
240  memcpy(output + sizeof(msg->hdr), payload, payload_size);
241 
242  free(payload);
243 
244  *size_out = serialized_size;
245  return output;
246  } else {
247  size_t serialized_size = sizeof(struct msu_msg_hdr) + msg->data_size;
248  void *output = malloc(serialized_size);
249 
250  if (output == NULL) {
251  log_error("Could not allocate serialized MSU msg");
252  return NULL;
253  }
254 
255  memcpy(output, &msg->hdr, sizeof(msg->hdr));
256  memcpy(output + sizeof(msg->hdr), msg->data, msg->data_size);
257 
258  *size_out = serialized_size;
259  return output;
260  }
261 }
Item in the chain of history kept track of in each MSU.
Definition: msu_message.h:59
Interface for general-purpose socket communication.
Header for messages passed to MSUs.
Definition: msu_message.h:85
struct composite_key key
The full, arbitrary-length, unique key (used in state)
Definition: msu_message.h:48
int add_provinance(struct msg_provinance *prov, struct local_msu *sender)
Adds a new item to the path of MSUs taken within the message provinance in the header.
Definition: msu_message.c:74
int error_flag
0 if no error has been encountered
Definition: msu_message.h:91
struct dedos_msg * dequeue_msg(struct msg_queue *q)
Dequeues the first available message from q.
Definition: message_queue.c:87
int read_payload(int fd, size_t size, void *buff)
Reads a buffer of a given size from a file descriptor.
Definition: communication.c:37
struct msu_msg * dequeue_msu_msg(struct msg_queue *q)
Dequeues an MSU message from the provided message queue.
Definition: msu_message.c:124
struct msu_msg * read_msu_msg(struct local_msu *msu, int fd, size_t size)
Reads an MSU message of the given size from the provided file descriptor.
Definition: msu_message.c:171
unsigned int msu_msg_sender_type(struct msg_provinance *prov)
Definition: msu_message.c:41
Profiling.
Definition: stat_ids.h:60
Container for linked list message queue.
Definition: message_queue.h:56
int read_msu_msg_hdr(int fd, struct msu_msg_hdr *hdr)
Definition: msu_message.c:147
void * data
Payload.
Definition: msu_message.h:104
int32_t id
A shorter, often hashed id for the key of fixed length (used in routing)
Definition: msu_message.h:52
#define log_perror(fmt,...)
Definition: logging.h:102
int set_msg_key(int32_t id, struct msu_msg_key *key)
Sets the key's ID and composite-ID to be equal to the provided id.
Definition: msu_message.c:54
struct msu_provinance_item sender
The last MSU to see this message.
Definition: msu_message.h:73
Logging of status messages to the terminal.
unsigned int type_id
Definition: msu_message.h:60
unsigned int id
Unique ID for a local MSU.
Definition: local_msu.h:54
struct msu_provinance_item origin
The first MSU to see this message.
Definition: msu_message.h:71
int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key)
Sets the key's composite-ID to the provided value, and sets the key's ID to a hash of the value...
Definition: msu_message.c:62
Interactions with global dfg from individual runtime.
unsigned int runtime_id
Definition: msu_message.h:62
A linked-list entry containing a message.
Definition: message_queue.h:42
Keeps track of which MSUs have seen a given message header.
Definition: msu_message.h:69
void destroy_msu_msg_and_contents(struct msu_msg *msg)
Frees both the message and message data.
Definition: msu_message.c:156
int schedule_msg(struct msg_queue *q, struct dedos_msg *msg, struct timespec *interval)
Schedules a message to be delivered once interval time has passed.
Definition: message_queue.c:38
For profiling the path of MSU messages through DeDOS.
enum dedos_msg_type type
Target of delivery: runtime, thread, or MSU.
Definition: message_queue.h:46
struct msu_provinance_item * get_provinance_item(struct msg_provinance *p, struct msu_type *type)
Definition: msu_message.c:45
struct msu_msg * create_msu_msg(struct msu_msg_hdr *hdr, size_t data_size, void *data)
Creates an MSU message with the appropriate header, data size, and data.
Definition: msu_message.c:161
size_t data_size
Payload size.
Definition: msu_message.h:103
unsigned int id
Numerical identifier for the MSU.
Definition: msu_type.h:51
#define log_error(fmt,...)
Definition: logging.h:101
Declares the structures and functions applicable to MSUs on the local machine.
int path_len
The current length of msg_provinance::path.
Definition: msu_message.h:78
#define PROFILE_EVENT(hdr, stat_id)
If the header is marked for profiling, profiles the given event.
Definition: profiler.h:77
struct msu_provinance_item path[8]
A list of each MSU that has seen this message TODO: For now, one MSU of each type.
Definition: msu_message.h:76
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
serialization_fn serialize
Defines serialization protocol for data sent to this MSU type If NULL, assumes no pointers in the msu...
Definition: msu_type.h:112
int local_runtime_id()
Definition: runtime_dfg.c:91
ssize_t(* serialization_fn)(struct msu_type *, struct msu_msg *, void **output)
Definition: msu_type.h:38
serialization_fn serialize_error
Defines serialization protocol for errors returned to this MSU type If NULL, assumes no pointers in t...
Definition: msu_type.h:118
Defines a type of MSU.
Definition: msu_type.h:46
deserialization_fn deserialize
Defines deserialization protocol for data received by this MSU type. If NULL, assumes no pointers in th...
Definition: msu_type.h:128
Profiling.
Definition: stat_ids.h:58
int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key)
Initializes and resets a message header, storing a copy of the provided key.
Definition: msu_message.c:35
struct msu_msg_hdr hdr
Definition: msu_message.h:102
#define MAX_PATH_LEN
The maximum path length recorded for a message's path through MSUs.
Definition: msu_message.h:66
void * serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out)
Converts an MSU message into a serialized stream of bytes.
Definition: msu_message.c:216
unsigned int msu_id
Definition: msu_message.h:61
int group_id
Used to mark a route ID when storing state.
Definition: msu_message.h:55
void * data
Payload.
Definition: message_queue.h:48
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
A message that is to be delivered to an instance of an MSU.
Definition: msu_message.h:101
Profiling.
Definition: stat_ids.h:64
#define HASH_VALUE(keyptr, keylen, hashv)
Definition: uthash.h:133
Messages passed to MSUs.
char * name
Name for the msu type.
Definition: msu_type.h:48
Used to uniquely identify the source of a message, used in state storage as well as routing...
Definition: msu_message.h:46
size_t key_len
The length of the composite key.
Definition: msu_message.h:50
int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data)
Enqueues a message for immediate delivery.
Definition: msu_message.c:101
#define log_warn(fmt,...)
Definition: logging.h:113
ssize_t data_size
Size of payload.
Definition: message_queue.h:50
int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval)
Schedules an MSU message to be delivered after interval time has passed.
Definition: msu_message.c:106
struct msu_type * type
Pointer to struct containing information shared across all instances of this type of MSU...
Definition: local_msu.h:45
struct msu_msg_key key
Routing/state key.
Definition: msu_message.h:87