30 #include <sys/epoll.h>
32 #include <netinet/ip.h>
48 #define SOCKET_HANDLER_TIMEOUT 500
49 #define SOCKET_HANDLER_BATCH_SIZE 1000
56 #define MONITOR_NUM_FDS
60 if (instance == NULL) {
61 log_error(
"Socket monitor instance is NULL! Must instantiate before monitoring fd");
73 #ifdef MONITOR_NUM_FDS
77 log_perror(
"Error enabling epoll for fd %d", fd);
81 log(LOG_SOCKET_MSU,
"Added fd %d to epoll", fd);
93 int rtn = epoll_ctl(state->
epoll_fd, EPOLL_CTL_DEL, fd, NULL);
96 log_perror(
"Error removing fd %d from epoll", fd);
99 #ifdef MONITOR_NUM_FDS
115 log(LOG_SOCKET_HANDLER,
"Processing connection: fd = %d", fd);
117 struct socket_msg *msg = malloc(
sizeof(*msg));
121 if (destination == NULL) {
123 log(LOG_SOCKET_HANDLER,
"New connection: fd = %d", fd);
128 socklen_t addrlen =
sizeof(seed.
sockaddr);
131 log_perror(
"Could not getpeername() on fd %d", fd);
142 log_error(
"Error enqueing to destination");
153 log_error(
"Existing destination with null header for fd %d", fd);
158 log_error(
"Error enqueueing to next MSU");
163 log(LOG_SOCKET_HANDLER,
"Enqueued to MSU %d", destination->
id);
169 #ifdef MONITOR_NUM_FDS
192 log_error(
"Error exiting socket handler main loop");
202 int rtn = close(state->
sock_fd);
216 #define DEFAULT_PORT 8080
217 #define DEFAULT_TARGET 501
218 #define INIT_SYNTAX "<port>, <target_msu_type>"
230 if (to_parse == NULL) {
231 log_warn(
"Initializing socket handler MSU with default parameters. "
235 if ( (tok = strtok_r(to_parse,
" ,", &saveptr)) == NULL) {
236 log_warn(
"Couldn't get port from initialization string");
239 parsed->
port = atoi(tok);
241 if ( (tok = strtok_r(NULL,
" ,", &saveptr)) == NULL) {
242 log_warn(
"Couldn't get target MSU from initialization string");
247 if ( (tok = strtok_r(NULL,
" ,", &saveptr)) != NULL) {
248 log_warn(
"Discarding extra tokens from socket initialzation: %s", tok);
257 if (instance != NULL) {
258 log_error(
"Socket MSU already instantiated. There can be only one!");
266 log(LOG_SOCKET_INIT,
"Initializing socket handler with port %d, target %d",
270 self->msu_state =
state;
281 log_error(
"Couldn't initialize socket for socket handler MSU %d",
self->
id);
284 log(LOG_SOCKET_INIT,
"Listening for traffic on port %d", init.
port);
288 log_error(
"Couldn't initialize epoll_Fd for socket handler MSU %d",
self->
id);
295 #ifdef MONITOR_NUM_FDS
305 .
name =
"socket_msu",
Interface for general-purpose socket communication.
Collecting statistics within the runtime.
int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast)
struct msu_msg_hdr hdr_mask[65536]
uint32_t local_runtime_ip()
Header for messages passed to MSUs.
struct composite_key key
The full, arbitrary-length, unique key (used in state)
static int socket_handler_main_loop(struct local_msu *self)
int msu_remove_fd_monitor(int fd)
Wrapper functions for epoll to manage event-based communication.
#define SOCKET_HANDLER_TIMEOUT
static void socket_msu_destroy(struct local_msu *self)
static int socket_msu_init(struct local_msu *self, struct msu_init_data *init_data)
void * msu_state
State related to the entire instance of the MSU, not just individual items.
int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a destination of the specified type.
#define log_perror(fmt,...)
#define SOCKET_MSU_TYPE_ID
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID.
Logging of status messages to the terminal.
unsigned int id
Unique ID for a local MSU.
int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key)
Sets the key's composite-ID to the provided value, and sets the key's ID to a hash of the value...
Interactions with global dfg from individual runtime.
int add_to_epoll(int epoll_fd, int new_fd, uint32_t events, bool oneshot)
Adds a file descriptor to an epoll instance.
static int parse_init_payload(char *to_parse, struct sock_init *parsed)
Declares the methods available for calling an MSU from another MSU.
int init_epoll(int socket_fd)
Initializes a new instance of an epoll file descriptor and adds a socket to it, listening for input o...
For custom MSU statistics.
#define log_error(fmt,...)
int enable_epoll(int epoll_fd, int new_fd, uint32_t events)
Enables a file descriptor which has already been added to an epoll instance.
Declares the structures and functions applicable to MSUs on the local machine.
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it.
struct msu_msg_key self_key
struct msu_msg_hdr blank_hdr
static int socket_msu_receive(struct local_msu *self, struct msu_msg *msg)
The structure that represents an MSU located on the local machine.
Data with which an MSU is initialized, and the payload for messages of type CTRL_CREATE_MSU.
int init_call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_key *key, size_t data_size, void *data)
Enqueues a new message in the queue of a local MSU.
static int process_connection(int fd, void *v_state)
static int set_default_target(int fd, void *v_state)
#define SOCKET_HANDLER_BATCH_SIZE
int epoll_loop(int socket_fd, int epoll_fd, int batch_size, int timeout, bool oneshot, int(*connection_handler)(int, void *), int(*accept_handler)(int, void *), void *data)
The event-based loop for epoll_wait.
struct msu_type SOCKET_MSU_TYPE
struct local_msu * instance
struct sockaddr_in sockaddr
int init_listening_socket(int port)
Initializes a socket which is bound to and listening on the given port.
int increment_stat(enum stat_id stat_id, unsigned int item_id, double value)
Increments the given statistic by the provided value.
struct msu_type * default_target
int msu_monitor_fd(int fd, uint32_t events, struct local_msu *destination, struct msu_msg_hdr *hdr)
#define log(level, fmt,...)
Log at a custom level.
A message that is to be delivered to an instance of an MSU.
int call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Enqueues a message in the queue of a local MSU.
char * name
Name for the msu type.
Used to uniquely identify the source of a message, used in state storage as well as routing...
struct local_msu * destinations[65536]
#define log_warn(fmt,...)