Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file msu_calls.c
22 : * Defines methods used for calling MSUs from other MSUs
23 : */
24 :
25 : #include "msu_calls.h"
26 : #include "logging.h"
27 : #include "thread_message.h"
28 : #include "output_thread.h"
29 : #include "routing_strategies.h"
30 : #include "profiler.h"
31 :
32 : /**
33 : * Calls type-specific MSU serialization function and enqueues data into main
34 : * thread queue so it can be forwarded to remote MSU
35 : * @param msg Message to be enqueued to the remote MSU
36 : * @param dst_type MSU type of the destination
37 : * @param dst The specific MSU destination
38 : * @return 0 on success, -1 on error
39 : */
40 0 : static int enqueue_for_remote_send(struct msu_msg *msg,
41 : struct msu_type *dst_type,
42 : struct msu_endpoint *dst) {
43 : size_t size;
44 : // Is it okay that I malloc within the serialize fn but have to free here on error?
45 0 : void *serialized = serialize_msu_msg(msg, dst_type, &size);
46 0 : if (serialized == NULL) {
47 0 : log_error("Error serializing MSU msg for delivery to msu %d", dst->id);
48 0 : return -1;
49 : }
50 :
51 0 : struct thread_msg *thread_msg = init_send_thread_msg(dst->runtime_id,
52 0 : dst->id,
53 : size,
54 : serialized);
55 :
56 0 : if (thread_msg == NULL) {
57 0 : log_error("Error creating thread message for sending MSU msg to msu %d",
58 : dst->id);
59 0 : free(serialized);
60 0 : return -1;
61 : }
62 :
63 : PROFILE_EVENT(msg->hdr, PROF_REMOTE_SEND);
64 0 : int rtn = enqueue_for_output(thread_msg);
65 0 : if (rtn < 0) {
66 0 : log_error("Error enqueuing MSU msg for msu %u on main thread", dst->id);
67 0 : destroy_thread_msg(thread_msg);
68 0 : free(serialized);
69 0 : return -1;
70 : }
71 :
72 0 : log(LOG_MSU_ENQUEUES, "Enqueued message for remote send to dst %d on runtime %d",
73 : dst->id, dst->runtime_id);
74 0 : return 0;
75 : }
76 :
77 0 : int call_local_msu(struct local_msu *sender, struct local_msu *dest,
78 : struct msu_msg_hdr *hdr, size_t data_size, void *data) {
79 0 : struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
80 0 : log(LOG_MSU_ENQUEUES, "Enqueing data %p directly to destination %d",
81 : msg->data, dest->id);
82 :
83 0 : int rtn = add_provinance(&msg->hdr.provinance, sender);
84 0 : if (rtn < 0) {
85 0 : log_warn("Could not add provinance to message %p", msg);
86 : }
87 :
88 0 : rtn = enqueue_msu_msg(&dest->queue, msg);
89 0 : if (rtn < 0) {
90 0 : log_error("Error enqueing data %p to local MSU %d", msg->data, dest->id);
91 0 : free(msg);
92 0 : return -1;
93 : }
94 0 : return 0;
95 : }
96 :
97 1 : int schedule_local_msu_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval,
98 : struct msu_msg_hdr *hdr, size_t data_size, void *data) {
99 1 : struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
100 1 : log(LOG_MSU_ENQUEUES, "Enqueing data %p directly to destination %d",
101 : msg->data, dest->id);
102 :
103 1 : int rtn = add_provinance(&msg->hdr.provinance, sender);
104 1 : if (rtn < 0) {
105 0 : log_warn("Could not add provinance to message %p", msg);
106 : }
107 :
108 1 : rtn = schedule_msu_msg(&dest->queue, msg, interval);
109 1 : if (rtn < 0) {
110 0 : log_error("Error enqueing data %p to local MSU %d", msg->data, dest->id);
111 0 : free(msg);
112 0 : return -1;
113 : }
114 1 : rtn = enqueue_worker_timeout(dest->thread, interval);
115 1 : if (rtn < 0) {
116 0 : log_warn("Error enqueing timeout to worker thread");
117 : }
118 1 : return 0;
119 : }
120 :
121 1 : int schedule_local_msu_init_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval,
122 : struct msu_msg_key *key, size_t data_size, void *data) {
123 : struct msu_msg_hdr hdr;
124 1 : if (init_msu_msg_hdr(&hdr, key) != 0) {
125 0 : log_error("Error initializing message header");
126 0 : return -1;
127 : }
128 : SET_PROFILING(hdr);
129 : PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
130 1 : return schedule_local_msu_call(sender, dest, interval, &hdr, data_size, data);
131 : }
132 :
133 :
134 0 : int init_call_local_msu(struct local_msu *sender, struct local_msu *dest,
135 : struct msu_msg_key *key, size_t data_size, void *data) {
136 : struct msu_msg_hdr hdr;
137 0 : if (init_msu_msg_hdr(&hdr, key) != 0) {
138 0 : log_error("Error initializing message header");
139 0 : return -1;
140 : }
141 : SET_PROFILING(hdr);
142 : PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
143 0 : return call_local_msu(sender, dest, &hdr, data_size, data);
144 : }
145 :
146 0 : int call_msu_type(struct local_msu *sender, struct msu_type *dst_type,
147 : struct msu_msg_hdr *hdr, size_t data_size, void *data) {
148 :
149 0 : struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
150 :
151 0 : int rtn = add_provinance(&msg->hdr.provinance, sender);
152 0 : if (rtn < 0) {
153 0 : log_warn("Could not add provinance to message %p", msg);
154 : }
155 :
156 0 : log(LOG_MSU_ENQUEUES, "Sending data %p to destination type %s",
157 : msg->data, dst_type->name);
158 :
159 : struct msu_endpoint dst;
160 0 : if (dst_type->route) {
161 0 : rtn = dst_type->route(dst_type, sender, msg, &dst);
162 : } else {
163 0 : rtn = default_routing(dst_type, sender, msg, &dst);
164 : }
165 :
166 0 : if (rtn < 0) {
167 0 : log_error("Could not find destination endpoint of type %s from msu %d (%s). "
168 : "Dropping message %p",
169 : dst_type->name, sender->id, sender->type->name, msg);
170 0 : free(msg);
171 0 : return -1;
172 : }
173 :
174 0 : msg->hdr.key.group_id = dst.route_id;
175 :
176 0 : switch (dst.locality) {
177 : case MSU_IS_LOCAL:
178 0 : rtn = enqueue_msu_msg(dst.queue, msg);
179 0 : if (rtn < 0) {
180 0 : log_error("Error enqueuing data %p to local MSU %d", msg->data, dst.id);
181 0 : free(msg);
182 0 : return -1;
183 : }
184 0 : log(LOG_MSU_ENQUEUES, "Enqueued data %p to local msu %d", msg->data, dst.id);
185 0 : return 0;
186 : case MSU_IS_REMOTE:
187 0 : rtn = enqueue_for_remote_send(msg, dst_type, &dst);
188 0 : if (rtn < 0) {
189 0 : log_error("Error sending data %p to remote MSU %d", msg->data, dst.id);
190 0 : return -1.i;
191 : }
192 0 : log(LOG_MSU_ENQUEUES, "Sending data %p to remote msu %d", msg->data, dst.id);
193 :
194 : // Since the data has been sent to a remote MSU, we can now
195 : // free the msu message from this runtime's memory
196 0 : destroy_msu_msg_and_contents(msg);
197 0 : return 0;
198 : default:
199 0 : log_error("Unknown MSU locality for msu %d (from %d): %d",
200 : dst.id, sender->id, dst.locality);
201 0 : return -1;
202 : }
203 : }
204 :
205 0 : int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type,
206 : struct msu_msg_key *key, size_t data_size, void *data) {
207 : struct msu_msg_hdr hdr;
208 0 : if (init_msu_msg_hdr(&hdr, key) != 0) {
209 0 : log_error("Error initializing message header");
210 0 : return -1;
211 : }
212 : SET_PROFILING(hdr);
213 : PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
214 0 : return call_msu_type(sender, dst_type, &hdr, data_size, data);
215 : }
216 :
217 0 : int call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint,
218 : struct msu_type *endpoint_type,
219 : struct msu_msg_hdr *hdr, size_t data_size, void *data) {
220 0 : struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
221 :
222 :
223 0 : int rtn = add_provinance(&msg->hdr.provinance, sender);
224 0 : if (rtn < 0) {
225 0 : log_warn("Could not add provinance to message %p", msg);
226 : }
227 :
228 0 : log(LOG_MSU_ENQUEUES, "Sending data %p to destination endpoint %d",
229 : msg->data, endpoint->id);
230 :
231 0 : switch (endpoint->locality) {
232 : case MSU_IS_LOCAL:
233 0 : rtn = enqueue_msu_msg(endpoint->queue, msg);
234 0 : if (rtn < 0) {
235 0 : log_error("Error enqueuing data %p to local msu %d", msg->data, endpoint->id);
236 0 : free(msg);
237 0 : return -1;
238 : }
239 0 : log(LOG_MSU_ENQUEUES, "Enqueued data %p to local msu %d", msg->data, endpoint->id);
240 0 : return 0;
241 : case MSU_IS_REMOTE:
242 0 : rtn = enqueue_for_remote_send(msg, endpoint_type, endpoint);
243 0 : if (rtn < 0) {
244 0 : log_error("Error enqueueing data %p towards remote msu %d",
245 : msg->data, endpoint->id);
246 0 : free(msg);
247 0 : return -1;
248 : }
249 0 : log(LOG_MSU_ENQUEUES, "Enqueued data %p toward remote msu %d",
250 : msg->data, endpoint->id);
251 0 : return 0;
252 : default:
253 0 : log_error("Unknown MSU locality: %d", endpoint->locality);
254 0 : return -1;
255 : }
256 : }
257 :
258 0 : int init_call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint,
259 : struct msu_type *endpoint_type,
260 : struct msu_msg_key *key, size_t data_size, void *data) {
261 : struct msu_msg_hdr hdr;
262 0 : if (init_msu_msg_hdr(&hdr, key) != 0) {
263 0 : log_error("Error initializing message header");
264 0 : return -1;
265 : }
266 : SET_PROFILING(hdr);
267 : PROFILE_EVENT(hdr, PROF_DEDOS_ENTRY);
268 0 : return call_msu_endpoint(sender, endpoint, endpoint_type, &hdr, data_size, data);
269 : }
270 :
271 0 : int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint,
272 : struct msu_type *endpoint_type,
273 : struct msu_msg_hdr *hdr, size_t data_size, void *data) {
274 0 : hdr->error_flag = -1;
275 0 : return call_msu_endpoint(sender, endpoint, endpoint_type, hdr, data_size, data);
276 : }
277 :
278 :
279 :
|