Line data Source code
1 : /*
2 : START OF LICENSE STUB
3 : DeDOS: Declarative Dispersion-Oriented Software
4 : Copyright (C) 2017 University of Pennsylvania, Georgetown University
5 :
6 : This program is free software: you can redistribute it and/or modify
7 : it under the terms of the GNU General Public License as published by
8 : the Free Software Foundation, either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : This program is distributed in the hope that it will be useful,
12 : but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : GNU General Public License for more details.
15 :
16 : You should have received a copy of the GNU General Public License
17 : along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : END OF LICENSE STUB
19 : */
20 : /**
21 : * @file communication.c
22 : *
23 : * General-purpose socket communication functions used from
24 : * global controller, runtime, or MSUs
25 : */
26 : #include "communication.h"
27 : #include "logging.h"
28 :
29 : #include <arpa/inet.h>
30 :
31 : /**
32 : * The maximum number of times that a call to `read()`
33 : * can be attempted for a single buffer before giving up
34 : */
35 : #define MAX_READ_ATTEMPTS 100
36 :
37 0 : int read_payload(int fd, size_t size, void *buff) {
38 0 : ssize_t rtn = 0;
39 0 : int attempts = 0;
40 : do {
41 : log(LOG_READS, "Attempting to read payload of size %d", (int)size - (int)rtn);
42 0 : int new_rtn = recv(fd, buff + rtn, size - rtn, 0);
43 0 : if (new_rtn < 0 && errno != EAGAIN) {
44 0 : log_perror("Error reading from fd: %d", fd);
45 0 : return -1;
46 : }
47 0 : if (attempts++ > MAX_READ_ATTEMPTS) {
48 0 : log_error("Attempted to read %d times", MAX_READ_ATTEMPTS);
49 0 : return -1;
50 : }
51 0 : if (new_rtn > 0)
52 0 : rtn += new_rtn;
53 0 : } while ((errno == EAGAIN || rtn > 0) && (int)rtn < (int)size);
54 0 : if (rtn == 0) {
55 : log(LOG_CONNECTIONS, "fd %d has been closed by peer", fd);
56 0 : return 1;
57 : }
58 0 : if (rtn != size) {
59 0 : log_error("Could not read full runtime payload from socket %d. "
60 : "Requested: %d, received: %d", fd, (int)size, (int)rtn);
61 0 : return -1;
62 : }
63 0 : return 0;
64 : }
65 :
66 0 : ssize_t send_to_endpoint(int fd, void *data, size_t data_len) {
67 0 : ssize_t size = 0;
68 0 : while (size < data_len) {
69 0 : ssize_t rtn = write(fd, data + size, data_len - size);
70 0 : if (rtn <= 0) {
71 0 : log_error("Error sending buffer to endpoint with socket %d", fd);
72 0 : break;
73 0 : } else if (rtn < data_len) {
74 0 : log_warn("Full buffer not sent to endpoint!");
75 : }
76 0 : size += rtn;
77 : }
78 0 : return size;
79 : }
80 :
81 0 : int init_connected_socket(struct sockaddr_in *addr) {
82 :
83 0 : int sock = socket(AF_INET, SOCK_STREAM, 0);
84 0 : if (sock < 0) {
85 0 : log_perror("Failed to create socket");
86 0 : return -1;
87 : }
88 :
89 : // ???: Why set REUSEPORT and REUSEADDR on a socket that's not binding?
90 0 : int val = 1;
91 0 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT,
92 : &val, sizeof(val)) < 0 ) {
93 0 : log_perror("Error setting SO_REUSEPORT");
94 : }
95 0 : val = 1;
96 0 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
97 : &val, sizeof(val)) < 0) {
98 0 : log_perror("Error setting SO_REUSEADDR");
99 : }
100 : char ip[INET_ADDRSTRLEN];
101 0 : inet_ntop(AF_INET, &addr->sin_addr, ip, INET_ADDRSTRLEN);
102 0 : int port = ntohs(addr->sin_port);
103 : log(LOG_CONNECTIONS, "Attepting to connect to socket at %s:%d", ip, port);
104 0 : if (connect(sock, (struct sockaddr*)addr, sizeof(*addr)) < 0) {
105 0 : log_perror("Failed to connect to socket at %s:%d", ip, port);
106 0 : close(sock);
107 0 : return -1;
108 : }
109 :
110 : log(LOG_CONNECTIONS, "Connected socket to %s:%d", ip, port);
111 0 : return sock;
112 : }
113 :
114 0 : int init_bound_socket(int port) {
115 0 : int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
116 0 : if (sock == -1) {
117 0 : log_perror("Failed to create socket");
118 0 : return -1;
119 : }
120 0 : int val = 1;
121 0 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) < 0) {
122 0 : log_perror("Error setting SO_REUSEPORT on socket");
123 0 : return -1;
124 : }
125 0 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
126 0 : log_perror("Error setting SO_REUSEADDR on socket");
127 0 : return -1;
128 : }
129 :
130 : struct sockaddr_in addr;
131 0 : addr.sin_family = AF_INET;
132 0 : addr.sin_port = htons(port);
133 0 : addr.sin_addr.s_addr = htonl(INADDR_ANY);
134 :
135 0 : if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
136 0 : log_perror("Failed to bind to port %d", port);
137 0 : return -1;
138 : }
139 0 : return sock;
140 : }
141 :
142 : /**
143 : * The backlog size for listening sockets
144 : */
145 : #define BACKLOG 1024
146 :
147 0 : int init_listening_socket(int port) {
148 0 : int sock = init_bound_socket(port);
149 0 : if (sock < 0) {
150 0 : log_error("Could not start listening socket due to failed bind");
151 0 : return -1;
152 : }
153 :
154 0 : int rtn = listen(sock, BACKLOG);
155 0 : if (rtn < 0) {
156 0 : log_perror("Error starting listening socket");
157 0 : return -1;
158 : }
159 : log(LOG_COMMUNICATION, "Started listening socket on fd %d", sock);
160 0 : return sock;
161 : }
|