From 67036afd8a8b70a6a7a83b05e06cdae34b3cc4c2 Mon Sep 17 00:00:00 2001 From: TheoryOfNekomata Date: Tue, 14 Jun 2022 14:25:30 +0800 Subject: [PATCH] Add write logic from client Connect client to server, save connection details. --- CMakeLists.txt | 2 + src/packages/game/IZ_app.c | 135 +++++----- src/packages/game/IZ_config.c | 2 +- src/packages/game/network/IZ_websocket.h | 1 + src/packages/game/network/IZ_wsclient.c | 43 ++- src/packages/game/network/IZ_wsclient.h | 27 +- src/packages/server/IZ_app.c | 293 +++++++++++++++++++-- src/packages/server/IZ_app.h | 16 +- src/packages/server/IZ_config.c | 8 + src/packages/server/IZ_config.h | 10 + src/packages/server/network/IZ_wsserver.c | 307 ++++------------------ src/packages/server/network/IZ_wsserver.h | 46 ++-- 12 files changed, 492 insertions(+), 398 deletions(-) create mode 100644 src/packages/server/IZ_config.c create mode 100644 src/packages/server/IZ_config.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c542bdb..f00d7f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -186,6 +186,8 @@ add_executable( src/packages/server/main.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h + src/packages/server/IZ_config.c + src/packages/server/IZ_config.h src/packages/server/network/IZ_wsserver.c src/packages/server/network/IZ_wsserver.h src/packages/server/network/IZ_websocket.h diff --git a/src/packages/game/IZ_app.c b/src/packages/game/IZ_app.c index 639165e..0dcb4b2 100644 --- a/src/packages/game/IZ_app.c +++ b/src/packages/game/IZ_app.c @@ -1,7 +1,7 @@ #include "IZ_app.h" -IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app, IZ_WSClientInitializeParams params) { - if (IZ_WSClientInitialize(&app->client, params)) { +IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app) { + if (IZ_WSClientInitialize(&app->client)) { printf("error\n"); return -1; } @@ -10,14 +10,7 @@ IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app, IZ_WSClientInitializePa IZ_ProcedureResult IZ_AppConnect(void* app_raw) { IZ_App* app = app_raw; - IZ_WSClientInitializeParams params = { - .address = "127.0.0.1", - .path = "/", - .port = 42069, - .userdata = app, - }; - - if (IZ_AppWSClientInitialize(app, params)) { + if (IZ_AppWSClientInitialize(app)) { return -1; } @@ -37,11 +30,29 @@ IZ_ProcedureResult IZ_AppConnect(void* app_raw) { return result; } -void IZ_AppEstablishConnection(IZ_App* app) { +void IZ_AppHandleNetworkingOutboundEvents(IZ_App* app) { + struct lws* wsi = app->client.ws.connection; + printf("%p\n", wsi); + if (!wsi) { + return; + } + IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + IZ_WebsocketMessage* amsg = IZ_WSClientCreateMessage(wsi, false, "hello", 5); + lws_ring_insert(vhd->ring, amsg, 1); + lws_callback_on_writable(wsi); +} + +void IZ_AppEstablishConnection(IZ_App* app, IZ_WSClientInitializeParams params) { if (app->client.ws.context) { return; } + app->client.params = params; + app->client.userdata = app; app->client_thread = SDL_CreateThread(IZ_AppConnect, "networking", app); SDL_DetachThread(app->client_thread); } @@ -108,9 +119,15 @@ IZ_ProcedureResult IZ_AppHandleSDLEvents(IZ_App* app) { if (e.type == SDL_KEYDOWN) { if (e.key.keysym.sym == SDLK_PAGEUP) { - IZ_AppEstablishConnection(app); + IZ_AppEstablishConnection(app, (IZ_WSClientInitializeParams){ + .address = "127.0.0.1", + .path = "/", + .port = 42069, + }); } else if (e.key.keysym.sym == SDLK_PAGEDOWN) { IZ_AppCloseConnection(app); + } else if (e.key.keysym.sym == SDLK_INSERT) { + IZ_AppHandleNetworkingOutboundEvents(app); } } @@ -184,15 +201,12 @@ IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) { // TODO do audio processing // TODO do networking - //if (app->client.connection) { - // FIXME stuck in infinite loop - //IZ_WSClientHandle(&app->client); - //} if (IZ_AppHandleInputEvents(app)) { break; } + IZ_VideoUpdate(&app->video_state, app->ticks, &app->input_state); } @@ -275,7 +289,6 @@ void IZ_WSClientProtocolTeardown(struct lws* wsi) { lws_get_protocol(wsi) ); - vhd->finished = true; if (vhd->ring) { lws_ring_destroy(vhd->ring); } @@ -290,6 +303,8 @@ void IZ_WSClientConnectionError(struct lws* wsi, void* in) { ); lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)"); + IZ_App* app = (IZ_App*) vhd->app; + app->client.ws.connection = NULL; vhd->client_wsi = NULL; lws_sul_schedule( vhd->context, @@ -306,7 +321,8 @@ IZ_ProcedureResult IZ_WSClientOnOpen(struct lws* wsi, IZ_WSClientSessionData* ps lws_get_protocol(wsi) ); - vhd->established = true; + IZ_App* app = (IZ_App*) vhd->app; + app->client.ws.connection = wsi; pss->ring = lws_ring_create(sizeof(IZ_WebsocketMessage), RING_COUNT,IZ_WebsocketDestroyMessage); if (!pss->ring) { return -1; @@ -321,8 +337,9 @@ void IZ_WSClientOnClose(struct lws* wsi) { lws_get_protocol(wsi) ); + IZ_App* app = (IZ_App*) vhd->app; + app->client.ws.connection = NULL; vhd->client_wsi = NULL; - vhd->established = false; lws_sul_schedule( vhd->context, 0, @@ -367,87 +384,65 @@ IZ_ProcedureResult IZ_WSClientWritable(struct lws* wsi) { return 0; } -void IZ_WSClientOnReceive(struct lws* wsi, IZ_WSClientSessionData* pss, void* in, size_t len) { - lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n", - (int)len, (int)lws_remaining_packet_payload(wsi), - lws_is_first_fragment(wsi), - lws_is_final_fragment(wsi), - lws_frame_is_binary(wsi)); - - // lwsl_hexdump_notice(in, len); +IZ_WebsocketMessage* IZ_WSClientCreateMessage(struct lws* wsi, bool binary, void* in, size_t len) { IZ_WebsocketMessage amsg; amsg.first = (char)lws_is_first_fragment(wsi); amsg.final = (char)lws_is_final_fragment(wsi); - amsg.binary = (char)lws_frame_is_binary(wsi); + amsg.binary = binary; + + amsg.len = len; + /* notice we over-allocate by LWS_PRE */ + amsg.payload = malloc(LWS_PRE + len); + if (!amsg.payload) { + return NULL; + } + + memcpy((char*) amsg.payload + LWS_PRE, in, len); + return &amsg; +} + +void IZ_WSClientOnReceive(struct lws* wsi, IZ_WSClientSessionData* pss, void* in, size_t len) { i32 n = (i32) lws_ring_get_count_free_elements(pss->ring); if (!n) { lwsl_user("dropping!\n"); return; } - amsg.len = len; - /* notice we over-allocate by LWS_PRE */ - amsg.payload = malloc(LWS_PRE + len); - if (!amsg.payload) { + lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n", + (int)len, (int)lws_remaining_packet_payload(wsi), + lws_is_first_fragment(wsi), + lws_is_final_fragment(wsi), + lws_frame_is_binary(wsi)); + + // lwsl_hexdump_notice(in, len); + + IZ_WebsocketMessage* amsg = IZ_WSClientCreateMessage(wsi, (bool) lws_frame_is_binary(wsi), in, len); + if (!amsg) { lwsl_user("OOM: dropping\n"); return; } - memcpy((char *)amsg.payload + LWS_PRE, in, len); IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get( lws_get_vhost(wsi), lws_get_protocol(wsi) ); IZ_App* app = (IZ_App*) vhd->app; - if (amsg.binary) { + if (amsg->binary) { IZ_AppHandleNetworkingInboundBinaryEvents(app, in, len); } else { IZ_AppHandleNetworkingInboundTextEvents(app, in, len); } - if (!lws_ring_insert(pss->ring, &amsg, 1)) { - IZ_WebsocketDestroyMessage(&amsg); + if (!lws_ring_insert(pss->ring, amsg, 1)) { + IZ_WebsocketDestroyMessage(amsg); lwsl_user("dropping!\n"); return; } lws_callback_on_writable(wsi); if (!pss->flow_controlled && n < 3) { - pss->flow_controlled = 1; + pss->flow_controlled = true; lws_rx_flow_control(wsi, 0); } } - -IZ_ProcedureResult IZ_WSClientCallback( - struct lws* wsi, - enum lws_callback_reasons reason, - void* user, - void* in, - size_t len -) { - switch (reason) { - case LWS_CALLBACK_PROTOCOL_INIT: - return IZ_WSClientProtocolInitialize(wsi, in); - case LWS_CALLBACK_PROTOCOL_DESTROY: - IZ_WSClientProtocolTeardown(wsi); - break; - case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - IZ_WSClientConnectionError(wsi, in); - break; - case LWS_CALLBACK_CLIENT_ESTABLISHED: - return IZ_WSClientOnOpen(wsi, user); - case LWS_CALLBACK_CLIENT_CLOSED: - IZ_WSClientOnClose(wsi); - break; - case LWS_CALLBACK_CLIENT_RECEIVE: - IZ_WSClientOnReceive(wsi, user, in, len); - break; - case LWS_CALLBACK_CLIENT_WRITEABLE: - return IZ_WSClientWritable(wsi); - default: - break; - } - - return lws_callback_http_dummy(wsi, reason, user, in, len); -} diff --git a/src/packages/game/IZ_config.c b/src/packages/game/IZ_config.c index 21ef66d..0fa3f4a 100644 --- a/src/packages/game/IZ_config.c +++ b/src/packages/game/IZ_config.c @@ -4,5 +4,5 @@ void IZ_ConfigGetDefaultPath(const char* config_path, size_t string_size) { //const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME); const char* config_path_dir = SDL_GetBasePath(); memcpy_s(config_path, string_size, config_path_dir, 128); - strcat_s(config_path, string_size, "config.ini"); + strcat_s(config_path, string_size, "config-game.ini"); } diff --git a/src/packages/game/network/IZ_websocket.h b/src/packages/game/network/IZ_websocket.h index bd08d89..5364518 100644 --- a/src/packages/game/network/IZ_websocket.h +++ b/src/packages/game/network/IZ_websocket.h @@ -18,6 +18,7 @@ typedef struct { typedef struct { struct lws_context* context; + struct lws* connection; u8 interrupted: 1; } IZ_Websocket; diff --git a/src/packages/game/network/IZ_wsclient.c b/src/packages/game/network/IZ_wsclient.c index cd9ac1a..99ff27a 100644 --- a/src/packages/game/network/IZ_wsclient.c +++ b/src/packages/game/network/IZ_wsclient.c @@ -1,6 +1,39 @@ #include "IZ_wsclient.h" -IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientInitializeParams params) { +IZ_ProcedureResult IZ_WSClientCallback( + struct lws* wsi, + enum lws_callback_reasons reason, + void* user, + void* in, + size_t len +) { + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + return IZ_WSClientProtocolInitialize(wsi, in); + case LWS_CALLBACK_PROTOCOL_DESTROY: + IZ_WSClientProtocolTeardown(wsi); + break; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + IZ_WSClientConnectionError(wsi, in); + break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + return IZ_WSClientOnOpen(wsi, user); + case LWS_CALLBACK_CLIENT_CLOSED: + IZ_WSClientOnClose(wsi); + break; + case LWS_CALLBACK_CLIENT_RECEIVE: + IZ_WSClientOnReceive(wsi, user, in, len); + break; + case LWS_CALLBACK_CLIENT_WRITEABLE: + return IZ_WSClientWritable(wsi); + default: + break; + } + + return lws_callback_http_dummy(wsi, reason, user, in, len); +} + +IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state) { struct lws_context_creation_info info; memset(&info, 0, sizeof info); info.port = CONTEXT_PORT_NO_LISTEN; @@ -25,7 +58,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni "address", /* pvo name */ "localhost" /* pvo value */ }; - pvo_address.value = params.address; + pvo_address.value = state->params.address; static struct lws_protocol_vhost_options pvo_path = { &pvo_address, @@ -33,7 +66,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni "path", /* pvo name */ "/" /* pvo value */ }; - pvo_path.value = params.path; + pvo_path.value = state->params.path; static struct lws_protocol_vhost_options pvo_port = { &pvo_path, @@ -41,7 +74,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni "port", /* pvo name */ NULL /* pvo value */ }; - pvo_port.value = (void*) ¶ms.port; + pvo_port.value = (void*) &state->params.port; static struct lws_protocol_vhost_options pvo_app = { &pvo_port, @@ -49,7 +82,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni "app", NULL, }; - pvo_app.value = params.userdata; + pvo_app.value = state->userdata; static const struct lws_protocol_vhost_options pvo = { NULL, /* "next" pvo linked-list */ diff --git a/src/packages/game/network/IZ_wsclient.h b/src/packages/game/network/IZ_wsclient.h index d390081..0d01daf 100644 --- a/src/packages/game/network/IZ_wsclient.h +++ b/src/packages/game/network/IZ_wsclient.h @@ -18,8 +18,6 @@ typedef struct { struct lws_client_connect_info i; struct lws *client_wsi; - u8 finished: 1; - u8 established: 1; const char* address; const char* path; u16* port; @@ -29,23 +27,22 @@ typedef struct { typedef struct { struct lws_ring *ring; u32 tail; - char flow_controlled; - u8 completed: 1; - u8 write_consume_pending: 1; + unsigned char flow_controlled: 1; } IZ_WSClientSessionData; typedef struct { const char* address; const char* path; u16 port; - void* userdata; } IZ_WSClientInitializeParams; typedef struct { IZ_Websocket ws; + void* userdata; + IZ_WSClientInitializeParams params; } IZ_WSClientState; -IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState*, IZ_WSClientInitializeParams); +IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState*); IZ_ProcedureResult IZ_WSClientHandle(IZ_WSClientState*); @@ -53,6 +50,20 @@ void IZ_WSClientTeardown(IZ_WSClientState*); void IZ_WSClientCancelService(IZ_WSClientState*); -IZ_ProcedureResult IZ_WSClientCallback(struct lws*, enum lws_callback_reasons, void*, void*, size_t); +IZ_ProcedureResult IZ_WSClientProtocolInitialize(struct lws*, void*); + +void IZ_WSClientProtocolTeardown(struct lws*); + +void IZ_WSClientConnectionError(struct lws*, void*); + +IZ_ProcedureResult IZ_WSClientOnOpen(struct lws*, IZ_WSClientSessionData*); + +void IZ_WSClientOnClose(struct lws*); + +IZ_ProcedureResult IZ_WSClientWritable(struct lws*); + +void IZ_WSClientOnReceive(struct lws*, IZ_WSClientSessionData*, void*, size_t); + +IZ_WebsocketMessage* IZ_WSClientCreateMessage(struct lws*, bool, void*, size_t); #endif diff --git a/src/packages/server/IZ_app.c b/src/packages/server/IZ_app.c index 20a0d2e..0f9baee 100644 --- a/src/packages/server/IZ_app.c +++ b/src/packages/server/IZ_app.c @@ -1,42 +1,33 @@ #include "IZ_app.h" +static IZ_App* global_app; + void IZ_AppHandleSignal(i32 _signal) { global_app->server.ws.interrupted = true; IZ_WSServerCancelService(&global_app->server); } -// TODO move to each subsystem -// TODO unify loading of config from cmdline and config file -void IZ_AppLoadConfig(IZ_App *app, u8 argc, const char **argv) { - memcpy_s(app, sizeof(IZ_App), &IZ_APP_DEFAULT_STATE, sizeof(IZ_App)); +IZ_ProcedureResult IZ_AppInitialize(IZ_App *app, u8 argc, const char **argv) { + global_app = app; + memset(app, 0, sizeof(IZ_App)); + signal(SIGINT, IZ_AppHandleSignal); - const char *cmdline_buffer; - if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) { - app->config.log_level = atoi(cmdline_buffer); - } + // IZ_LogInterceptWSMessages(app->config.log_level); + IZ_LogInterceptWSMessages(LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE); - if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-p"))) { - app->config.port = atoi(cmdline_buffer); + const char* cmdline_buffer; + char config_path[128]; + // TODO abstract command line args parsing + if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-c"))) { + memcpy_s(config_path, 128, cmdline_buffer, 128); + } else { + IZ_ConfigGetDefaultPath(config_path, 128); } -} -IZ_ProcedureResult IZ_AppInitialize(IZ_App *app, u8 argc, const char **argv) { - signal(SIGINT, IZ_AppHandleSignal); - signal(9, IZ_AppHandleSignal); - signal(SIGTERM, IZ_AppHandleSignal); - - IZ_AppLoadConfig(app, argc, argv); - IZ_LogInterceptWSMessages(app->config.log_level); - if (IZ_WSServerInitialize(&app->server, (IZ_WSServerInitializeParams) { - .protocol = NULL, - .port = app->config.port, - .default_filename = NULL, - .origin = NULL, - })) { + if (IZ_WSServerInitialize(&app->server, app, config_path, argc, argv)) { return -1; } - global_app = app; return 0; } @@ -61,3 +52,255 @@ IZ_ProcedureResult IZ_AppRun(IZ_App *app, u8 argc, const char **argv) { lwsl_user("Server closed. Bye!\n"); return result; } + +void IZ_WSServerCullLaggingClients(IZ_WSServerVHostData *vhd) { + u32 oldest_tail = lws_ring_get_oldest_tail(vhd->ring); + IZ_WSServerSessionData *old_pss = NULL; + i32 most = 0; + i32 before = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &oldest_tail); + i32 m; + + /* + * At least one guy with the oldest tail has lagged too far, filling + * the ringbuffer with stuff waiting for them, while new stuff is + * coming in, and they must close, freeing up ringbuffer entries. + */ + + lws_start_foreach_llp_safe( + IZ_WSServerSessionData**, + ppss, + vhd->pss_list, + pss_list + ) { + if ((*ppss)->tail == oldest_tail) { + old_pss = *ppss; + + lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); + + lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, + /* + * we may kill the wsi we came in on, + * so the actual close is deferred + */ + LWS_TO_KILL_ASYNC); + + /* + * We might try to write something before we get a + * chance to close. But this pss is now detached + * from the ring buffer. Mark this pss as culled so we + * don't try to do anything more with it. + */ + + (*ppss)->culled = true; + + /* + * Because we can't kill it synchronously, but we + * know it's closing momentarily and don't want its + * participation any more, remove its pss from the + * vhd pss list early. (This is safe to repeat + * uselessly later in the close flow). + * + * Notice this changes *ppss! + */ + + lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, (*ppss), vhd->pss_list); + + /* use the changed *ppss so we won't skip anything */ + + continue; + } + + /* + * so this guy is a survivor of the cull. Let's track + * what is the largest number of pending ring elements + * for any survivor. + */ + m = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &((*ppss)->tail)); + if (m > most) { + most = m; + } + } lws_end_foreach_llp_safe(ppss); + + /* it would mean we lost track of oldest... but Coverity insists */ + if (!old_pss) { + return; + } + + /* + * Let's recover (ie, free up) all the ring slots between the + * original oldest's last one and the "worst" survivor. + */ + + lws_ring_consume_and_update_oldest_tail( + vhd->ring, + IZ_WSServerSessionData, + &old_pss->tail, + (size_t) (before - most), + vhd->pss_list, + tail, + pss_list + ); + + lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); +} + +/* destroys the message when everyone has had a copy of it */ + +IZ_ProcedureResult IZ_WSServerProtocolInitialize(struct lws* wsi, void* in) { + IZ_WSServerVHostData* vhd_instance = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + IZ_WSServerVHostData** vhd = &vhd_instance; + *vhd = lws_protocol_vh_priv_zalloc( + lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(IZ_WSServerVHostData) + ); + (*vhd)->context = lws_get_context(wsi); + (*vhd)->protocol = lws_get_protocol(wsi); + (*vhd)->vhost = lws_get_vhost(wsi); + (*vhd)->port = (u16*) lws_pvo_search( + (const struct lws_protocol_vhost_options *)in, + "port" + )->value; + (*vhd)->app = lws_pvo_search( + (const struct lws_protocol_vhost_options *)in, + "app" + )->value; + (*vhd)->ring = lws_ring_create( + sizeof(IZ_WebsocketMessage), + RING_COUNT, + IZ_WebsocketDestroyMessage + ); + if (!(*vhd)->ring) { + return -1; + } + return 0; +} + +void IZ_WSServerProtocolTeardown(struct lws* wsi) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + lws_ring_destroy(vhd->ring); +} + +void IZ_WSServerOnOpen(struct lws* wsi, IZ_WSServerSessionData* pss) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + /* add ourselves to the list of live pss held in the vhd */ + lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); + lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); + pss->tail = lws_ring_get_oldest_tail(vhd->ring); + pss->wsi = wsi; +} + +void IZ_WSServerOnClose(struct lws* wsi, IZ_WSServerSessionData* pss) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); + /* remove our closing pss from the list of live pss */ + lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, pss, vhd->pss_list); +} + +IZ_ProcedureResult IZ_WSServerWritable(struct lws* wsi, IZ_WSServerSessionData* pss) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + + if (pss->culled) { + return 0; + } + const IZ_WebsocketMessage* pmsg = lws_ring_get_element(vhd->ring, &pss->tail); + if (!pmsg) { + return 0; + } + + /* notice we allowed for LWS_PRE in the payload already */ + i32 m = lws_write( + wsi, + ((unsigned char*) pmsg->payload) + LWS_PRE, + pmsg->len, + pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT + ); + + if (m < (i32) pmsg->len) { + lwsl_err("ERROR %d writing to ws socket\n", m); + return -1; + } + + lws_ring_consume_and_update_oldest_tail( + vhd->ring, /* lws_ring object */ + IZ_WSServerSessionData, /* type of objects with tails */ + &pss->tail, /* tail of guy doing the consuming */ + 1, /* number of payload objects being consumed */ + vhd->pss_list, /* head of list of objects with tails */ + tail, /* member name of tail in objects with tails */ + pss_list /* member name of next object in objects with tails */ + ); + + /* more to do for us? */ + if (lws_ring_get_element(vhd->ring, &pss->tail)) { + /* come back as soon as we can write more */ + lws_callback_on_writable(pss->wsi); + } + + return 0; +} + +IZ_ProcedureResult IZ_WSServerOnReceive(struct lws* wsi, void* in, size_t len) { + IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( + lws_get_vhost(wsi), + lws_get_protocol(wsi) + ); + i32 n = (i32) lws_ring_get_count_free_elements(vhd->ring); + if (!n) { + /* forcibly make space */ + IZ_WSServerCullLaggingClients(vhd); + n = (i32) lws_ring_get_count_free_elements(vhd->ring); + } + + if (!n) { + return 0; + } + + lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); + + IZ_WebsocketMessage amsg; + amsg.len = len; + /* notice we over-allocate by LWS_PRE... */ + amsg.payload = malloc(LWS_PRE + len); + amsg.binary = (u8) lws_frame_is_binary(wsi); + if (!amsg.payload) { + lwsl_user("OOM: dropping\n"); + return 1; + } + + /* ...and we copy the payload in at +LWS_PRE */ + memcpy((char *) amsg.payload + LWS_PRE, in, len); + if (!lws_ring_insert(vhd->ring, &amsg, 1)) { + IZ_WebsocketDestroyMessage(&amsg); + lwsl_user("dropping!\n"); + return 1; + } + + /* + * let everybody know we want to write something on them + * as soon as they are ready + */ + lws_start_foreach_llp(IZ_WSServerSessionData**, ppss, vhd->pss_list) { + lws_callback_on_writable((*ppss)->wsi); + } + + lws_end_foreach_llp(ppss, pss_list); + return 0; +} diff --git a/src/packages/server/IZ_app.h b/src/packages/server/IZ_app.h index a1561bc..45dab43 100644 --- a/src/packages/server/IZ_app.h +++ b/src/packages/server/IZ_app.h @@ -6,26 +6,12 @@ #include "network/IZ_wsserver.h" #include "log/IZ_log.h" #include "IZ_common.h" +#include "IZ_config.h" typedef struct { - i32 log_level; - u16 port; -} IZ_AppConfig; - -typedef struct { - IZ_AppConfig config; IZ_WSServerState server; } IZ_App; -static const IZ_App IZ_APP_DEFAULT_STATE = { - .config = { - .log_level = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE, - .port = 42069, - }, -}; - -static IZ_App* global_app; - IZ_ProcedureResult IZ_AppRun(IZ_App*, u8, const char**); #endif diff --git a/src/packages/server/IZ_config.c b/src/packages/server/IZ_config.c new file mode 100644 index 0000000..5b79fba --- /dev/null +++ b/src/packages/server/IZ_config.c @@ -0,0 +1,8 @@ +#include "IZ_config.h" + +void IZ_ConfigGetDefaultPath(const char* config_path, size_t string_size) { + //const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME); + const char* config_path_dir = SDL_GetBasePath(); + memcpy_s(config_path, string_size, config_path_dir, 128); + strcat_s(config_path, string_size, "config-server.ini"); +} diff --git a/src/packages/server/IZ_config.h b/src/packages/server/IZ_config.h new file mode 100644 index 0000000..168daf4 --- /dev/null +++ b/src/packages/server/IZ_config.h @@ -0,0 +1,10 @@ +#ifndef IZ_CONFIG_H +#define IZ_CONFIG_H + +#include +#include + +// TODO unify loading of config from cmdline and config file +void IZ_ConfigGetDefaultPath(const char*, size_t); + +#endif diff --git a/src/packages/server/network/IZ_wsserver.c b/src/packages/server/network/IZ_wsserver.c index 4d5625f..134f272 100644 --- a/src/packages/server/network/IZ_wsserver.c +++ b/src/packages/server/network/IZ_wsserver.c @@ -1,248 +1,5 @@ #include "IZ_wsserver.h" -void IZ_ProtocolCullLaggingClients(IZ_WSServerVHostData *vhd) { - u32 oldest_tail = lws_ring_get_oldest_tail(vhd->ring); - IZ_WSServerSessionData *old_pss = NULL; - i32 most = 0; - i32 before = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &oldest_tail); - i32 m; - - /* - * At least one guy with the oldest tail has lagged too far, filling - * the ringbuffer with stuff waiting for them, while new stuff is - * coming in, and they must close, freeing up ringbuffer entries. - */ - - lws_start_foreach_llp_safe( - IZ_WSServerSessionData**, - ppss, - vhd->pss_list, - pss_list - ) { - if ((*ppss)->tail == oldest_tail) { - old_pss = *ppss; - - lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); - - lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, - /* - * we may kill the wsi we came in on, - * so the actual close is deferred - */ - LWS_TO_KILL_ASYNC); - - /* - * We might try to write something before we get a - * chance to close. But this pss is now detached - * from the ring buffer. Mark this pss as culled so we - * don't try to do anything more with it. - */ - - (*ppss)->culled = true; - - /* - * Because we can't kill it synchronously, but we - * know it's closing momentarily and don't want its - * participation any more, remove its pss from the - * vhd pss list early. (This is safe to repeat - * uselessly later in the close flow). - * - * Notice this changes *ppss! - */ - - lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, (*ppss), vhd->pss_list); - - /* use the changed *ppss so we won't skip anything */ - - continue; - } - - /* - * so this guy is a survivor of the cull. Let's track - * what is the largest number of pending ring elements - * for any survivor. - */ - m = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &((*ppss)->tail)); - if (m > most) { - most = m; - } - } lws_end_foreach_llp_safe(ppss); - - /* it would mean we lost track of oldest... but Coverity insists */ - if (!old_pss) { - return; - } - - /* - * Let's recover (ie, free up) all the ring slots between the - * original oldest's last one and the "worst" survivor. - */ - - lws_ring_consume_and_update_oldest_tail( - vhd->ring, - IZ_WSServerSessionData, - &old_pss->tail, - (size_t) (before - most), - vhd->pss_list, - tail, - pss_list - ); - - lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); -} - -/* destroys the message when everyone has had a copy of it */ - -IZ_ProcedureResult IZ_WSServerProtocolInitialize(struct lws* wsi) { - IZ_WSServerVHostData* vhd_instance = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( - lws_get_vhost(wsi), - lws_get_protocol(wsi) - ); - IZ_WSServerVHostData** vhd = &vhd_instance; - *vhd = lws_protocol_vh_priv_zalloc( - lws_get_vhost(wsi), - lws_get_protocol(wsi), - sizeof(IZ_WSServerVHostData) - ); - (*vhd)->context = lws_get_context(wsi); - (*vhd)->protocol = lws_get_protocol(wsi); - (*vhd)->vhost = lws_get_vhost(wsi); - (*vhd)->ring = lws_ring_create( - sizeof(IZ_WebsocketMessage), - RING_COUNT, - IZ_WebsocketDestroyMessage - ); - if (!(*vhd)->ring) { - return 1; - } - return 0; -} - -void IZ_WSServerProtocolTeardown(struct lws* wsi) { - IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( - lws_get_vhost(wsi), - lws_get_protocol(wsi) - ); - lws_ring_destroy(vhd->ring); -} - -void IZ_WSServerOnOpen(struct lws* wsi, IZ_WSServerSessionData* pss) { - IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( - lws_get_vhost(wsi), - lws_get_protocol(wsi) - ); - - /* add ourselves to the list of live pss held in the vhd */ - lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); - lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); - pss->tail = lws_ring_get_oldest_tail(vhd->ring); - pss->wsi = wsi; -} - -void IZ_WSServerOnClose(struct lws* wsi, IZ_WSServerSessionData* pss) { - IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( - lws_get_vhost(wsi), - lws_get_protocol(wsi) - ); - - lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); - /* remove our closing pss from the list of live pss */ - lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, pss, vhd->pss_list); -} - -IZ_ProcedureResult IZ_WSServerWritable(struct lws* wsi, IZ_WSServerSessionData* pss) { - IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( - lws_get_vhost(wsi), - lws_get_protocol(wsi) - ); - - if (pss->culled) { - return 0; - } - const IZ_WebsocketMessage* pmsg = lws_ring_get_element(vhd->ring, &pss->tail); - if (!pmsg) { - return 0; - } - - /* notice we allowed for LWS_PRE in the payload already */ - i32 m = lws_write( - wsi, - ((unsigned char*) pmsg->payload) + LWS_PRE, - pmsg->len, - pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT - ); - - if (m < (i32) pmsg->len) { - lwsl_err("ERROR %d writing to ws socket\n", m); - return -1; - } - - lws_ring_consume_and_update_oldest_tail( - vhd->ring, /* lws_ring object */ - IZ_WSServerSessionData, /* type of objects with tails */ - &pss->tail, /* tail of guy doing the consuming */ - 1, /* number of payload objects being consumed */ - vhd->pss_list, /* head of list of objects with tails */ - tail, /* member name of tail in objects with tails */ - pss_list /* member name of next object in objects with tails */ - ); - - /* more to do for us? */ - if (lws_ring_get_element(vhd->ring, &pss->tail)) { - /* come back as soon as we can write more */ - lws_callback_on_writable(pss->wsi); - } - - return 0; -} - -void IZ_WSServerOnReceive(struct lws* wsi, void* in, size_t len) { - IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get( - lws_get_vhost(wsi), - lws_get_protocol(wsi) - ); - IZ_WebsocketMessage amsg; - i32 n = (i32) lws_ring_get_count_free_elements(vhd->ring); - if (!n) { - /* forcibly make space */ - IZ_ProtocolCullLaggingClients(vhd); - n = (i32) lws_ring_get_count_free_elements(vhd->ring); - } - - if (!n) { - return; - } - - lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); - - amsg.len = len; - /* notice we over-allocate by LWS_PRE... */ - amsg.payload = malloc(LWS_PRE + len); - amsg.binary = (u8) lws_frame_is_binary(wsi); - if (!amsg.payload) { - lwsl_user("OOM: dropping\n"); - return; - } - - /* ...and we copy the payload in at +LWS_PRE */ - memcpy((char *) amsg.payload + LWS_PRE, in, len); - if (!lws_ring_insert(vhd->ring, &amsg, 1)) { - IZ_WebsocketDestroyMessage(&amsg); - lwsl_user("dropping!\n"); - return; - } - - /* - * let everybody know we want to write something on them - * as soon as they are ready - */ - lws_start_foreach_llp(IZ_WSServerSessionData**, ppss, vhd->pss_list) { - lws_callback_on_writable((*ppss)->wsi); - } - - lws_end_foreach_llp(ppss, pss_list); -} - IZ_ProcedureResult IZ_WSServerCallback( struct lws* wsi, enum lws_callback_reasons reason, @@ -252,7 +9,7 @@ IZ_ProcedureResult IZ_WSServerCallback( ) { switch (reason) { case LWS_CALLBACK_PROTOCOL_INIT: - return IZ_WSServerProtocolInitialize(wsi); + return IZ_WSServerProtocolInitialize(wsi, in); case LWS_CALLBACK_PROTOCOL_DESTROY: IZ_WSServerProtocolTeardown(wsi); break; @@ -265,8 +22,7 @@ IZ_ProcedureResult IZ_WSServerCallback( case LWS_CALLBACK_SERVER_WRITEABLE: return IZ_WSServerWritable(wsi, user); case LWS_CALLBACK_RECEIVE: - IZ_WSServerOnReceive(wsi, in, len); - break; + return IZ_WSServerOnReceive(wsi, in, len); default: break; } @@ -274,10 +30,27 @@ IZ_ProcedureResult IZ_WSServerCallback( return 0; } -IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, IZ_WSServerInitializeParams params) { +void IZ_WSServerLoadConfig(IZ_WSServerState* state, const char* config_path, u8 argc, const char* argv[]) { + // TODO unify loading of config from cmdline and config file + memcpy_s(state, sizeof(IZ_WSServerState), &IZ_DEFAULT_STATE, sizeof(IZ_WSServerState)); + + const char *cmdline_buffer; +// if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) { +// state->config.log_level = atoi(cmdline_buffer); +// } + + if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-p"))) { + state->config.port = atoi(cmdline_buffer); + } +} + +IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, void* userdata, const char* config_path, u8 argc, const char* argv[]) { + IZ_WSServerLoadConfig(state, config_path, argc, argv); + state->userdata = userdata; + struct lws_context_creation_info info; memset(&info, 0, sizeof info); - info.port = params.port; + info.port = state->config.port; static struct lws_http_mount mount = { .mount_next = NULL, /* linked-list "next" */ @@ -298,17 +71,11 @@ IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, IZ_WSServerIni .mountpoint_len = 1, /* char count */ .basic_auth_login_file = NULL, }; - - if (params.origin) { - mount.origin = params.origin; + if (state->config.origin) { + mount.origin = state->config.origin; } - - if (params.default_filename) { - mount.def = params.default_filename; - } - - if (params.protocol) { - mount.protocol = params.protocol; + if (state->config.default_filename) { + mount.def = state->config.default_filename; } info.mounts = &mount; @@ -334,6 +101,30 @@ IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, IZ_WSServerIni LWS_PROTOCOL_LIST_TERM, }; info.protocols = protocols; + + static struct lws_protocol_vhost_options pvo_port = { + NULL, + NULL, + "port", /* pvo name */ + NULL /* pvo value */ + }; + pvo_port.value = (void*) &state->config.port; + + static struct lws_protocol_vhost_options pvo_app = { + &pvo_port, + NULL, + "app", + NULL, + }; + pvo_app.value = state->userdata; + + static const struct lws_protocol_vhost_options pvo = { + NULL, /* "next" pvo linked-list */ + &pvo_app, /* "child" pvo linked-list */ + NETWORK_PROTOCOL, /* protocol name we belong to on this vhost */ + "" /* ignored */ + }; + info.pvo = &pvo; info.options = ( LWS_SERVER_OPTION_VALIDATE_UTF8 | LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE diff --git a/src/packages/server/network/IZ_wsserver.h b/src/packages/server/network/IZ_wsserver.h index 7bc4759..1cc757f 100644 --- a/src/packages/server/network/IZ_wsserver.h +++ b/src/packages/server/network/IZ_wsserver.h @@ -21,34 +21,36 @@ typedef struct { const struct lws_protocols *protocol; IZ_WSServerSessionData *pss_list; /* linked-list of live pss*/ struct lws_ring *ring; /* ringbuffer holding unsent messages */ + u16* port; + const void* app; } IZ_WSServerVHostData; -IZ_ProcedureResult IZ_WSServerCallback( - struct lws*, - enum lws_callback_reasons, - void*, - void*, - size_t -); - typedef struct { u16 port; const char* origin; const char* default_filename; - const char* protocol; } IZ_WSServerInitializeParams; typedef struct { - i32 log_level; - u16 port; -} IZ_WSServerConfig; - -typedef struct { - IZ_WSServerConfig config; + IZ_WSServerInitializeParams config; + void* userdata; IZ_Websocket ws; } IZ_WSServerState; -IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState*, IZ_WSServerInitializeParams); +static IZ_WSServerState IZ_DEFAULT_STATE = { + .config = { + .port = 42069, + .origin = "./public", + .default_filename = "index.html", + }, + .userdata = NULL, + .ws = { + .interrupted = false, + .context = NULL, + }, +}; + +IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState*, void*, const char*, u8, const char**); IZ_ProcedureResult IZ_WSServerHandle(IZ_WSServerState*); @@ -56,4 +58,16 @@ void IZ_WSServerTeardown(IZ_WSServerState*); void IZ_WSServerCancelService(IZ_WSServerState*); +IZ_ProcedureResult IZ_WSServerProtocolInitialize(struct lws*, void*); + +void IZ_WSServerProtocolTeardown(struct lws*); + +IZ_ProcedureResult IZ_WSServerWritable(struct lws*, IZ_WSServerSessionData*); + +IZ_ProcedureResult IZ_WSServerOnReceive(struct lws*, void*, size_t); + +void IZ_WSServerOnOpen(struct lws*, IZ_WSServerSessionData*); + +void IZ_WSServerOnClose(struct lws*, IZ_WSServerSessionData*); + #endif