My Project
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Macros
msu_calls.c
Go to the documentation of this file.
1 /*
2 START OF LICENSE STUB
3  DeDOS: Declarative Dispersion-Oriented Software
4  Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 END OF LICENSE STUB
19 */
25 #include "msu_calls.h"
26 #include "logging.h"
27 #include "thread_message.h"
28 #include "output_thread.h"
29 #include "routing_strategies.h"
30 #include "profiler.h"
31 
40 static int enqueue_for_remote_send(struct msu_msg *msg,
41  struct msu_type *dst_type,
42  struct msu_endpoint *dst) {
43  size_t size;
44  // Is it okay that I malloc within the serialize fn but have to free here on error?
45  void *serialized = serialize_msu_msg(msg, dst_type, &size);
46  if (serialized == NULL) {
47  log_error("Error serializing MSU msg for delivery to msu %d", dst->id);
48  return -1;
49  }
50 
52  dst->id,
53  size,
54  serialized);
55 
56  if (thread_msg == NULL) {
57  log_error("Error creating thread message for sending MSU msg to msu %d",
58  dst->id);
59  free(serialized);
60  return -1;
61  }
62 
64  int rtn = enqueue_for_output(thread_msg);
65  if (rtn < 0) {
66  log_error("Error enqueuing MSU msg for msu %u on main thread", dst->id);
67  destroy_thread_msg(thread_msg);
68  free(serialized);
69  return -1;
70  }
71 
72  log(LOG_MSU_ENQUEUES, "Enqueued message for remote send to dst %d on runtime %d",
73  dst->id, dst->runtime_id);
74  return 0;
75 }
76 
77 int call_local_msu(struct local_msu *sender, struct local_msu *dest,
78  struct msu_msg_hdr *hdr, size_t data_size, void *data) {
79  struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
80  log(LOG_MSU_ENQUEUES, "Enqueing data %p directly to destination %d",
81  msg->data, dest->id);
82 
83  int rtn = add_provinance(&msg->hdr.provinance, sender);
84  if (rtn < 0) {
85  log_warn("Could not add provinance to message %p", msg);
86  }
87 
88  rtn = enqueue_msu_msg(&dest->queue, msg);
89  if (rtn < 0) {
90  log_error("Error enqueing data %p to local MSU %d", msg->data, dest->id);
91  free(msg);
92  return -1;
93  }
94  return 0;
95 }
96 
97 int schedule_local_msu_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval,
98  struct msu_msg_hdr *hdr, size_t data_size, void *data) {
99  struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
100  log(LOG_MSU_ENQUEUES, "Enqueing data %p directly to destination %d",
101  msg->data, dest->id);
102 
103  int rtn = add_provinance(&msg->hdr.provinance, sender);
104  if (rtn < 0) {
105  log_warn("Could not add provinance to message %p", msg);
106  }
107 
108  rtn = schedule_msu_msg(&dest->queue, msg, interval);
109  if (rtn < 0) {
110  log_error("Error enqueing data %p to local MSU %d", msg->data, dest->id);
111  free(msg);
112  return -1;
113  }
114  rtn = enqueue_worker_timeout(dest->thread, interval);
115  if (rtn < 0) {
116  log_warn("Error enqueing timeout to worker thread");
117  }
118  return 0;
119 }
120 
121 int schedule_local_msu_init_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval,
122  struct msu_msg_key *key, size_t data_size, void *data) {
123  struct msu_msg_hdr hdr;
124  if (init_msu_msg_hdr(&hdr, key) != 0) {
125  log_error("Error initializing message header");
126  return -1;
127  }
128  SET_PROFILING(hdr);
130  return schedule_local_msu_call(sender, dest, interval, &hdr, data_size, data);
131 }
132 
133 
134 int init_call_local_msu(struct local_msu *sender, struct local_msu *dest,
135  struct msu_msg_key *key, size_t data_size, void *data) {
136  struct msu_msg_hdr hdr;
137  if (init_msu_msg_hdr(&hdr, key) != 0) {
138  log_error("Error initializing message header");
139  return -1;
140  }
141  SET_PROFILING(hdr);
143  return call_local_msu(sender, dest, &hdr, data_size, data);
144 }
145 
146 int call_msu_type(struct local_msu *sender, struct msu_type *dst_type,
147  struct msu_msg_hdr *hdr, size_t data_size, void *data) {
148 
149  struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
150 
151  int rtn = add_provinance(&msg->hdr.provinance, sender);
152  if (rtn < 0) {
153  log_warn("Could not add provinance to message %p", msg);
154  }
155 
156  log(LOG_MSU_ENQUEUES, "Sending data %p to destination type %s",
157  msg->data, dst_type->name);
158 
159  struct msu_endpoint dst;
160  if (dst_type->route) {
161  rtn = dst_type->route(dst_type, sender, msg, &dst);
162  } else {
163  rtn = default_routing(dst_type, sender, msg, &dst);
164  }
165 
166  if (rtn < 0) {
167  log_error("Could not find destination endpoint of type %s from msu %d (%s). "
168  "Dropping message %p",
169  dst_type->name, sender->id, sender->type->name, msg);
170  free(msg);
171  return -1;
172  }
173 
174  msg->hdr.key.group_id = dst.route_id;
175 
176  switch (dst.locality) {
177  case MSU_IS_LOCAL:
178  rtn = enqueue_msu_msg(dst.queue, msg);
179  if (rtn < 0) {
180  log_error("Error enqueuing data %p to local MSU %d", msg->data, dst.id);
181  free(msg);
182  return -1;
183  }
184  log(LOG_MSU_ENQUEUES, "Enqueued data %p to local msu %d", msg->data, dst.id);
185  return 0;
186  case MSU_IS_REMOTE:
187  rtn = enqueue_for_remote_send(msg, dst_type, &dst);
188  if (rtn < 0) {
189  log_error("Error sending data %p to remote MSU %d", msg->data, dst.id);
190  return -1.i;
191  }
192  log(LOG_MSU_ENQUEUES, "Sending data %p to remote msu %d", msg->data, dst.id);
193 
194  // Since the data has been sent to a remote MSU, we can now
195  // free the msu message from this runtime's memory
197  return 0;
198  default:
199  log_error("Unknown MSU locality for msu %d (from %d): %d",
200  dst.id, sender->id, dst.locality);
201  return -1;
202  }
203 }
204 
205 int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type,
206  struct msu_msg_key *key, size_t data_size, void *data) {
207  struct msu_msg_hdr hdr;
208  if (init_msu_msg_hdr(&hdr, key) != 0) {
209  log_error("Error initializing message header");
210  return -1;
211  }
212  SET_PROFILING(hdr);
214  return call_msu_type(sender, dst_type, &hdr, data_size, data);
215 }
216 
217 int call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint,
218  struct msu_type *endpoint_type,
219  struct msu_msg_hdr *hdr, size_t data_size, void *data) {
220  struct msu_msg *msg = create_msu_msg(hdr, data_size, data);
221 
222 
223  int rtn = add_provinance(&msg->hdr.provinance, sender);
224  if (rtn < 0) {
225  log_warn("Could not add provinance to message %p", msg);
226  }
227 
228  log(LOG_MSU_ENQUEUES, "Sending data %p to destination endpoint %d",
229  msg->data, endpoint->id);
230 
231  switch (endpoint->locality) {
232  case MSU_IS_LOCAL:
233  rtn = enqueue_msu_msg(endpoint->queue, msg);
234  if (rtn < 0) {
235  log_error("Error enqueuing data %p to local msu %d", msg->data, endpoint->id);
236  free(msg);
237  return -1;
238  }
239  log(LOG_MSU_ENQUEUES, "Enqueued data %p to local msu %d", msg->data, endpoint->id);
240  return 0;
241  case MSU_IS_REMOTE:
242  rtn = enqueue_for_remote_send(msg, endpoint_type, endpoint);
243  if (rtn < 0) {
244  log_error("Error enqueueing data %p towards remote msu %d",
245  msg->data, endpoint->id);
246  free(msg);
247  return -1;
248  }
249  log(LOG_MSU_ENQUEUES, "Enqueued data %p toward remote msu %d",
250  msg->data, endpoint->id);
251  return 0;
252  default:
253  log_error("Unknown MSU locality: %d", endpoint->locality);
254  return -1;
255  }
256 }
257 
258 int init_call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint,
259  struct msu_type *endpoint_type,
260  struct msu_msg_key *key, size_t data_size, void *data) {
261  struct msu_msg_hdr hdr;
262  if (init_msu_msg_hdr(&hdr, key) != 0) {
263  log_error("Error initializing message header");
264  return -1;
265  }
266  SET_PROFILING(hdr);
268  return call_msu_endpoint(sender, endpoint, endpoint_type, &hdr, data_size, data);
269 }
270 
271 int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint,
272  struct msu_type *endpoint_type,
273  struct msu_msg_hdr *hdr, size_t data_size, void *data) {
274  hdr->error_flag = -1;
275  return call_msu_endpoint(sender, endpoint, endpoint_type, hdr, data_size, data);
276 }
277 
278 
279 
int id
A unque identifier for the endpoint (msu ID)
Definition: routing.h:34
unsigned int runtime_id
The ID for the runtime on which the endpoint resides.
Definition: routing.h:40
Header for messages passed to MSUs.
Definition: msu_message.h:85
static int enqueue_for_remote_send(struct msu_msg *msg, struct msu_type *dst_type, struct msu_endpoint *dst)
Calls type-specific MSU serialization function and enqueues data into main thread queue so it can be ...
Definition: msu_calls.c:40
Messages to be delivered to dedos_threads.
int add_provinance(struct msg_provinance *prov, struct local_msu *sender)
Adds a new item to the path of MSUs taken within the mesasge provinance in the header.
Definition: msu_message.c:74
int error_flag
0 if no error has been encountered
Definition: msu_message.h:91
int enqueue_worker_timeout(struct worker_thread *thread, struct timespec *interval)
Signals that the given thread should break when waiting on its semaphore once interval time has passe...
int default_routing(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
The defualt routing strategy, using the key of the MSU message to route to a pre-defined endpoint...
int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a destination of the specified type.
Definition: msu_calls.c:205
void * data
Payload.
Definition: msu_message.h:104
int call_msu_error(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Definition: msu_calls.c:271
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
Definition: local_msu.h:54
void destroy_thread_msg(struct thread_msg *msg)
Frees a thread message.
void destroy_msu_msg_and_contents(struct msu_msg *msg)
Frees both the message and message data.
Definition: msu_message.c:156
struct msg_queue * queue
if msu_endpoint::locality == MSU_IS_LOCAL, the queue for the msu endpoint
Definition: routing.h:42
int enqueue_for_output(struct thread_msg *msg)
Enqueues a thread_msg for delivery to the output thread.
For profiling the path of MSU messages through DeDOS.
Declares the methods available for calling an MSU from another MSU.
struct msu_msg * create_msu_msg(struct msu_msg_hdr *hdr, size_t data_size, void *data)
Creates an MSU message with the appropriate header, data size, and data.
Definition: msu_message.c:161
size_t data_size
Payload size.
Definition: msu_message.h:103
struct msg_provinance provinance
Message history.
Definition: msu_message.h:89
int call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Sends an MSU message to a speicific destination, either local or remote.
Definition: msu_calls.c:217
#define log_error(fmt,...)
Definition: logging.h:101
A message to be delivered to a dedos_thread.
int schedule_local_msu_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Schedules a call to a local MSU to occur at some point in the future.
Definition: msu_calls.c:97
#define PROFILE_EVENT(hdr, stat_id)
If the header is marked for profiling, profiles the given event.
Definition: profiler.h:77
The structure that represents an MSU located on the local machine.
Definition: local_msu.h:38
A dedos_thread which monitors a queue for output to be sent to other runtimes or the global controlle...
enum msu_locality locality
Whether the endpoint is on the local machine or remote.
Definition: routing.h:36
int init_call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_key *key, size_t data_size, void *data)
Enqueues a new message in the queue of a local MSU.
Definition: msu_calls.c:134
struct msg_queue queue
Input queue to MSU.
Definition: local_msu.h:60
int(* route)(struct msu_type *type, struct local_msu *sender, struct msu_msg *msg, struct msu_endpoint *output)
Choose which MSU of this type the previous MSU will route to.
Definition: msu_type.h:101
struct worker_thread * thread
The worker thread on which this MSU is placed.
Definition: local_msu.h:69
int call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Sends an MSU message to a destination of the given type, utilizing the sending MSU's routing function...
Definition: msu_calls.c:146
Defines a type of MSU.
Definition: msu_type.h:46
Profiling.
Definition: stat_ids.h:62
ssize_t data_size
int init_msu_msg_hdr(struct msu_msg_hdr *hdr, struct msu_msg_key *key)
Initializes and resets a message header, storing a copy of the provided key.
Definition: msu_message.c:35
struct msu_msg_hdr hdr
Definition: msu_message.h:102
unsigned int route_id
The ID for the route used to get to the endpoint.
Definition: routing.h:38
Profiling.
Definition: stat_ids.h:66
void * serialize_msu_msg(struct msu_msg *msg, struct msu_type *dst_type, size_t *size_out)
Converts an MSU message into a serializes stream of bytes.
Definition: msu_message.c:216
struct thread_msg * init_send_thread_msg(unsigned int runtime_id, unsigned int target_id, size_t data_len, void *data)
Initializes a send_to_peer message (SEND_TO_PEER)
int group_id
Used to mark a route ID when storing state.
Definition: msu_message.h:55
#define log(level, fmt,...)
Log at a custom level.
Definition: logging.h:147
A message that is to be delivered to an instance of an MSU.
Definition: msu_message.h:101
int call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Enqueues a message in the queue of a local MSU.
Definition: msu_calls.c:77
char * name
Name for the msu type.
Definition: msu_type.h:48
Used to uniquely identify the source of a message, used in state storage as well as routing...
Definition: msu_message.h:46
An endpoint to which an msu_msg can be delivered.
Definition: routing.h:32
int enqueue_msu_msg(struct msg_queue *q, struct msu_msg *data)
Enqueues a message for immediate delivery.
Definition: msu_message.c:101
#define log_warn(fmt,...)
Definition: logging.h:113
int init_call_msu_endpoint(struct local_msu *sender, struct msu_endpoint *endpoint, struct msu_type *endpoint_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a specific destination, either local or remote.
Definition: msu_calls.c:258
#define SET_PROFILING(hdr)
Sets whether the header should be profiled.
Definition: profiler.h:65
int schedule_msu_msg(struct msg_queue *q, struct msu_msg *data, struct timespec *interval)
Schedules an MSU message to be delivered after interval time has passed.
Definition: msu_message.c:106
struct msu_type * type
Pointer to struct containing information shared across all instances of this type of MSU...
Definition: local_msu.h:45
Declares strategies that MSUs can use for routing to endpoints.
struct msu_msg_key key
Routing/state key.
Definition: msu_message.h:87
int schedule_local_msu_init_call(struct local_msu *sender, struct local_msu *dest, struct timespec *interval, struct msu_msg_key *key, size_t data_size, void *data)
Schedules a call to a local MSU to occur at some point in the future.
Definition: msu_calls.c:121