diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d27a1b..2a65eec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -173,7 +173,7 @@ add_executable( src/packages/server/log/IZ_log.h src/packages/server/log/IZ_log.c src/packages/server/main.c - src/packages/server/protocol_lws_minimal.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h src/packages/server/network/IZ_wsserver.c src/packages/server/network/IZ_wsserver.h) + src/packages/server/network/IZ_wsserver.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h src/packages/server/network/IZ_wsserver.h) target_link_libraries( server diff --git a/src/packages/game/IZ_app.c b/src/packages/game/IZ_app.c index 4cd8fa4..d15d324 100644 --- a/src/packages/game/IZ_app.c +++ b/src/packages/game/IZ_app.c @@ -28,7 +28,7 @@ IZ_ProcedureResult IZ_AppWSClientCallback( return 0; } -IZ_ProcedureResult IZ_AppInitialize(IZ_App* app) { +IZ_ProcedureResult IZ_AppInitialize(IZ_App* app, u8 argc, const char* argv[]) { memset(app, 0, sizeof(IZ_App)); u32 flags = ( SDL_INIT_VIDEO @@ -37,13 +37,19 @@ IZ_ProcedureResult IZ_AppInitialize(IZ_App* app) { ); if (SDL_Init(flags) < 0) { - // TODO fix logging SDL_LogError(SDL_LOG_CATEGORY_ERROR, "SDL could not initialize! SDL_Error: %s\n", SDL_GetError()); return IZ_APP_RUN_SDL_INIT_ERROR; } + const char* cmdline_buffer; char config_path[128]; - IZ_ConfigGetPath(config_path, 128); + // TODO abstract command line args parsing + if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-c"))) { + memcpy_s(config_path, 128, cmdline_buffer, 128); + } else { + IZ_ConfigGetDefaultPath(config_path, 128); + } + if (IZ_VideoInitialize(&app->video_state, config_path)) { return IZ_APP_RUN_VIDEO_INIT_ERROR; } @@ -131,13 +137,7 @@ IZ_ProcedureResult IZ_AppRunNetworkingThread(void* ptr) { } IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) { - printf_s("Args (%u):\n", argc); - u8 arg_index; - for (arg_index = 0; arg_index < argc; arg_index += 1) { - printf_s(" %s\n", argv[arg_index]); - } - - IZ_ProcedureResult init_result = IZ_AppInitialize(app); + IZ_ProcedureResult init_result = IZ_AppInitialize(app, argc, argv); if (init_result) { return init_result; } diff --git a/src/packages/game/IZ_app.h b/src/packages/game/IZ_app.h index 3c06ab0..1478667 100644 --- a/src/packages/game/IZ_app.h +++ b/src/packages/game/IZ_app.h @@ -2,11 +2,7 @@ #define IZ_APP_H #include -#ifdef __WIN32__ -#include -#else -#include -#endif +#include #include "input/IZ_input.h" #include "output/IZ_video.h" #include "memory/IZ_pool.h" diff --git a/src/packages/game/IZ_config.c b/src/packages/game/IZ_config.c index 25fe3a8..21ef66d 100644 --- a/src/packages/game/IZ_config.c +++ b/src/packages/game/IZ_config.c @@ -1,6 +1,6 @@ #include "IZ_config.h" -void IZ_ConfigGetPath(char* config_path, size_t string_size) { +void IZ_ConfigGetDefaultPath(const char* config_path, size_t string_size) { //const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME); const char* config_path_dir = SDL_GetBasePath(); memcpy_s(config_path, string_size, config_path_dir, 128); diff --git a/src/packages/game/IZ_config.h b/src/packages/game/IZ_config.h index 38ddaf9..e007c7d 100644 --- a/src/packages/game/IZ_config.h +++ b/src/packages/game/IZ_config.h @@ -4,6 +4,6 @@ #include #include -void IZ_ConfigGetPath(char*, size_t); +void IZ_ConfigGetDefaultPath(const char*, size_t); #endif diff --git a/src/packages/game/memory/IZ_pool.h b/src/packages/game/memory/IZ_pool.h index 324a742..648a9be 100644 --- a/src/packages/game/memory/IZ_pool.h +++ b/src/packages/game/memory/IZ_pool.h @@ -5,7 +5,7 @@ #include "../IZ_common.h" #include "../data/IZ_list.h" -#define POOL_MAX_SIZE (1l << 23) // 16MB +#define POOL_MAX_SIZE (1llu << 23) // 16MB struct IZ_Pool; diff --git a/src/packages/server/IZ_app.c b/src/packages/server/IZ_app.c index f8cee3d..68034de 100644 --- a/src/packages/server/IZ_app.c +++ b/src/packages/server/IZ_app.c @@ -1,24 +1,75 @@ #include "IZ_app.h" +static struct lws_protocols protocols[] = { + { + .name = NETWORK_PROTOCOL, + .callback = IZ_WSServerCallback, + .per_session_data_size = sizeof(IZ_WSServerSessionData), + .rx_buffer_size = 0, + .id = 0, + .user = NULL, + .tx_packet_size = 0, + }, + { + .name = "http", + .callback = lws_callback_http_dummy, + .per_session_data_size = 0, + .rx_buffer_size = 0, + .id = 0, + .user = NULL, + .tx_packet_size = 0, + }, + LWS_PROTOCOL_LIST_TERM, +}; + +static const struct lws_http_mount mount = { + .mount_next = NULL, /* linked-list "next" */ + .mountpoint = "/", /* mountpoint URL */ + .origin = "./mount-origin", /* serve from dir */ + .def = "index.html", /* default filename */ + .protocol = NULL, + .cgienv = NULL, + .extra_mimetypes = NULL, + .interpret = NULL, + .cgi_timeout = 0, + .cache_max_age = 0, + .auth_mask = 0, + .cache_reusable = 0, + .cache_revalidate = 0, + .cache_intermediaries = 0, + .origin_protocol = LWSMPRO_FILE, /* files in a dir */ + .mountpoint_len = 1, /* char count */ + .basic_auth_login_file = NULL, +}; + void IZ_AppHandleSignal(i32 _signal) { interrupted = true; } -void IZ_AppCreateContext(IZ_App* app) { +IZ_ProcedureResult IZ_AppCreateContext(IZ_App *app) { struct lws_context_creation_info info; memset(&info, 0, sizeof info); info.port = app->config.port; + info.mounts = &mount; + info.protocols = protocols; // TODO initialize protocols info.options = ( LWS_SERVER_OPTION_VALIDATE_UTF8 | LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE ); + + app->context = lws_create_context(&info); + if (!app->context) { + return 1; + } + + return 0; } -void IZ_AppLoadConfig(IZ_App* app, u8 argc, const char** argv) { +void IZ_AppLoadConfig(IZ_App *app, u8 argc, const char **argv) { memcpy_s(app, sizeof(IZ_App), &IZ_APP_DEFAULT_STATE, sizeof(IZ_App)); - const char* cmdline_buffer; + const char *cmdline_buffer; if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) { app->config.log_level = atoi(cmdline_buffer); } @@ -37,21 +88,29 @@ void IZ_AppLoadConfig(IZ_App* app, u8 argc, const char** argv) { } } -IZ_ProcedureResult IZ_AppInitialize(IZ_App* app, u8 argc, const char** argv) { +IZ_ProcedureResult IZ_AppInitialize(IZ_App *app, u8 argc, const char **argv) { interrupted = false; signal(SIGINT, IZ_AppHandleSignal); IZ_AppLoadConfig(app, argc, argv); IZ_LogInterceptWSMessages(app->config.log_level); - IZ_AppCreateContext(app); + if (IZ_AppCreateContext(app)) { + return 1; + } return 0; } -IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char** argv) { +IZ_ProcedureResult IZ_AppRun(IZ_App *app, u8 argc, const char **argv) { if (IZ_AppInitialize(app, argc, argv)) { return 1; } + i32 n = 0; + while (n >= 0 && !interrupted) { + n = lws_service(app->context, 0); + } + + lws_context_destroy(app->context); return 0; } diff --git a/src/packages/server/IZ_app.h b/src/packages/server/IZ_app.h index 52c77a4..ebfd300 100644 --- a/src/packages/server/IZ_app.h +++ b/src/packages/server/IZ_app.h @@ -3,9 +3,9 @@ #include #include -#include -#include "IZ_common.h" +#include "network/IZ_wsserver.h" #include "log/IZ_log.h" +#include "IZ_common.h" static bool interrupted; diff --git a/src/packages/server/main.c b/src/packages/server/main.c index bc8e782..243bde6 100644 --- a/src/packages/server/main.c +++ b/src/packages/server/main.c @@ -1,84 +1,6 @@ -#include -#include -#include +#include "IZ_app.h" -#include "IZ_common.h" -#include "log/IZ_log.h" - -#define LWS_PLUGIN_STATIC -#include "protocol_lws_minimal.c" - -static struct lws_protocols protocols[] = { - //{ "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 }, - LWS_PLUGIN_PROTOCOL_MINIMAL, - LWS_PROTOCOL_LIST_TERM -}; - -static int interrupted; - -static const struct lws_http_mount mount = { - /* .mount_next */ NULL, /* linked-list "next" */ - /* .mountpoint */ "/", /* mountpoint URL */ - /* .origin */ "./mount-origin", /* serve from dir */ - /* .def */ "index.html", /* default filename */ - /* .protocol */ NULL, - /* .cgienv */ NULL, - /* .extra_mimetypes */ NULL, - /* .interpret */ NULL, - /* .cgi_timeout */ 0, - /* .cache_max_age */ 0, - /* .auth_mask */ 0, - /* .cache_reusable */ 0, - /* .cache_revalidate */ 0, - /* .cache_intermediaries */ 0, - /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ - /* .mountpoint_len */ 1, /* char count */ - /* .basic_auth_login_file */ NULL, -}; - -void sigint_handler(int sig) -{ - interrupted = 1; -} - -int main(int argc, const char **argv) -{ - struct lws_context_creation_info info; - struct lws_context *context; - const char *p; - int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE - /* for LLL_ verbosity above NOTICE to be built into lws, - * lws must have been configured and built with - * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */ - /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ - /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ - /* | LLL_DEBUG */; - - signal(SIGINT, sigint_handler); - - if ((p = lws_cmdline_option(argc, argv, "-d"))) - logs = atoi(p); - - IZ_LogInterceptWSMessages(logs); - lwsl_user("LWS minimal ws server (lws_ring) | visit http://localhost:7681\n"); - - memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ - info.port = 42069; - info.mounts = &mount; - info.protocols = protocols; - info.options = - LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; - - context = lws_create_context(&info); - if (!context) { - lwsl_err("lws init failed\n"); - return 1; - } - - while (n >= 0 && !interrupted) - n = lws_service(context, 0); - - lws_context_destroy(context); - - return 0; +IZ_ProcedureResult main(i32 argc, const char *argv[]) { + IZ_App app; + return IZ_AppRun(&app, argc, argv); } diff --git a/src/packages/server/network/IZ_wsserver.c b/src/packages/server/network/IZ_wsserver.c index 8cba446..18b669d 100644 --- a/src/packages/server/network/IZ_wsserver.c +++ b/src/packages/server/network/IZ_wsserver.c @@ -1 +1,285 @@ #include "IZ_wsserver.h" + +/* one of these created for each message */ + +static void IZ_ProtocolCullLaggingClients(IZ_WSServerVHostData *vhd) { + u32 oldest_tail = lws_ring_get_oldest_tail(vhd->ring); + IZ_WSServerSessionData *old_pss = NULL; + i32 most = 0; + i32 before = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &oldest_tail); + i32 m; + + /* + * At least one guy with the oldest tail has lagged too far, filling + * the ringbuffer with stuff waiting for them, while new stuff is + * coming in, and they must close, freeing up ringbuffer entries. + */ + + lws_start_foreach_llp_safe( + IZ_WSServerSessionData**, + ppss, + vhd->pss_list, + pss_list + ) { + if ((*ppss)->tail == oldest_tail) { + old_pss = *ppss; + + lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); + + lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, + /* + * we may kill the wsi we came in on, + * so the actual close is deferred + */ + LWS_TO_KILL_ASYNC); + + /* + * We might try to write something before we get a + * chance to close. But this pss is now detached + * from the ring buffer. Mark this pss as culled so we + * don't try to do anything more with it. + */ + + (*ppss)->culled = true; + + /* + * Because we can't kill it synchronously, but we + * know it's closing momentarily and don't want its + * participation any more, remove its pss from the + * vhd pss list early. (This is safe to repeat + * uselessly later in the close flow). + * + * Notice this changes *ppss! + */ + + lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, (*ppss), vhd->pss_list); + + /* use the changed *ppss so we won't skip anything */ + + continue; + } + + /* + * so this guy is a survivor of the cull. Let's track + * what is the largest number of pending ring elements + * for any survivor. + */ + m = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &((*ppss)->tail)); + if (m > most) { + most = m; + } + } lws_end_foreach_llp_safe(ppss); + + /* it would mean we lost track of oldest... but Coverity insists */ + if (!old_pss) { + return; + } + + /* + * Let's recover (ie, free up) all the ring slots between the + * original oldest's last one and the "worst" survivor. + */ + + lws_ring_consume_and_update_oldest_tail( + vhd->ring, + IZ_WSServerSessionData, + &old_pss->tail, + (size_t) (before - most), + vhd->pss_list, + tail, + pss_list + ); + + lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); +} + +/* destroys the message when everyone has had a copy of it */ + +void IZ_ProtocolDestroyMessage(void *_msg) { + IZ_WSServerMessage *msg = _msg; + + free(msg->payload); + msg->payload = NULL; + msg->len = 0; +} + +i32 IZ_WSServerInitialize(struct lws* wsi) { + IZ_WSServerVHostData* vhd_instance = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + IZ_WSServerVHostData** vhd = &vhd_instance; + *vhd = lws_protocol_vh_priv_zalloc( + lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(IZ_WSServerVHostData) + ); + (*vhd)->context = lws_get_context(wsi); + (*vhd)->protocol = lws_get_protocol(wsi); + (*vhd)->vhost = lws_get_vhost(wsi); + (*vhd)->ring = lws_ring_create( + sizeof(IZ_WSServerMessage), + RING_COUNT, + IZ_ProtocolDestroyMessage + ); + if (!(*vhd)->ring) { + return 1; + } + return 0; +} + +void IZ_WSServerTeardown(struct lws* wsi) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + lws_ring_destroy(vhd->ring); +} + +void IZ_WSServerOnOpen(struct lws* wsi, IZ_WSServerSessionData* pss) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + /* add ourselves to the list of live pss held in the vhd */ + lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); + lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); + pss->tail = lws_ring_get_oldest_tail(vhd->ring); + pss->wsi = wsi; +} + +void IZ_WSServerOnClose(struct lws* wsi, IZ_WSServerSessionData* pss) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); + /* remove our closing pss from the list of live pss */ + lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, pss, vhd->pss_list); +} + +IZ_ProcedureResult IZ_WSServerOnSend(struct lws* wsi, IZ_WSServerSessionData* pss) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + if (pss->culled) { + return 0; + } + const IZ_WSServerMessage *pmsg = lws_ring_get_element(vhd->ring, &pss->tail); + if (!pmsg) { + return 0; + } + + /* notice we allowed for LWS_PRE in the payload already */ + i32 m = lws_write( + wsi, + ((unsigned char *) pmsg->payload) + LWS_PRE, + pmsg->len, + pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT + ); + + if (m < (i32) pmsg->len) { + lwsl_err("ERROR %d writing to ws socket\n", m); + return -1; + } + + lws_ring_consume_and_update_oldest_tail( + vhd->ring, /* lws_ring object */ + IZ_WSServerSessionData, /* type of objects with tails */ + &pss->tail, /* tail of guy doing the consuming */ + 1, /* number of payload objects being consumed */ + vhd->pss_list, /* head of list of objects with tails */ + tail, /* member name of tail in objects with tails */ + pss_list /* member name of next object in objects with tails */ + ); + + /* more to do for us? */ + if (lws_ring_get_element(vhd->ring, &pss->tail)) { + /* come back as soon as we can write more */ + lws_callback_on_writable(pss->wsi); + } + + return 0; +} + +void IZ_WSServerOnReceive(struct lws* wsi, void* in, size_t len) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + IZ_WSServerMessage amsg; + i32 n = (i32) lws_ring_get_count_free_elements(vhd->ring); + if (!n) { + /* forcibly make space */ + IZ_ProtocolCullLaggingClients(vhd); + n = (i32) lws_ring_get_count_free_elements(vhd->ring); + } + + if (!n) { + return; + } + + lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); + + amsg.len = len; + /* notice we over-allocate by LWS_PRE... */ + amsg.payload = malloc(LWS_PRE + len); + amsg.binary = (u8) lws_frame_is_binary(wsi); + if (!amsg.payload) { + lwsl_user("OOM: dropping\n"); + return; + } + + /* ...and we copy the payload in at +LWS_PRE */ + memcpy((char *) amsg.payload + LWS_PRE, in, len); + if (!lws_ring_insert(vhd->ring, &amsg, 1)) { + IZ_ProtocolDestroyMessage(&amsg); + lwsl_user("dropping!\n"); + return; + } + + /* + * let everybody know we want to write something on them + * as soon as they are ready + */ + lws_start_foreach_llp(IZ_WSServerSessionData**, ppss, vhd->pss_list) { + lws_callback_on_writable((*ppss)->wsi); + } + + lws_end_foreach_llp(ppss, pss_list); +} + +i32 IZ_WSServerCallback( + struct lws* wsi, + enum lws_callback_reasons reason, + void* user, + void* in, + size_t len +) { + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + return IZ_WSServerInitialize(wsi); + case LWS_CALLBACK_PROTOCOL_DESTROY: + IZ_WSServerTeardown(wsi); + break; + case LWS_CALLBACK_ESTABLISHED: + IZ_WSServerOnOpen(wsi, user); + break; + case LWS_CALLBACK_CLOSED: + IZ_WSServerOnClose(wsi, user); + break; + case LWS_CALLBACK_SERVER_WRITEABLE: + return IZ_WSServerOnSend(wsi, user); + case LWS_CALLBACK_RECEIVE: + IZ_WSServerOnReceive(wsi, in, len); + break; + default: + break; + } + + return 0; +} diff --git a/src/packages/server/network/IZ_wsserver.h b/src/packages/server/network/IZ_wsserver.h index 3a2f849..a4b2924 100644 --- a/src/packages/server/network/IZ_wsserver.h +++ b/src/packages/server/network/IZ_wsserver.h @@ -1,4 +1,45 @@ #ifndef IZ_WSSERVER_H #define IZ_WSSERVER_H +#include "libwebsockets.h" +#include +#include "../IZ_common.h" + +#define RING_COUNT 32 + +typedef struct { + void *payload; /* is malloc'd */ + size_t len; + u8 binary: 1; +} IZ_WSServerMessage; + +/* one of these is created for each client connecting to us */ + +typedef struct IZ_WSServerSessionData { + struct IZ_WSServerSessionData *pss_list; + struct lws *wsi; + u32 tail; + u8 culled: 1; +} IZ_WSServerSessionData; + +/* one of these is created for each vhost our protocol is used with */ + +typedef struct { + struct lws_context *context; + struct lws_vhost *vhost; + const struct lws_protocols *protocol; + + IZ_WSServerSessionData *pss_list; /* linked-list of live pss*/ + + struct lws_ring *ring; /* ringbuffer holding unsent messages */ +} IZ_WSServerVHostData; + +i32 IZ_WSServerCallback( + struct lws*, + enum lws_callback_reasons, + void*, + void*, + size_t +); + #endif diff --git a/src/packages/server/protocol_lws_minimal.c b/src/packages/server/protocol_lws_minimal.c deleted file mode 100644 index f62a53a..0000000 --- a/src/packages/server/protocol_lws_minimal.c +++ /dev/null @@ -1,268 +0,0 @@ -#if !defined (LWS_PLUGIN_STATIC) -#define LWS_DLL -#define LWS_INTERNAL -#include -#endif - -#include -#include "IZ_common.h" - -#define RING_COUNT 32 -/* one of these created for each message */ - -struct msg { - void *payload; /* is malloc'd */ - size_t len; -}; - -/* one of these is created for each client connecting to us */ - -struct per_session_data__minimal { - struct per_session_data__minimal *pss_list; - struct lws *wsi; - uint32_t tail; - - unsigned int culled:1; -}; - -/* one of these is created for each vhost our protocol is used with */ - -struct per_vhost_data__minimal { - struct lws_context *context; - struct lws_vhost *vhost; - const struct lws_protocols *protocol; - - struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ - - struct lws_ring *ring; /* ringbuffer holding unsent messages */ -}; - -static void -cull_lagging_clients(struct per_vhost_data__minimal *vhd) -{ - uint32_t oldest_tail = lws_ring_get_oldest_tail(vhd->ring); - struct per_session_data__minimal *old_pss = NULL; - int most = 0, before = (int)lws_ring_get_count_waiting_elements(vhd->ring, - &oldest_tail), m; - - /* - * At least one guy with the oldest tail has lagged too far, filling - * the ringbuffer with stuff waiting for them, while new stuff is - * coming in, and they must close, freeing up ringbuffer entries. - */ - - lws_start_foreach_llp_safe(struct per_session_data__minimal **, - ppss, vhd->pss_list, pss_list) { - - if ((*ppss)->tail == oldest_tail) { - old_pss = *ppss; - - lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); - - lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, - /* - * we may kill the wsi we came in on, - * so the actual close is deferred - */ - LWS_TO_KILL_ASYNC); - - /* - * We might try to write something before we get a - * chance to close. But this pss is now detached - * from the ring buffer. Mark this pss as culled so we - * don't try to do anything more with it. - */ - - (*ppss)->culled = 1; - - /* - * Because we can't kill it synchronously, but we - * know it's closing momentarily and don't want its - * participation any more, remove its pss from the - * vhd pss list early. (This is safe to repeat - * uselessly later in the close flow). - * - * Notice this changes *ppss! - */ - - lws_ll_fwd_remove(struct per_session_data__minimal, - pss_list, (*ppss), vhd->pss_list); - - /* use the changed *ppss so we won't skip anything */ - - continue; - - } else { - /* - * so this guy is a survivor of the cull. Let's track - * what is the largest number of pending ring elements - * for any survivor. - */ - m = (int)lws_ring_get_count_waiting_elements(vhd->ring, - &((*ppss)->tail)); - if (m > most) - most = m; - } - - } lws_end_foreach_llp_safe(ppss); - - /* it would mean we lost track of oldest... but Coverity insists */ - if (!old_pss) - return; - - /* - * Let's recover (ie, free up) all the ring slots between the - * original oldest's last one and the "worst" survivor. - */ - - lws_ring_consume_and_update_oldest_tail(vhd->ring, - struct per_session_data__minimal, &old_pss->tail, (size_t)(before - most), - vhd->pss_list, tail, pss_list); - - lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); -} - -/* destroys the message when everyone has had a copy of it */ - -static void -__minimal_destroy_message(void *_msg) -{ - struct msg *msg = _msg; - - free(msg->payload); - msg->payload = NULL; - msg->len = 0; -} - -static int -callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, - void *user, void *in, size_t len) -{ - struct per_session_data__minimal *pss = - (struct per_session_data__minimal *)user; - struct per_vhost_data__minimal *vhd = - (struct per_vhost_data__minimal *) - lws_protocol_vh_priv_get(lws_get_vhost(wsi), - lws_get_protocol(wsi)); - const struct msg *pmsg; - struct msg amsg; - int n, m; - - switch (reason) { - case LWS_CALLBACK_PROTOCOL_INIT: - vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), - lws_get_protocol(wsi), - sizeof(struct per_vhost_data__minimal)); - vhd->context = lws_get_context(wsi); - vhd->protocol = lws_get_protocol(wsi); - vhd->vhost = lws_get_vhost(wsi); - - vhd->ring = lws_ring_create(sizeof(struct msg), RING_COUNT, - __minimal_destroy_message); - if (!vhd->ring) - return 1; - break; - - case LWS_CALLBACK_PROTOCOL_DESTROY: - lws_ring_destroy(vhd->ring); - break; - - case LWS_CALLBACK_ESTABLISHED: - /* add ourselves to the list of live pss held in the vhd */ - lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); - lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); - pss->tail = lws_ring_get_oldest_tail(vhd->ring); - pss->wsi = wsi; - break; - - case LWS_CALLBACK_CLOSED: - lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); - /* remove our closing pss from the list of live pss */ - lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, - pss, vhd->pss_list); - break; - - case LWS_CALLBACK_SERVER_WRITEABLE: - if (pss->culled) - break; - pmsg = lws_ring_get_element(vhd->ring, &pss->tail); - if (!pmsg) - break; - - /* notice we allowed for LWS_PRE in the payload already */ - m = lws_write(wsi, ((unsigned char *)pmsg->payload) + - LWS_PRE, pmsg->len, LWS_WRITE_TEXT); - if (m < (int)pmsg->len) { - lwsl_err("ERROR %d writing to ws socket\n", m); - return -1; - } - - lws_ring_consume_and_update_oldest_tail( - vhd->ring, /* lws_ring object */ - struct per_session_data__minimal, /* type of objects with tails */ - &pss->tail, /* tail of guy doing the consuming */ - 1, /* number of payload objects being consumed */ - vhd->pss_list, /* head of list of objects with tails */ - tail, /* member name of tail in objects with tails */ - pss_list /* member name of next object in objects with tails */ - ); - - /* more to do for us? */ - if (lws_ring_get_element(vhd->ring, &pss->tail)) - /* come back as soon as we can write more */ - lws_callback_on_writable(pss->wsi); - break; - - case LWS_CALLBACK_RECEIVE: - n = (int)lws_ring_get_count_free_elements(vhd->ring); - if (!n) { - /* forcibly make space */ - cull_lagging_clients(vhd); - n = (int)lws_ring_get_count_free_elements(vhd->ring); - } - if (!n) - break; - - lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); - - amsg.len = len; - /* notice we over-allocate by LWS_PRE... */ - amsg.payload = malloc(LWS_PRE + len); - if (!amsg.payload) { - lwsl_user("OOM: dropping\n"); - break; - } - - /* ...and we copy the payload in at +LWS_PRE */ - memcpy((char *)amsg.payload + LWS_PRE, in, len); - if (!lws_ring_insert(vhd->ring, &amsg, 1)) { - __minimal_destroy_message(&amsg); - lwsl_user("dropping!\n"); - break; - } - - /* - * let everybody know we want to write something on them - * as soon as they are ready - */ - lws_start_foreach_llp(struct per_session_data__minimal **, - ppss, vhd->pss_list) { - lws_callback_on_writable((*ppss)->wsi); - } lws_end_foreach_llp(ppss, pss_list); - break; - - default: - break; - } - - return 0; -} - -#define LWS_PLUGIN_PROTOCOL_MINIMAL \ - { \ - NETWORK_PROTOCOL, \ - callback_minimal, \ - sizeof(struct per_session_data__minimal), \ - 0, \ - 0, NULL, 0 \ - }