30 #include <sys/epoll.h> 
   32 #include <netinet/ip.h> 
   48 #define SOCKET_HANDLER_TIMEOUT 500 
   49 #define SOCKET_HANDLER_BATCH_SIZE 1000 
   56 #define MONITOR_NUM_FDS 
   60     if (instance == NULL) {
 
   61         log_error(
"Socket monitor instance is NULL! Must instantiate before monitoring fd");
 
   73 #ifdef MONITOR_NUM_FDS 
   77             log_perror(
"Error enabling epoll for fd %d", fd);
 
   81     log(LOG_SOCKET_MSU, 
"Added fd %d to epoll", fd);
 
   93     int rtn = epoll_ctl(state->
epoll_fd, EPOLL_CTL_DEL, fd, NULL);
 
   96         log_perror(
"Error removing fd %d from epoll", fd);
 
   99 #ifdef MONITOR_NUM_FDS 
  115     log(LOG_SOCKET_HANDLER, 
"Processing connection: fd = %d", fd);
 
  117     struct socket_msg *msg = malloc(
sizeof(*msg));
 
  121     if (destination == NULL) {
 
  123         log(LOG_SOCKET_HANDLER, 
"New connection: fd = %d", fd);
 
  128         socklen_t addrlen = 
sizeof(seed.
sockaddr);
 
  131             log_perror(
"Could not getpeername() on fd %d", fd);
 
  142             log_error(
"Error enqueing to destination");
 
  153             log_error(
"Existing destination with null header for fd %d", fd);
 
  158             log_error(
"Error enqueueing to next MSU");
 
  163         log(LOG_SOCKET_HANDLER,
"Enqueued to MSU %d", destination->
id);
 
  169 #ifdef MONITOR_NUM_FDS 
  192         log_error(
"Error exiting socket handler main loop");
 
  202     int rtn = close(state->
sock_fd);
 
  216 #define DEFAULT_PORT 8080 
  217 #define DEFAULT_TARGET 501 
  218 #define INIT_SYNTAX "<port>, <target_msu_type>" 
  230     if (to_parse == NULL) {
 
  231         log_warn(
"Initializing socket handler MSU with default parameters. " 
  235         if ( (tok = strtok_r(to_parse, 
" ,", &saveptr)) == NULL) {
 
  236             log_warn(
"Couldn't get port from initialization string");
 
  239         parsed->
port = atoi(tok);
 
  241         if ( (tok = strtok_r(NULL, 
" ,", &saveptr)) == NULL) {
 
  242             log_warn(
"Couldn't get target MSU from initialization string");
 
  247         if ( (tok = strtok_r(NULL, 
" ,", &saveptr)) != NULL) {
 
  248             log_warn(
"Discarding extra tokens from socket initialzation: %s", tok);
 
  257     if (instance != NULL) {
 
  258         log_error(
"Socket MSU already instantiated. There can be only one!");
 
  266     log(LOG_SOCKET_INIT, 
"Initializing socket handler with port %d, target %d",
 
  270     self->msu_state = 
state;
 
  281         log_error(
"Couldn't initialize socket for socket handler MSU %d", 
self->
id);
 
  284     log(LOG_SOCKET_INIT, 
"Listening for traffic on port %d", init.
port);
 
  288         log_error(
"Couldn't initialize epoll_Fd for socket handler MSU %d", 
self->
id);
 
  295 #ifdef MONITOR_NUM_FDS 
  305     .
name = 
"socket_msu",
 
Interface for general-purpose socket communication. 
Collecting statistics within the runtime. 
int msu_error(struct local_msu *msu, struct msu_msg_hdr *hdr, int broadcast)
struct msu_msg_hdr hdr_mask[65536]
uint32_t local_runtime_ip()
Header for messages passed to MSUs. 
struct composite_key key
The full, arbitrary-length, unique key (used in state) 
static int socket_handler_main_loop(struct local_msu *self)
int msu_remove_fd_monitor(int fd)
Wrapper functions for epoll to manage event-based communication. 
#define SOCKET_HANDLER_TIMEOUT
static void socket_msu_destroy(struct local_msu *self)
static int socket_msu_init(struct local_msu *self, struct msu_init_data *init_data)
void * msu_state
State related to the entire instance of the MSU, not just individual items. 
int init_call_msu_type(struct local_msu *sender, struct msu_type *dst_type, struct msu_msg_key *key, size_t data_size, void *data)
Sends an MSU message to a destination of the specified type. 
#define log_perror(fmt,...)
#define SOCKET_MSU_TYPE_ID
struct msu_type * get_msu_type(int id)
Gets the MSU type with the provided ID. 
Logging of status messages to the terminal. 
unsigned int id
Unique ID for a local MSU. 
int seed_msg_key(void *seed, size_t seed_size, struct msu_msg_key *key)
Sets the key's composite-ID to the provided value, and sets the key's ID to a hash of the value...
Interactions with global dfg from individual runtime. 
int add_to_epoll(int epoll_fd, int new_fd, uint32_t events, bool oneshot)
Adds a file descriptor to an epoll instance. 
static int parse_init_payload(char *to_parse, struct sock_init *parsed)
Declares the methods available for calling an MSU from another MSU. 
int init_epoll(int socket_fd)
Initializes a new instance of an epoll file descriptor and adds a socket to it, listening for input o...
For custom MSU statistics. 
#define log_error(fmt,...)
int enable_epoll(int epoll_fd, int new_fd, uint32_t events)
Enables a file descriptor which has already been aded to an epoll instance. 
Declares the structures and functions applicable to MSUs on the local machine. 
int init_stat_item(enum stat_id stat_id, unsigned int item_id)
Initializes a stat item so that statistics can be logged to it. 
struct msu_msg_key self_key
struct msu_msg_hdr blank_hdr
static int socket_msu_receive(struct local_msu *self, struct msu_msg *msg)
The structure that represents an MSU located on the local machine. 
Data with which an MSU is initialized, and the payload for messages of type CTRL_CREATE_MSU. 
int init_call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_key *key, size_t data_size, void *data)
Enqueues a new message in the queue of a local MSU. 
static int process_connection(int fd, void *v_state)
static int set_default_target(int fd, void *v_state)
#define SOCKET_HANDLER_BATCH_SIZE
int epoll_loop(int socket_fd, int epoll_fd, int batch_size, int timeout, bool oneshot, int(*connection_handler)(int, void *), int(*accept_handler)(int, void *), void *data)
The event-based loop for epoll_wait. 
struct msu_type SOCKET_MSU_TYPE
struct local_msu * instance
struct sockaddr_in sockaddr
int init_listening_socket(int port)
Initializes a socket which is bound to and listening on the given port. 
int increment_stat(enum stat_id stat_id, unsigned int item_id, double value)
Increments the given statistic by the provided value. 
struct msu_type * default_target
int msu_monitor_fd(int fd, uint32_t events, struct local_msu *destination, struct msu_msg_hdr *hdr)
#define log(level, fmt,...)
Log at a custom level. 
A message that is to be delivered to an instance of an MSU. 
int call_local_msu(struct local_msu *sender, struct local_msu *dest, struct msu_msg_hdr *hdr, size_t data_size, void *data)
Enqueues a message in the queue of a local MSU. 
char * name
Name for the msu type. 
Used to uniquely identify the source of a message, used in state storage as well as routing...
struct local_msu * destinations[65536]
#define log_warn(fmt,...)