Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file thread_message.c
22 : *
23 : * Messages to be delivered to dedos_threads
24 : */
25 :
26 : #include "thread_message.h"
27 : #include "logging.h"
28 :
29 1 : int enqueue_thread_msg(struct thread_msg *thread_msg, struct msg_queue *queue) {
30 1 : struct dedos_msg *msg = malloc(sizeof(*msg));
31 1 : if (msg == NULL) {
32 0 : log_error("Error allocating dedos_msg for thread_msg");
33 0 : return -1;
34 : }
35 1 : msg->data_size = sizeof(*thread_msg);
36 1 : msg->type = THREAD_MSG;
37 1 : msg->data = thread_msg;
38 1 : int rtn = enqueue_msg(queue, msg);
39 1 : if (rtn < 0) {
40 0 : log_error("Error enqueueing message on thread message queue");
41 0 : return -1;
42 : }
43 1 : log(LOG_THREAD_MESSAGES, "Enqueued thread message %p on queue %p",
44 : thread_msg, queue);
45 1 : return 0;
46 : }
47 :
48 1 : struct thread_msg *dequeue_thread_msg(struct msg_queue *queue) {
49 1 : struct dedos_msg *msg = dequeue_msg(queue);
50 1 : if (msg == NULL) {
51 0 : return NULL;
52 : }
53 1 : if (msg->data_size != sizeof(struct thread_msg)) {
54 0 : log_error("Attempted to dequeue non-thread msg from queue");
55 0 : return NULL;
56 : }
57 :
58 1 : struct thread_msg *thread_msg = msg->data;
59 1 : log(LOG_THREAD_MESSAGES, "Dequeued thread message %p from queue %p",
60 : thread_msg, queue);
61 1 : free(msg);
62 1 : return thread_msg;
63 : }
64 :
65 :
66 1 : struct thread_msg *construct_thread_msg(enum thread_msg_type type,
67 : ssize_t data_size, void *data) {
68 1 : struct thread_msg *msg = calloc(1, sizeof(*msg));
69 1 : if (msg == NULL) {
70 0 : log_error("Error allocating thread message");
71 0 : return NULL;
72 : }
73 1 : msg->type = type;
74 1 : msg->data_size = data_size;
75 1 : msg->data = data;
76 1 : log(LOG_THREAD_MESSAGES, "Constructed thread message %p of size %d",
77 : msg, (int)data_size);
78 1 : return msg;
79 : }
80 :
81 0 : void destroy_thread_msg(struct thread_msg *msg) {
82 0 : log(LOG_THREAD_MESSAGES, "Freeing thread message %p",
83 : msg);
84 0 : free(msg);
85 0 : }
86 :
87 1 : struct thread_msg *init_send_thread_msg(unsigned int runtime_id,
88 : unsigned int target_id,
89 : size_t data_len,
90 : void *data) {
91 1 : struct send_to_peer_msg *msg = malloc(sizeof(*msg));
92 1 : if (msg == NULL) {
93 0 : log_error("Error allocating send_to_runtime_thread_msg");
94 0 : return NULL;
95 : }
96 1 : msg->hdr.type = RT_FWD_TO_MSU;
97 1 : msg->hdr.target = target_id;
98 1 : msg->hdr.payload_size = data_len;
99 1 : msg->runtime_id = runtime_id;
100 1 : msg->data = data;
101 :
102 1 : struct thread_msg *thread_msg = construct_thread_msg(SEND_TO_PEER,
103 : sizeof(*msg), msg);
104 1 : if (thread_msg == NULL) {
105 0 : log_error("Error creating thread message for send-to-runtime message");
106 0 : return NULL;
107 : }
108 1 : return thread_msg;
109 : }
110 :
|