Split callback logic into several functions.feature/data-structs
@@ -173,7 +173,7 @@ add_executable( | |||||
src/packages/server/log/IZ_log.h | src/packages/server/log/IZ_log.h | ||||
src/packages/server/log/IZ_log.c | src/packages/server/log/IZ_log.c | ||||
src/packages/server/main.c | src/packages/server/main.c | ||||
src/packages/server/protocol_lws_minimal.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h src/packages/server/network/IZ_wsserver.c src/packages/server/network/IZ_wsserver.h) | |||||
src/packages/server/network/IZ_wsserver.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h src/packages/server/network/IZ_wsserver.h) | |||||
target_link_libraries( | target_link_libraries( | ||||
server | server | ||||
@@ -28,7 +28,7 @@ IZ_ProcedureResult IZ_AppWSClientCallback( | |||||
return 0; | return 0; | ||||
} | } | ||||
IZ_ProcedureResult IZ_AppInitialize(IZ_App* app) { | |||||
IZ_ProcedureResult IZ_AppInitialize(IZ_App* app, u8 argc, const char* argv[]) { | |||||
memset(app, 0, sizeof(IZ_App)); | memset(app, 0, sizeof(IZ_App)); | ||||
u32 flags = ( | u32 flags = ( | ||||
SDL_INIT_VIDEO | SDL_INIT_VIDEO | ||||
@@ -37,13 +37,19 @@ IZ_ProcedureResult IZ_AppInitialize(IZ_App* app) { | |||||
); | ); | ||||
if (SDL_Init(flags) < 0) { | if (SDL_Init(flags) < 0) { | ||||
// TODO fix logging | |||||
SDL_LogError(SDL_LOG_CATEGORY_ERROR, "SDL could not initialize! SDL_Error: %s\n", SDL_GetError()); | SDL_LogError(SDL_LOG_CATEGORY_ERROR, "SDL could not initialize! SDL_Error: %s\n", SDL_GetError()); | ||||
return IZ_APP_RUN_SDL_INIT_ERROR; | return IZ_APP_RUN_SDL_INIT_ERROR; | ||||
} | } | ||||
const char* cmdline_buffer; | |||||
char config_path[128]; | char config_path[128]; | ||||
IZ_ConfigGetPath(config_path, 128); | |||||
// TODO abstract command line args parsing | |||||
if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-c"))) { | |||||
memcpy_s(config_path, 128, cmdline_buffer, 128); | |||||
} else { | |||||
IZ_ConfigGetDefaultPath(config_path, 128); | |||||
} | |||||
if (IZ_VideoInitialize(&app->video_state, config_path)) { | if (IZ_VideoInitialize(&app->video_state, config_path)) { | ||||
return IZ_APP_RUN_VIDEO_INIT_ERROR; | return IZ_APP_RUN_VIDEO_INIT_ERROR; | ||||
} | } | ||||
@@ -131,13 +137,7 @@ IZ_ProcedureResult IZ_AppRunNetworkingThread(void* ptr) { | |||||
} | } | ||||
IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) { | IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) { | ||||
printf_s("Args (%u):\n", argc); | |||||
u8 arg_index; | |||||
for (arg_index = 0; arg_index < argc; arg_index += 1) { | |||||
printf_s(" %s\n", argv[arg_index]); | |||||
} | |||||
IZ_ProcedureResult init_result = IZ_AppInitialize(app); | |||||
IZ_ProcedureResult init_result = IZ_AppInitialize(app, argc, argv); | |||||
if (init_result) { | if (init_result) { | ||||
return init_result; | return init_result; | ||||
} | } | ||||
@@ -2,11 +2,7 @@ | |||||
#define IZ_APP_H | #define IZ_APP_H | ||||
#include <SDL.h> | #include <SDL.h> | ||||
#ifdef __WIN32__ | |||||
#include <getopt.h> | |||||
#else | |||||
#include <unistd.h> | |||||
#endif | |||||
#include <libwebsockets.h> | |||||
#include "input/IZ_input.h" | #include "input/IZ_input.h" | ||||
#include "output/IZ_video.h" | #include "output/IZ_video.h" | ||||
#include "memory/IZ_pool.h" | #include "memory/IZ_pool.h" | ||||
@@ -1,6 +1,6 @@ | |||||
#include "IZ_config.h" | #include "IZ_config.h" | ||||
void IZ_ConfigGetPath(char* config_path, size_t string_size) { | |||||
void IZ_ConfigGetDefaultPath(const char* config_path, size_t string_size) { | |||||
//const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME); | //const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME); | ||||
const char* config_path_dir = SDL_GetBasePath(); | const char* config_path_dir = SDL_GetBasePath(); | ||||
memcpy_s(config_path, string_size, config_path_dir, 128); | memcpy_s(config_path, string_size, config_path_dir, 128); | ||||
@@ -4,6 +4,6 @@ | |||||
#include <SDL_filesystem.h> | #include <SDL_filesystem.h> | ||||
#include <string.h> | #include <string.h> | ||||
void IZ_ConfigGetPath(char*, size_t); | |||||
void IZ_ConfigGetDefaultPath(const char*, size_t); | |||||
#endif | #endif |
@@ -5,7 +5,7 @@ | |||||
#include "../IZ_common.h" | #include "../IZ_common.h" | ||||
#include "../data/IZ_list.h" | #include "../data/IZ_list.h" | ||||
#define POOL_MAX_SIZE (1l << 23) // 16MB | |||||
#define POOL_MAX_SIZE (1llu << 23) // 16MB | |||||
struct IZ_Pool; | struct IZ_Pool; | ||||
@@ -1,24 +1,75 @@ | |||||
#include "IZ_app.h" | #include "IZ_app.h" | ||||
static struct lws_protocols protocols[] = { | |||||
{ | |||||
.name = NETWORK_PROTOCOL, | |||||
.callback = IZ_WSServerCallback, | |||||
.per_session_data_size = sizeof(IZ_WSServerSessionData), | |||||
.rx_buffer_size = 0, | |||||
.id = 0, | |||||
.user = NULL, | |||||
.tx_packet_size = 0, | |||||
}, | |||||
{ | |||||
.name = "http", | |||||
.callback = lws_callback_http_dummy, | |||||
.per_session_data_size = 0, | |||||
.rx_buffer_size = 0, | |||||
.id = 0, | |||||
.user = NULL, | |||||
.tx_packet_size = 0, | |||||
}, | |||||
LWS_PROTOCOL_LIST_TERM, | |||||
}; | |||||
static const struct lws_http_mount mount = { | |||||
.mount_next = NULL, /* linked-list "next" */ | |||||
.mountpoint = "/", /* mountpoint URL */ | |||||
.origin = "./mount-origin", /* serve from dir */ | |||||
.def = "index.html", /* default filename */ | |||||
.protocol = NULL, | |||||
.cgienv = NULL, | |||||
.extra_mimetypes = NULL, | |||||
.interpret = NULL, | |||||
.cgi_timeout = 0, | |||||
.cache_max_age = 0, | |||||
.auth_mask = 0, | |||||
.cache_reusable = 0, | |||||
.cache_revalidate = 0, | |||||
.cache_intermediaries = 0, | |||||
.origin_protocol = LWSMPRO_FILE, /* files in a dir */ | |||||
.mountpoint_len = 1, /* char count */ | |||||
.basic_auth_login_file = NULL, | |||||
}; | |||||
void IZ_AppHandleSignal(i32 _signal) { | void IZ_AppHandleSignal(i32 _signal) { | ||||
interrupted = true; | interrupted = true; | ||||
} | } | ||||
void IZ_AppCreateContext(IZ_App* app) { | |||||
IZ_ProcedureResult IZ_AppCreateContext(IZ_App *app) { | |||||
struct lws_context_creation_info info; | struct lws_context_creation_info info; | ||||
memset(&info, 0, sizeof info); | memset(&info, 0, sizeof info); | ||||
info.port = app->config.port; | info.port = app->config.port; | ||||
info.mounts = &mount; | |||||
info.protocols = protocols; | |||||
// TODO initialize protocols | // TODO initialize protocols | ||||
info.options = ( | info.options = ( | ||||
LWS_SERVER_OPTION_VALIDATE_UTF8 | LWS_SERVER_OPTION_VALIDATE_UTF8 | ||||
| LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE | | LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE | ||||
); | ); | ||||
app->context = lws_create_context(&info); | |||||
if (!app->context) { | |||||
return 1; | |||||
} | |||||
return 0; | |||||
} | } | ||||
void IZ_AppLoadConfig(IZ_App* app, u8 argc, const char** argv) { | |||||
void IZ_AppLoadConfig(IZ_App *app, u8 argc, const char **argv) { | |||||
memcpy_s(app, sizeof(IZ_App), &IZ_APP_DEFAULT_STATE, sizeof(IZ_App)); | memcpy_s(app, sizeof(IZ_App), &IZ_APP_DEFAULT_STATE, sizeof(IZ_App)); | ||||
const char* cmdline_buffer; | |||||
const char *cmdline_buffer; | |||||
if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) { | if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) { | ||||
app->config.log_level = atoi(cmdline_buffer); | app->config.log_level = atoi(cmdline_buffer); | ||||
} | } | ||||
@@ -37,21 +88,29 @@ void IZ_AppLoadConfig(IZ_App* app, u8 argc, const char** argv) { | |||||
} | } | ||||
} | } | ||||
IZ_ProcedureResult IZ_AppInitialize(IZ_App* app, u8 argc, const char** argv) { | |||||
IZ_ProcedureResult IZ_AppInitialize(IZ_App *app, u8 argc, const char **argv) { | |||||
interrupted = false; | interrupted = false; | ||||
signal(SIGINT, IZ_AppHandleSignal); | signal(SIGINT, IZ_AppHandleSignal); | ||||
IZ_AppLoadConfig(app, argc, argv); | IZ_AppLoadConfig(app, argc, argv); | ||||
IZ_LogInterceptWSMessages(app->config.log_level); | IZ_LogInterceptWSMessages(app->config.log_level); | ||||
IZ_AppCreateContext(app); | |||||
if (IZ_AppCreateContext(app)) { | |||||
return 1; | |||||
} | |||||
return 0; | return 0; | ||||
} | } | ||||
IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char** argv) { | |||||
IZ_ProcedureResult IZ_AppRun(IZ_App *app, u8 argc, const char **argv) { | |||||
if (IZ_AppInitialize(app, argc, argv)) { | if (IZ_AppInitialize(app, argc, argv)) { | ||||
return 1; | return 1; | ||||
} | } | ||||
i32 n = 0; | |||||
while (n >= 0 && !interrupted) { | |||||
n = lws_service(app->context, 0); | |||||
} | |||||
lws_context_destroy(app->context); | |||||
return 0; | return 0; | ||||
} | } |
@@ -3,9 +3,9 @@ | |||||
#include <signal.h> | #include <signal.h> | ||||
#include <stdbool.h> | #include <stdbool.h> | ||||
#include <libwebsockets.h> | |||||
#include "IZ_common.h" | |||||
#include "network/IZ_wsserver.h" | |||||
#include "log/IZ_log.h" | #include "log/IZ_log.h" | ||||
#include "IZ_common.h" | |||||
static bool interrupted; | static bool interrupted; | ||||
@@ -1,84 +1,6 @@ | |||||
#include <libwebsockets.h> | |||||
#include <string.h> | |||||
#include <signal.h> | |||||
#include "IZ_app.h" | |||||
#include "IZ_common.h" | |||||
#include "log/IZ_log.h" | |||||
#define LWS_PLUGIN_STATIC | |||||
#include "protocol_lws_minimal.c" | |||||
static struct lws_protocols protocols[] = { | |||||
//{ "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 }, | |||||
LWS_PLUGIN_PROTOCOL_MINIMAL, | |||||
LWS_PROTOCOL_LIST_TERM | |||||
}; | |||||
static int interrupted; | |||||
static const struct lws_http_mount mount = { | |||||
/* .mount_next */ NULL, /* linked-list "next" */ | |||||
/* .mountpoint */ "/", /* mountpoint URL */ | |||||
/* .origin */ "./mount-origin", /* serve from dir */ | |||||
/* .def */ "index.html", /* default filename */ | |||||
/* .protocol */ NULL, | |||||
/* .cgienv */ NULL, | |||||
/* .extra_mimetypes */ NULL, | |||||
/* .interpret */ NULL, | |||||
/* .cgi_timeout */ 0, | |||||
/* .cache_max_age */ 0, | |||||
/* .auth_mask */ 0, | |||||
/* .cache_reusable */ 0, | |||||
/* .cache_revalidate */ 0, | |||||
/* .cache_intermediaries */ 0, | |||||
/* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ | |||||
/* .mountpoint_len */ 1, /* char count */ | |||||
/* .basic_auth_login_file */ NULL, | |||||
}; | |||||
void sigint_handler(int sig) | |||||
{ | |||||
interrupted = 1; | |||||
} | |||||
int main(int argc, const char **argv) | |||||
{ | |||||
struct lws_context_creation_info info; | |||||
struct lws_context *context; | |||||
const char *p; | |||||
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE | |||||
/* for LLL_ verbosity above NOTICE to be built into lws, | |||||
* lws must have been configured and built with | |||||
* -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */ | |||||
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ | |||||
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ | |||||
/* | LLL_DEBUG */; | |||||
signal(SIGINT, sigint_handler); | |||||
if ((p = lws_cmdline_option(argc, argv, "-d"))) | |||||
logs = atoi(p); | |||||
IZ_LogInterceptWSMessages(logs); | |||||
lwsl_user("LWS minimal ws server (lws_ring) | visit http://localhost:7681\n"); | |||||
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ | |||||
info.port = 42069; | |||||
info.mounts = &mount; | |||||
info.protocols = protocols; | |||||
info.options = | |||||
LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; | |||||
context = lws_create_context(&info); | |||||
if (!context) { | |||||
lwsl_err("lws init failed\n"); | |||||
return 1; | |||||
} | |||||
while (n >= 0 && !interrupted) | |||||
n = lws_service(context, 0); | |||||
lws_context_destroy(context); | |||||
return 0; | |||||
IZ_ProcedureResult main(i32 argc, const char *argv[]) { | |||||
IZ_App app; | |||||
return IZ_AppRun(&app, argc, argv); | |||||
} | } |
@@ -1 +1,285 @@ | |||||
#include "IZ_wsserver.h" | #include "IZ_wsserver.h" | ||||
/* one of these created for each message */ | |||||
static void IZ_ProtocolCullLaggingClients(IZ_WSServerVHostData *vhd) { | |||||
u32 oldest_tail = lws_ring_get_oldest_tail(vhd->ring); | |||||
IZ_WSServerSessionData *old_pss = NULL; | |||||
i32 most = 0; | |||||
i32 before = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &oldest_tail); | |||||
i32 m; | |||||
/* | |||||
* At least one guy with the oldest tail has lagged too far, filling | |||||
* the ringbuffer with stuff waiting for them, while new stuff is | |||||
* coming in, and they must close, freeing up ringbuffer entries. | |||||
*/ | |||||
lws_start_foreach_llp_safe( | |||||
IZ_WSServerSessionData**, | |||||
ppss, | |||||
vhd->pss_list, | |||||
pss_list | |||||
) { | |||||
if ((*ppss)->tail == oldest_tail) { | |||||
old_pss = *ppss; | |||||
lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); | |||||
lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, | |||||
/* | |||||
* we may kill the wsi we came in on, | |||||
* so the actual close is deferred | |||||
*/ | |||||
LWS_TO_KILL_ASYNC); | |||||
/* | |||||
* We might try to write something before we get a | |||||
* chance to close. But this pss is now detached | |||||
* from the ring buffer. Mark this pss as culled so we | |||||
* don't try to do anything more with it. | |||||
*/ | |||||
(*ppss)->culled = true; | |||||
/* | |||||
* Because we can't kill it synchronously, but we | |||||
* know it's closing momentarily and don't want its | |||||
* participation any more, remove its pss from the | |||||
* vhd pss list early. (This is safe to repeat | |||||
* uselessly later in the close flow). | |||||
* | |||||
* Notice this changes *ppss! | |||||
*/ | |||||
lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, (*ppss), vhd->pss_list); | |||||
/* use the changed *ppss so we won't skip anything */ | |||||
continue; | |||||
} | |||||
/* | |||||
* so this guy is a survivor of the cull. Let's track | |||||
* what is the largest number of pending ring elements | |||||
* for any survivor. | |||||
*/ | |||||
m = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &((*ppss)->tail)); | |||||
if (m > most) { | |||||
most = m; | |||||
} | |||||
} lws_end_foreach_llp_safe(ppss); | |||||
/* it would mean we lost track of oldest... but Coverity insists */ | |||||
if (!old_pss) { | |||||
return; | |||||
} | |||||
/* | |||||
* Let's recover (ie, free up) all the ring slots between the | |||||
* original oldest's last one and the "worst" survivor. | |||||
*/ | |||||
lws_ring_consume_and_update_oldest_tail( | |||||
vhd->ring, | |||||
IZ_WSServerSessionData, | |||||
&old_pss->tail, | |||||
(size_t) (before - most), | |||||
vhd->pss_list, | |||||
tail, | |||||
pss_list | |||||
); | |||||
lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); | |||||
} | |||||
/* destroys the message when everyone has had a copy of it */ | |||||
void IZ_ProtocolDestroyMessage(void *_msg) { | |||||
IZ_WSServerMessage *msg = _msg; | |||||
free(msg->payload); | |||||
msg->payload = NULL; | |||||
msg->len = 0; | |||||
} | |||||
i32 IZ_WSServerInitialize(struct lws* wsi) { | |||||
IZ_WSServerVHostData* vhd_instance = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi) | |||||
); | |||||
IZ_WSServerVHostData** vhd = &vhd_instance; | |||||
*vhd = lws_protocol_vh_priv_zalloc( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi), | |||||
sizeof(IZ_WSServerVHostData) | |||||
); | |||||
(*vhd)->context = lws_get_context(wsi); | |||||
(*vhd)->protocol = lws_get_protocol(wsi); | |||||
(*vhd)->vhost = lws_get_vhost(wsi); | |||||
(*vhd)->ring = lws_ring_create( | |||||
sizeof(IZ_WSServerMessage), | |||||
RING_COUNT, | |||||
IZ_ProtocolDestroyMessage | |||||
); | |||||
if (!(*vhd)->ring) { | |||||
return 1; | |||||
} | |||||
return 0; | |||||
} | |||||
void IZ_WSServerTeardown(struct lws* wsi) { | |||||
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi) | |||||
); | |||||
lws_ring_destroy(vhd->ring); | |||||
} | |||||
void IZ_WSServerOnOpen(struct lws* wsi, IZ_WSServerSessionData* pss) { | |||||
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi) | |||||
); | |||||
/* add ourselves to the list of live pss held in the vhd */ | |||||
lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); | |||||
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); | |||||
pss->tail = lws_ring_get_oldest_tail(vhd->ring); | |||||
pss->wsi = wsi; | |||||
} | |||||
void IZ_WSServerOnClose(struct lws* wsi, IZ_WSServerSessionData* pss) { | |||||
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi) | |||||
); | |||||
lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); | |||||
/* remove our closing pss from the list of live pss */ | |||||
lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, pss, vhd->pss_list); | |||||
} | |||||
IZ_ProcedureResult IZ_WSServerOnSend(struct lws* wsi, IZ_WSServerSessionData* pss) { | |||||
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi) | |||||
); | |||||
if (pss->culled) { | |||||
return 0; | |||||
} | |||||
const IZ_WSServerMessage *pmsg = lws_ring_get_element(vhd->ring, &pss->tail); | |||||
if (!pmsg) { | |||||
return 0; | |||||
} | |||||
/* notice we allowed for LWS_PRE in the payload already */ | |||||
i32 m = lws_write( | |||||
wsi, | |||||
((unsigned char *) pmsg->payload) + LWS_PRE, | |||||
pmsg->len, | |||||
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT | |||||
); | |||||
if (m < (i32) pmsg->len) { | |||||
lwsl_err("ERROR %d writing to ws socket\n", m); | |||||
return -1; | |||||
} | |||||
lws_ring_consume_and_update_oldest_tail( | |||||
vhd->ring, /* lws_ring object */ | |||||
IZ_WSServerSessionData, /* type of objects with tails */ | |||||
&pss->tail, /* tail of guy doing the consuming */ | |||||
1, /* number of payload objects being consumed */ | |||||
vhd->pss_list, /* head of list of objects with tails */ | |||||
tail, /* member name of tail in objects with tails */ | |||||
pss_list /* member name of next object in objects with tails */ | |||||
); | |||||
/* more to do for us? */ | |||||
if (lws_ring_get_element(vhd->ring, &pss->tail)) { | |||||
/* come back as soon as we can write more */ | |||||
lws_callback_on_writable(pss->wsi); | |||||
} | |||||
return 0; | |||||
} | |||||
void IZ_WSServerOnReceive(struct lws* wsi, void* in, size_t len) { | |||||
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( | |||||
lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi) | |||||
); | |||||
IZ_WSServerMessage amsg; | |||||
i32 n = (i32) lws_ring_get_count_free_elements(vhd->ring); | |||||
if (!n) { | |||||
/* forcibly make space */ | |||||
IZ_ProtocolCullLaggingClients(vhd); | |||||
n = (i32) lws_ring_get_count_free_elements(vhd->ring); | |||||
} | |||||
if (!n) { | |||||
return; | |||||
} | |||||
lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); | |||||
amsg.len = len; | |||||
/* notice we over-allocate by LWS_PRE... */ | |||||
amsg.payload = malloc(LWS_PRE + len); | |||||
amsg.binary = (u8) lws_frame_is_binary(wsi); | |||||
if (!amsg.payload) { | |||||
lwsl_user("OOM: dropping\n"); | |||||
return; | |||||
} | |||||
/* ...and we copy the payload in at +LWS_PRE */ | |||||
memcpy((char *) amsg.payload + LWS_PRE, in, len); | |||||
if (!lws_ring_insert(vhd->ring, &amsg, 1)) { | |||||
IZ_ProtocolDestroyMessage(&amsg); | |||||
lwsl_user("dropping!\n"); | |||||
return; | |||||
} | |||||
/* | |||||
* let everybody know we want to write something on them | |||||
* as soon as they are ready | |||||
*/ | |||||
lws_start_foreach_llp(IZ_WSServerSessionData**, ppss, vhd->pss_list) { | |||||
lws_callback_on_writable((*ppss)->wsi); | |||||
} | |||||
lws_end_foreach_llp(ppss, pss_list); | |||||
} | |||||
i32 IZ_WSServerCallback( | |||||
struct lws* wsi, | |||||
enum lws_callback_reasons reason, | |||||
void* user, | |||||
void* in, | |||||
size_t len | |||||
) { | |||||
switch (reason) { | |||||
case LWS_CALLBACK_PROTOCOL_INIT: | |||||
return IZ_WSServerInitialize(wsi); | |||||
case LWS_CALLBACK_PROTOCOL_DESTROY: | |||||
IZ_WSServerTeardown(wsi); | |||||
break; | |||||
case LWS_CALLBACK_ESTABLISHED: | |||||
IZ_WSServerOnOpen(wsi, user); | |||||
break; | |||||
case LWS_CALLBACK_CLOSED: | |||||
IZ_WSServerOnClose(wsi, user); | |||||
break; | |||||
case LWS_CALLBACK_SERVER_WRITEABLE: | |||||
return IZ_WSServerOnSend(wsi, user); | |||||
case LWS_CALLBACK_RECEIVE: | |||||
IZ_WSServerOnReceive(wsi, in, len); | |||||
break; | |||||
default: | |||||
break; | |||||
} | |||||
return 0; | |||||
} |
@@ -1,4 +1,45 @@ | |||||
#ifndef IZ_WSSERVER_H | #ifndef IZ_WSSERVER_H | ||||
#define IZ_WSSERVER_H | #define IZ_WSSERVER_H | ||||
#include "libwebsockets.h" | |||||
#include <string.h> | |||||
#include "../IZ_common.h" | |||||
#define RING_COUNT 32 | |||||
typedef struct { | |||||
void *payload; /* is malloc'd */ | |||||
size_t len; | |||||
u8 binary: 1; | |||||
} IZ_WSServerMessage; | |||||
/* one of these is created for each client connecting to us */ | |||||
typedef struct IZ_WSServerSessionData { | |||||
struct IZ_WSServerSessionData *pss_list; | |||||
struct lws *wsi; | |||||
u32 tail; | |||||
u8 culled: 1; | |||||
} IZ_WSServerSessionData; | |||||
/* one of these is created for each vhost our protocol is used with */ | |||||
typedef struct { | |||||
struct lws_context *context; | |||||
struct lws_vhost *vhost; | |||||
const struct lws_protocols *protocol; | |||||
IZ_WSServerSessionData *pss_list; /* linked-list of live pss*/ | |||||
struct lws_ring *ring; /* ringbuffer holding unsent messages */ | |||||
} IZ_WSServerVHostData; | |||||
i32 IZ_WSServerCallback( | |||||
struct lws*, | |||||
enum lws_callback_reasons, | |||||
void*, | |||||
void*, | |||||
size_t | |||||
); | |||||
#endif | #endif |
@@ -1,268 +0,0 @@ | |||||
#if !defined (LWS_PLUGIN_STATIC) | |||||
#define LWS_DLL | |||||
#define LWS_INTERNAL | |||||
#include <libwebsockets.h> | |||||
#endif | |||||
#include <string.h> | |||||
#include "IZ_common.h" | |||||
#define RING_COUNT 32 | |||||
/* one of these created for each message */ | |||||
struct msg { | |||||
void *payload; /* is malloc'd */ | |||||
size_t len; | |||||
}; | |||||
/* one of these is created for each client connecting to us */ | |||||
struct per_session_data__minimal { | |||||
struct per_session_data__minimal *pss_list; | |||||
struct lws *wsi; | |||||
uint32_t tail; | |||||
unsigned int culled:1; | |||||
}; | |||||
/* one of these is created for each vhost our protocol is used with */ | |||||
struct per_vhost_data__minimal { | |||||
struct lws_context *context; | |||||
struct lws_vhost *vhost; | |||||
const struct lws_protocols *protocol; | |||||
struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ | |||||
struct lws_ring *ring; /* ringbuffer holding unsent messages */ | |||||
}; | |||||
static void | |||||
cull_lagging_clients(struct per_vhost_data__minimal *vhd) | |||||
{ | |||||
uint32_t oldest_tail = lws_ring_get_oldest_tail(vhd->ring); | |||||
struct per_session_data__minimal *old_pss = NULL; | |||||
int most = 0, before = (int)lws_ring_get_count_waiting_elements(vhd->ring, | |||||
&oldest_tail), m; | |||||
/* | |||||
* At least one guy with the oldest tail has lagged too far, filling | |||||
* the ringbuffer with stuff waiting for them, while new stuff is | |||||
* coming in, and they must close, freeing up ringbuffer entries. | |||||
*/ | |||||
lws_start_foreach_llp_safe(struct per_session_data__minimal **, | |||||
ppss, vhd->pss_list, pss_list) { | |||||
if ((*ppss)->tail == oldest_tail) { | |||||
old_pss = *ppss; | |||||
lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); | |||||
lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, | |||||
/* | |||||
* we may kill the wsi we came in on, | |||||
* so the actual close is deferred | |||||
*/ | |||||
LWS_TO_KILL_ASYNC); | |||||
/* | |||||
* We might try to write something before we get a | |||||
* chance to close. But this pss is now detached | |||||
* from the ring buffer. Mark this pss as culled so we | |||||
* don't try to do anything more with it. | |||||
*/ | |||||
(*ppss)->culled = 1; | |||||
/* | |||||
* Because we can't kill it synchronously, but we | |||||
* know it's closing momentarily and don't want its | |||||
* participation any more, remove its pss from the | |||||
* vhd pss list early. (This is safe to repeat | |||||
* uselessly later in the close flow). | |||||
* | |||||
* Notice this changes *ppss! | |||||
*/ | |||||
lws_ll_fwd_remove(struct per_session_data__minimal, | |||||
pss_list, (*ppss), vhd->pss_list); | |||||
/* use the changed *ppss so we won't skip anything */ | |||||
continue; | |||||
} else { | |||||
/* | |||||
* so this guy is a survivor of the cull. Let's track | |||||
* what is the largest number of pending ring elements | |||||
* for any survivor. | |||||
*/ | |||||
m = (int)lws_ring_get_count_waiting_elements(vhd->ring, | |||||
&((*ppss)->tail)); | |||||
if (m > most) | |||||
most = m; | |||||
} | |||||
} lws_end_foreach_llp_safe(ppss); | |||||
/* it would mean we lost track of oldest... but Coverity insists */ | |||||
if (!old_pss) | |||||
return; | |||||
/* | |||||
* Let's recover (ie, free up) all the ring slots between the | |||||
* original oldest's last one and the "worst" survivor. | |||||
*/ | |||||
lws_ring_consume_and_update_oldest_tail(vhd->ring, | |||||
struct per_session_data__minimal, &old_pss->tail, (size_t)(before - most), | |||||
vhd->pss_list, tail, pss_list); | |||||
lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); | |||||
} | |||||
/* destroys the message when everyone has had a copy of it */ | |||||
static void | |||||
__minimal_destroy_message(void *_msg) | |||||
{ | |||||
struct msg *msg = _msg; | |||||
free(msg->payload); | |||||
msg->payload = NULL; | |||||
msg->len = 0; | |||||
} | |||||
static int | |||||
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, | |||||
void *user, void *in, size_t len) | |||||
{ | |||||
struct per_session_data__minimal *pss = | |||||
(struct per_session_data__minimal *)user; | |||||
struct per_vhost_data__minimal *vhd = | |||||
(struct per_vhost_data__minimal *) | |||||
lws_protocol_vh_priv_get(lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi)); | |||||
const struct msg *pmsg; | |||||
struct msg amsg; | |||||
int n, m; | |||||
switch (reason) { | |||||
case LWS_CALLBACK_PROTOCOL_INIT: | |||||
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), | |||||
lws_get_protocol(wsi), | |||||
sizeof(struct per_vhost_data__minimal)); | |||||
vhd->context = lws_get_context(wsi); | |||||
vhd->protocol = lws_get_protocol(wsi); | |||||
vhd->vhost = lws_get_vhost(wsi); | |||||
vhd->ring = lws_ring_create(sizeof(struct msg), RING_COUNT, | |||||
__minimal_destroy_message); | |||||
if (!vhd->ring) | |||||
return 1; | |||||
break; | |||||
case LWS_CALLBACK_PROTOCOL_DESTROY: | |||||
lws_ring_destroy(vhd->ring); | |||||
break; | |||||
case LWS_CALLBACK_ESTABLISHED: | |||||
/* add ourselves to the list of live pss held in the vhd */ | |||||
lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); | |||||
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); | |||||
pss->tail = lws_ring_get_oldest_tail(vhd->ring); | |||||
pss->wsi = wsi; | |||||
break; | |||||
case LWS_CALLBACK_CLOSED: | |||||
lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); | |||||
/* remove our closing pss from the list of live pss */ | |||||
lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, | |||||
pss, vhd->pss_list); | |||||
break; | |||||
case LWS_CALLBACK_SERVER_WRITEABLE: | |||||
if (pss->culled) | |||||
break; | |||||
pmsg = lws_ring_get_element(vhd->ring, &pss->tail); | |||||
if (!pmsg) | |||||
break; | |||||
/* notice we allowed for LWS_PRE in the payload already */ | |||||
m = lws_write(wsi, ((unsigned char *)pmsg->payload) + | |||||
LWS_PRE, pmsg->len, LWS_WRITE_TEXT); | |||||
if (m < (int)pmsg->len) { | |||||
lwsl_err("ERROR %d writing to ws socket\n", m); | |||||
return -1; | |||||
} | |||||
lws_ring_consume_and_update_oldest_tail( | |||||
vhd->ring, /* lws_ring object */ | |||||
struct per_session_data__minimal, /* type of objects with tails */ | |||||
&pss->tail, /* tail of guy doing the consuming */ | |||||
1, /* number of payload objects being consumed */ | |||||
vhd->pss_list, /* head of list of objects with tails */ | |||||
tail, /* member name of tail in objects with tails */ | |||||
pss_list /* member name of next object in objects with tails */ | |||||
); | |||||
/* more to do for us? */ | |||||
if (lws_ring_get_element(vhd->ring, &pss->tail)) | |||||
/* come back as soon as we can write more */ | |||||
lws_callback_on_writable(pss->wsi); | |||||
break; | |||||
case LWS_CALLBACK_RECEIVE: | |||||
n = (int)lws_ring_get_count_free_elements(vhd->ring); | |||||
if (!n) { | |||||
/* forcibly make space */ | |||||
cull_lagging_clients(vhd); | |||||
n = (int)lws_ring_get_count_free_elements(vhd->ring); | |||||
} | |||||
if (!n) | |||||
break; | |||||
lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); | |||||
amsg.len = len; | |||||
/* notice we over-allocate by LWS_PRE... */ | |||||
amsg.payload = malloc(LWS_PRE + len); | |||||
if (!amsg.payload) { | |||||
lwsl_user("OOM: dropping\n"); | |||||
break; | |||||
} | |||||
/* ...and we copy the payload in at +LWS_PRE */ | |||||
memcpy((char *)amsg.payload + LWS_PRE, in, len); | |||||
if (!lws_ring_insert(vhd->ring, &amsg, 1)) { | |||||
__minimal_destroy_message(&amsg); | |||||
lwsl_user("dropping!\n"); | |||||
break; | |||||
} | |||||
/* | |||||
* let everybody know we want to write something on them | |||||
* as soon as they are ready | |||||
*/ | |||||
lws_start_foreach_llp(struct per_session_data__minimal **, | |||||
ppss, vhd->pss_list) { | |||||
lws_callback_on_writable((*ppss)->wsi); | |||||
} lws_end_foreach_llp(ppss, pss_list); | |||||
break; | |||||
default: | |||||
break; | |||||
} | |||||
return 0; | |||||
} | |||||
#define LWS_PLUGIN_PROTOCOL_MINIMAL \ | |||||
{ \ | |||||
NETWORK_PROTOCOL, \ | |||||
callback_minimal, \ | |||||
sizeof(struct per_session_data__minimal), \ | |||||
0, \ | |||||
0, NULL, 0 \ | |||||
} |