Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file msu_msssage.c
22 : *
23 : * Messages passed to an MSU
24 : */
25 : #include "msu_message.h"
26 : #include "logging.h"
27 : #include "communication.h"
28 : #include "local_msu.h"
29 : #include "uthash.h"
30 : #include "runtime_dfg.h"
31 : #include "profiler.h"
32 :
33 : #include <stdlib.h>
34 :
35 0 : int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key) {
36 0 : memset(hdr, '\0', sizeof(*hdr));
37 0 : hdr->key = *key;
38 0 : return 0;
39 : }
40 :
41 0 : unsigned int msu_msg_sender_type(struct msg_provinance *prov) {
42 0 : return prov->sender.type_id;
43 : }
44 :
45 2 : struct msu_provinance_item *get_provinance_item(struct msg_provinance *p, struct msu_type *type) {
46 3 : for (int i=0; i < p->path_len && i < MAX_PATH_LEN; i++) {
47 3 : if (p->path[i].type_id == type->id) {
48 2 : return &p->path[i];
49 : }
50 : }
51 0 : return NULL;
52 : }
53 :
54 0 : int set_msg_key(int32_t id, struct msu_msg_key *key) {
55 0 : memcpy(&key->key, &id, sizeof(id));
56 0 : key->key_len = sizeof(id);
57 0 : key->id = id;
58 0 : key->group_id = -1;
59 0 : return 0;
60 : }
61 :
62 0 : int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key) {
63 0 : HASH_VALUE(seed, seed_size, key->id);
64 0 : if (seed_size > sizeof(key->key)) {
65 0 : log_warn("Key length too large for composite key!");
66 0 : seed_size = sizeof(key->key);
67 : }
68 0 : memcpy(&key->key, seed, seed_size);
69 0 : key->key_len = seed_size;
70 0 : key->group_id = -1;
71 0 : return 0;
72 : }
73 :
74 2 : int add_provinance(struct msg_provinance *prov, struct local_msu *sender) {
75 2 : prov->sender.type_id = sender->type->id;
76 2 : prov->sender.msu_id = sender->id;
77 2 : prov->sender.runtime_id = local_runtime_id();
78 2 : if (prov->path_len == 0) {
79 1 : prov->origin = prov->sender;
80 : }
81 :
82 : int i;
83 3 : for (i = 0; i < MAX_PATH_LEN && i < prov->path_len; i++) {
84 1 : if (prov->path[i].type_id == sender->type->id) {
85 0 : break;
86 : }
87 : }
88 2 : if (i >= MAX_PATH_LEN) {
89 0 : log_warn("Cannot record provinance in path: Path too short");
90 0 : return 1;
91 : }
92 :
93 2 : prov->path[i] = prov->sender;
94 2 : if (i == prov->path_len) {
95 2 : prov->path_len++;
96 : }
97 2 : log(LOG_ADD_PROVINANCE, "Path len of prov %p is now %d", prov, prov->path_len);
98 2 : return 0;
99 : }
100 :
101 3 : int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data) {
102 3 : struct timespec zero = {};
103 3 : return schedule_msu_msg(q, data, &zero);
104 : }
105 :
106 3 : int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval) {
107 3 : struct dedos_msg *msg = malloc(sizeof(*msg));
108 3 : if (msg == NULL) {
109 0 : log_perror("Error allocating dedos message");
110 0 : return -1;
111 : }
112 3 : msg->type = MSU_MSG;
113 3 : msg->data_size = sizeof(*data);
114 3 : msg->data = data;
115 :
116 : PROFILE_EVENT(data->hdr, PROF_ENQUEUE);
117 3 : if (schedule_msg(q, msg, interval) != 0) {
118 0 : log_error("Error MSU message to MSU");
119 0 : return -1;
120 : }
121 3 : return 0;
122 : }
123 :
124 4 : struct msu_msg *dequeue_msu_msg(struct msg_queue *q) {
125 4 : struct dedos_msg *msg = dequeue_msg(q);
126 4 : if (msg == NULL) {
127 1 : return NULL;
128 : }
129 :
130 3 : if (msg->type != MSU_MSG) {
131 0 : log_error("Received non-MSU message on msu queue!");
132 0 : return NULL;
133 : }
134 :
135 3 : if (msg->data_size != sizeof(struct msu_msg)) {
136 0 : log_warn("Data size of dequeued MSU message (%d) does not meet expected (%d)",
137 : (int)msg->data_size, (int)sizeof(struct msu_msg));
138 0 : return NULL;
139 : }
140 :
141 3 : struct msu_msg *msu_msg = msg->data;
142 3 : free(msg);
143 : PROFILE_EVENT(msu_msg->hdr, PROF_DEQUEUE);
144 3 : return msu_msg;
145 : }
146 :
147 2 : int read_msu_msg_hdr(int fd, struct msu_msg_hdr *hdr) {
148 2 : if (read_payload(fd, sizeof(*hdr), hdr) != 0) {
149 0 : log_error("Error reading msu msg header from fd %d", fd);
150 0 : return -1;
151 : }
152 :
153 2 : return 0;
154 : }
155 :
156 0 : void destroy_msu_msg_and_contents(struct msu_msg *msg) {
157 0 : free(msg->data);
158 0 : free(msg);
159 0 : }
160 :
161 0 : struct msu_msg *create_msu_msg(struct msu_msg_hdr *hdr,
162 : size_t data_size,
163 : void *data) {
164 0 : struct msu_msg *msg = malloc(sizeof(*msg));
165 0 : msg->hdr = *hdr;
166 0 : msg->data_size = data_size;
167 0 : msg->data = data;
168 0 : return msg;
169 : }
170 :
171 2 : struct msu_msg *read_msu_msg(struct local_msu *msu, int fd, size_t size) {
172 2 : if (size < sizeof(struct msu_msg_hdr)) {
173 0 : log_error("Size of incoming message is not big enough to fit header");
174 0 : return NULL;
175 : }
176 2 : log(LOG_MSU_MSG_READ, "Reading header from %d", fd);
177 2 : struct msu_msg *msg = malloc(sizeof(*msg));
178 2 : if (msg == NULL) {
179 0 : log_perror("Error allocating MSU message");
180 0 : return NULL;
181 : }
182 2 : if (read_msu_msg_hdr(fd, &msg->hdr) != 0) {
183 0 : log_error("Error reading msu message header");
184 0 : free(msg);
185 0 : return NULL;
186 : }
187 : PROFILE_EVENT(msg->hdr, PROF_REMOTE_RECV);
188 2 : size_t data_size = size - sizeof(msg->hdr);
189 2 : void *data = NULL;
190 2 : if (data_size > 0) {
191 2 : data = malloc(data_size);
192 2 : if (data == NULL) {
193 0 : log_perror("Error allocating msu msg of size %d", (int)data_size);
194 0 : free(msg);
195 0 : return NULL;
196 : }
197 2 : log(LOG_MSU_MSG_READ, "Reading payload of size %d from %d", (int)data_size, fd);
198 2 : if (read_payload(fd, data_size, data) != 0) {
199 0 : log_perror("Error reading msu msg payload of size %d", (int)data_size);
200 0 : free(msg);
201 0 : free(data);
202 0 : return NULL;
203 : }
204 2 : log(LOG_MSU_MSG_DESERIALIZE, "Deserialized MSU message of size %d", (int)data_size);
205 : }
206 2 : msg->data_size = data_size;
207 2 : if (msu->type->deserialize) {
208 1 : msg->data = msu->type->deserialize(msu, data_size, data, &msg->data_size);
209 1 : free(data);
210 : } else {
211 1 : msg->data = data;
212 : }
213 2 : return msg;
214 : }
215 :
216 2 : void *serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out) {
217 :
218 4 : serialization_fn serializer = (msg->hdr.error_flag == 0 ?
219 2 : dst_type->serialize : dst_type->serialize_error);
220 :
221 2 : if (serializer != NULL) {
222 1 : void *payload = NULL;
223 1 : ssize_t payload_size = serializer(dst_type, msg, &payload);
224 1 : log(LOG_SERIALIZE,
225 : "Serialized message into payload of size %d using %s serialization",
226 : (int)payload_size, dst_type->name);
227 1 : if (payload_size < 0) {
228 0 : log_error("Error running destination type %s's serialize function",
229 : dst_type->name);
230 0 : return NULL;
231 : }
232 1 : size_t serialized_size = sizeof(struct msu_msg_hdr) + payload_size;
233 1 : void *output = malloc(serialized_size);
234 1 : if (output == NULL) {
235 0 : log_error("Could not allocate serialized MSU msg");
236 0 : free(payload);
237 0 : return NULL;
238 : }
239 1 : memcpy(output, &msg->hdr, sizeof(msg->hdr));
240 1 : memcpy(output + sizeof(msg->hdr), payload, payload_size);
241 :
242 1 : free(payload);
243 :
244 1 : *size_out = serialized_size;
245 1 : return output;
246 : } else {
247 1 : size_t serialized_size = sizeof(struct msu_msg_hdr) + msg->data_size;
248 1 : void *output = malloc(serialized_size);
249 :
250 1 : if (output == NULL) {
251 0 : log_error("Could not allocate serialized MSU msg");
252 0 : return NULL;
253 : }
254 :
255 1 : memcpy(output, &msg->hdr, sizeof(msg->hdr));
256 1 : memcpy(output + sizeof(msg->hdr), msg->data, msg->data_size);
257 :
258 1 : *size_out = serialized_size;
259 1 : return output;
260 : }
261 : }
|