From e789b7032f01305e944d6e130d843aa7511c9b94 Mon Sep 17 00:00:00 2001 From: TheoryOfNekomata Date: Mon, 13 Jun 2022 08:55:38 +0800 Subject: [PATCH] Update example Use ring example. --- CMakeLists.txt | 2 +- src/packages/server/main.c | 109 +++---- src/packages/server/protocol_lws_minimal.c | 267 ++++++++++++++++++ .../server/protocol_lws_minimal_server_echo.c | 219 -------------- 4 files changed, 306 insertions(+), 291 deletions(-) create mode 100644 src/packages/server/protocol_lws_minimal.c delete mode 100644 src/packages/server/protocol_lws_minimal_server_echo.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a5d754..9d0a27c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -173,7 +173,7 @@ add_executable( src/packages/server/log/IZ_log.h src/packages/server/log/IZ_log.c src/packages/server/main.c - src/packages/server/protocol_lws_minimal_server_echo.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h) + src/packages/server/protocol_lws_minimal.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h) target_link_libraries( server diff --git a/src/packages/server/main.c b/src/packages/server/main.c index 7d9e83a..bc8e782 100644 --- a/src/packages/server/main.c +++ b/src/packages/server/main.c @@ -6,46 +6,34 @@ #include "log/IZ_log.h" #define LWS_PLUGIN_STATIC -#include "protocol_lws_minimal_server_echo.c" +#include "protocol_lws_minimal.c" static struct lws_protocols protocols[] = { - LWS_PLUGIN_PROTOCOL_MINIMAL_SERVER_ECHO, + //{ "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 }, + LWS_PLUGIN_PROTOCOL_MINIMAL, LWS_PROTOCOL_LIST_TERM }; -static int interrupted, port = 7681, options; - -/* pass pointers to shared vars to the protocol */ - -static const struct lws_protocol_vhost_options pvo_options = { - NULL, - NULL, - "options", /* pvo name */ - (void *)&options /* pvo value */ -}; - -static const struct lws_protocol_vhost_options pvo_interrupted = { - &pvo_options, - NULL, - "interrupted", /* pvo name */ - (void *)&interrupted /* pvo value */ -}; - -static const struct lws_protocol_vhost_options pvo = { - NULL, /* "next" pvo linked-list */ - &pvo_interrupted, /* "child" pvo linked-list */ - "lws-minimal-server-echo", /* protocol name we belong to on this vhost */ - "" /* ignored */ -}; -static const struct lws_extension extensions[] = { - { - "permessage-deflate", - lws_extension_callback_pm_deflate, - "permessage-deflate" - "; client_no_context_takeover" - "; client_max_window_bits" - }, - { NULL, NULL, NULL /* terminator */ } +static int interrupted; + +static const struct lws_http_mount mount = { + /* .mount_next */ NULL, /* linked-list "next" */ + /* .mountpoint */ "/", /* mountpoint URL */ + /* .origin */ "./mount-origin", /* serve from dir */ + /* .def */ "index.html", /* default filename */ + /* .protocol */ NULL, + /* .cgienv */ NULL, + /* .extra_mimetypes */ NULL, + /* .interpret */ NULL, + /* .cgi_timeout */ 0, + /* .cache_max_age */ 0, + /* .auth_mask */ 0, + /* .cache_reusable */ 0, + /* .cache_revalidate */ 0, + /* .cache_intermediaries */ 0, + /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ + /* .mountpoint_len */ 1, /* char count */ + /* .basic_auth_login_file */ NULL, }; void sigint_handler(int sig) @@ -53,10 +41,12 @@ void sigint_handler(int sig) interrupted = 1; } -IZ_ProcedureResult main(i32 argc, const char **argv) +int main(int argc, const char **argv) { - const char *cmdline_buffer; - i32 logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE + struct lws_context_creation_info info; + struct lws_context *context; + const char *p; + int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE /* for LLL_ verbosity above NOTICE to be built into lws, * lws must have been configured and built with * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */ @@ -65,53 +55,30 @@ IZ_ProcedureResult main(i32 argc, const char **argv) /* | LLL_DEBUG */; signal(SIGINT, sigint_handler); - if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) { - logs = atoi(cmdline_buffer); - } - IZ_LogInterceptWSMessages(logs); - lwsl_user("LWS minimal ws client echo + permessage-deflate + multifragment bulk message\n"); - lwsl_user(" lws-minimal-ws-client-echo [-n (no exts)] [-p port] [-o (once)]\n"); - - if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-p"))) { - port = atoi(cmdline_buffer); - } + if ((p = lws_cmdline_option(argc, argv, "-d"))) + logs = atoi(p); - if (lws_cmdline_option(argc, argv, "-o")) { - // connect once - options |= 1; - } + IZ_LogInterceptWSMessages(logs); + lwsl_user("LWS minimal ws server (lws_ring) | visit http://localhost:7681\n"); - struct lws_context_creation_info info; memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ - info.port = port; + info.port = 42069; + info.mounts = &mount; info.protocols = protocols; - info.pvo = &pvo; - if (!lws_cmdline_option(argc, argv, "-n")) - info.extensions = extensions; - info.pt_serv_buf_size = 32 * 1024; - info.options = LWS_SERVER_OPTION_VALIDATE_UTF8 | - LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; + info.options = + LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; - struct lws_context *context; context = lws_create_context(&info); if (!context) { lwsl_err("lws init failed\n"); return 1; } - i32 n; - while (!interrupted) { + while (n >= 0 && !interrupted) n = lws_service(context, 0); - printf("%d\n", n); - if (n < 0) { - break; - } - } lws_context_destroy(context); - lwsl_user("Completed %s\n", interrupted == 2 ? "OK" : "failed"); - - return interrupted != 2; + return 0; } diff --git a/src/packages/server/protocol_lws_minimal.c b/src/packages/server/protocol_lws_minimal.c new file mode 100644 index 0000000..b46dd1a --- /dev/null +++ b/src/packages/server/protocol_lws_minimal.c @@ -0,0 +1,267 @@ +#if !defined (LWS_PLUGIN_STATIC) +#define LWS_DLL +#define LWS_INTERNAL +#include +#endif + +#include + +#define RING_COUNT 32 +/* one of these created for each message */ + +struct msg { + void *payload; /* is malloc'd */ + size_t len; +}; + +/* one of these is created for each client connecting to us */ + +struct per_session_data__minimal { + struct per_session_data__minimal *pss_list; + struct lws *wsi; + uint32_t tail; + + unsigned int culled:1; +}; + +/* one of these is created for each vhost our protocol is used with */ + +struct per_vhost_data__minimal { + struct lws_context *context; + struct lws_vhost *vhost; + const struct lws_protocols *protocol; + + struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ + + struct lws_ring *ring; /* ringbuffer holding unsent messages */ +}; + +static void +cull_lagging_clients(struct per_vhost_data__minimal *vhd) +{ + uint32_t oldest_tail = lws_ring_get_oldest_tail(vhd->ring); + struct per_session_data__minimal *old_pss = NULL; + int most = 0, before = (int)lws_ring_get_count_waiting_elements(vhd->ring, + &oldest_tail), m; + + /* + * At least one guy with the oldest tail has lagged too far, filling + * the ringbuffer with stuff waiting for them, while new stuff is + * coming in, and they must close, freeing up ringbuffer entries. + */ + + lws_start_foreach_llp_safe(struct per_session_data__minimal **, + ppss, vhd->pss_list, pss_list) { + + if ((*ppss)->tail == oldest_tail) { + old_pss = *ppss; + + lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); + + lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, + /* + * we may kill the wsi we came in on, + * so the actual close is deferred + */ + LWS_TO_KILL_ASYNC); + + /* + * We might try to write something before we get a + * chance to close. But this pss is now detached + * from the ring buffer. Mark this pss as culled so we + * don't try to do anything more with it. + */ + + (*ppss)->culled = 1; + + /* + * Because we can't kill it synchronously, but we + * know it's closing momentarily and don't want its + * participation any more, remove its pss from the + * vhd pss list early. (This is safe to repeat + * uselessly later in the close flow). + * + * Notice this changes *ppss! + */ + + lws_ll_fwd_remove(struct per_session_data__minimal, + pss_list, (*ppss), vhd->pss_list); + + /* use the changed *ppss so we won't skip anything */ + + continue; + + } else { + /* + * so this guy is a survivor of the cull. Let's track + * what is the largest number of pending ring elements + * for any survivor. + */ + m = (int)lws_ring_get_count_waiting_elements(vhd->ring, + &((*ppss)->tail)); + if (m > most) + most = m; + } + + } lws_end_foreach_llp_safe(ppss); + + /* it would mean we lost track of oldest... but Coverity insists */ + if (!old_pss) + return; + + /* + * Let's recover (ie, free up) all the ring slots between the + * original oldest's last one and the "worst" survivor. + */ + + lws_ring_consume_and_update_oldest_tail(vhd->ring, + struct per_session_data__minimal, &old_pss->tail, (size_t)(before - most), + vhd->pss_list, tail, pss_list); + + lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); +} + +/* destroys the message when everyone has had a copy of it */ + +static void +__minimal_destroy_message(void *_msg) +{ + struct msg *msg = _msg; + + free(msg->payload); + msg->payload = NULL; + msg->len = 0; +} + +static int +callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + struct per_session_data__minimal *pss = + (struct per_session_data__minimal *)user; + struct per_vhost_data__minimal *vhd = + (struct per_vhost_data__minimal *) + lws_protocol_vh_priv_get(lws_get_vhost(wsi), + lws_get_protocol(wsi)); + const struct msg *pmsg; + struct msg amsg; + int n, m; + + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(struct per_vhost_data__minimal)); + vhd->context = lws_get_context(wsi); + vhd->protocol = lws_get_protocol(wsi); + vhd->vhost = lws_get_vhost(wsi); + + vhd->ring = lws_ring_create(sizeof(struct msg), RING_COUNT, + __minimal_destroy_message); + if (!vhd->ring) + return 1; + break; + + case LWS_CALLBACK_PROTOCOL_DESTROY: + lws_ring_destroy(vhd->ring); + break; + + case LWS_CALLBACK_ESTABLISHED: + /* add ourselves to the list of live pss held in the vhd */ + lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); + lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); + pss->tail = lws_ring_get_oldest_tail(vhd->ring); + pss->wsi = wsi; + break; + + case LWS_CALLBACK_CLOSED: + lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); + /* remove our closing pss from the list of live pss */ + lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, + pss, vhd->pss_list); + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + if (pss->culled) + break; + pmsg = lws_ring_get_element(vhd->ring, &pss->tail); + if (!pmsg) + break; + + /* notice we allowed for LWS_PRE in the payload already */ + m = lws_write(wsi, ((unsigned char *)pmsg->payload) + + LWS_PRE, pmsg->len, LWS_WRITE_TEXT); + if (m < (int)pmsg->len) { + lwsl_err("ERROR %d writing to ws socket\n", m); + return -1; + } + + lws_ring_consume_and_update_oldest_tail( + vhd->ring, /* lws_ring object */ + struct per_session_data__minimal, /* type of objects with tails */ + &pss->tail, /* tail of guy doing the consuming */ + 1, /* number of payload objects being consumed */ + vhd->pss_list, /* head of list of objects with tails */ + tail, /* member name of tail in objects with tails */ + pss_list /* member name of next object in objects with tails */ + ); + + /* more to do for us? */ + if (lws_ring_get_element(vhd->ring, &pss->tail)) + /* come back as soon as we can write more */ + lws_callback_on_writable(pss->wsi); + break; + + case LWS_CALLBACK_RECEIVE: + n = (int)lws_ring_get_count_free_elements(vhd->ring); + if (!n) { + /* forcibly make space */ + cull_lagging_clients(vhd); + n = (int)lws_ring_get_count_free_elements(vhd->ring); + } + if (!n) + break; + + lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); + + amsg.len = len; + /* notice we over-allocate by LWS_PRE... */ + amsg.payload = malloc(LWS_PRE + len); + if (!amsg.payload) { + lwsl_user("OOM: dropping\n"); + break; + } + + /* ...and we copy the payload in at +LWS_PRE */ + memcpy((char *)amsg.payload + LWS_PRE, in, len); + if (!lws_ring_insert(vhd->ring, &amsg, 1)) { + __minimal_destroy_message(&amsg); + lwsl_user("dropping!\n"); + break; + } + + /* + * let everybody know we want to write something on them + * as soon as they are ready + */ + lws_start_foreach_llp(struct per_session_data__minimal **, + ppss, vhd->pss_list) { + lws_callback_on_writable((*ppss)->wsi); + } lws_end_foreach_llp(ppss, pss_list); + break; + + default: + break; + } + + return 0; +} + +#define LWS_PLUGIN_PROTOCOL_MINIMAL \ + { \ + "lws-minimal", \ + callback_minimal, \ + sizeof(struct per_session_data__minimal), \ + 0, \ + 0, NULL, 0 \ + } diff --git a/src/packages/server/protocol_lws_minimal_server_echo.c b/src/packages/server/protocol_lws_minimal_server_echo.c deleted file mode 100644 index ae8419d..0000000 --- a/src/packages/server/protocol_lws_minimal_server_echo.c +++ /dev/null @@ -1,219 +0,0 @@ -#ifndef LWS_PLUGIN_STATIC -#define LWS_DLL -#define LWS_INTERNAL - -#include -#endif - -#include - -#define RING_DEPTH 4096 - -/* one of these created for each message */ - -struct msg { - void *payload; /* is malloc'd */ - size_t len; - char binary; - char first; - char final; -}; - -struct per_session_data__minimal_server_echo { - struct lws_ring *ring; - uint32_t msglen; - uint32_t tail; - uint8_t completed: 1; - uint8_t flow_controlled: 1; - uint8_t write_consume_pending: 1; -}; - -struct vhd_minimal_server_echo { - struct lws_context *context; - struct lws_vhost *vhost; - - int *interrupted; - int *options; -}; - -static void -__minimal_destroy_message(void *_msg) { - struct msg *msg = _msg; - - free(msg->payload); - msg->payload = NULL; - msg->len = 0; -} - -static int -callback_minimal_server_echo(struct lws *wsi, enum lws_callback_reasons reason, - void *user, void *in, size_t len) { - struct per_session_data__minimal_server_echo *pss = - (struct per_session_data__minimal_server_echo *) user; - struct vhd_minimal_server_echo *vhd = (struct vhd_minimal_server_echo *) - lws_protocol_vh_priv_get(lws_get_vhost(wsi), - lws_get_protocol(wsi)); - const struct msg *pmsg; - struct msg amsg; - int m, n, flags; - - switch (reason) { - - case LWS_CALLBACK_PROTOCOL_INIT: - vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), - lws_get_protocol(wsi), - sizeof(struct vhd_minimal_server_echo)); - if (!vhd) - return -1; - - vhd->context = lws_get_context(wsi); - vhd->vhost = lws_get_vhost(wsi); - - /* get the pointers we were passed in pvo */ - - vhd->interrupted = (int *) lws_pvo_search( - (const struct lws_protocol_vhost_options *) in, - "interrupted")->value; - vhd->options = (int *) lws_pvo_search( - (const struct lws_protocol_vhost_options *) in, - "options")->value; - break; - - case LWS_CALLBACK_ESTABLISHED: - /* generate a block of output before travis times us out */ - lwsl_warn("LWS_CALLBACK_ESTABLISHED\n"); - pss->ring = lws_ring_create(sizeof(struct msg), RING_DEPTH, - __minimal_destroy_message); - if (!pss->ring) - return 1; - pss->tail = 0; - break; - - case LWS_CALLBACK_SERVER_WRITEABLE: - - lwsl_user("LWS_CALLBACK_SERVER_WRITEABLE\n"); - - if (pss->write_consume_pending) { - /* perform the deferred fifo consume */ - lws_ring_consume_single_tail(pss->ring, &pss->tail, 1); - pss->write_consume_pending = 0; - } - - pmsg = lws_ring_get_element(pss->ring, &pss->tail); - if (!pmsg) { - lwsl_user(" (nothing in ring)\n"); - break; - } - - flags = lws_write_ws_flags( - pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT, - pmsg->first, pmsg->final); - - /* notice we allowed for LWS_PRE in the payload already */ - m = lws_write(wsi, ((unsigned char *) pmsg->payload) + - LWS_PRE, pmsg->len, (enum lws_write_protocol) flags); - if (m < (int) pmsg->len) { - lwsl_err("ERROR %d writing to ws socket\n", m); - return -1; - } - - lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n", - m, flags, pmsg->first, pmsg->final); - /* - * Workaround deferred deflate in pmd extension by only - * consuming the fifo entry when we are certain it has been - * fully deflated at the next WRITABLE callback. You only need - * this if you're using pmd. - */ - pss->write_consume_pending = 1; - lws_callback_on_writable(wsi); - - if (pss->flow_controlled && - (int) lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) { - lws_rx_flow_control(wsi, 1); - pss->flow_controlled = 0; - } - - if ((*vhd->options & 1) && pmsg && pmsg->final) - pss->completed = 1; - - break; - - case LWS_CALLBACK_RECEIVE: - - lwsl_user("LWS_CALLBACK_RECEIVE: %4d (rpp %5d, first %d, " - "last %d, bin %d, msglen %d (+ %d = %d))\n", - (int) len, (int) lws_remaining_packet_payload(wsi), - lws_is_first_fragment(wsi), - lws_is_final_fragment(wsi), - lws_frame_is_binary(wsi), pss->msglen, (int) len, - (int) pss->msglen + (int) len); - - if (len) { ; - //puts((const char *)in); - //lwsl_hexdump_notice(in, len); - } - - amsg.first = (char) lws_is_first_fragment(wsi); - amsg.final = (char) lws_is_final_fragment(wsi); - amsg.binary = (char) lws_frame_is_binary(wsi); - n = (int) lws_ring_get_count_free_elements(pss->ring); - if (!n) { - lwsl_user("dropping!\n"); - break; - } - - if (amsg.final) - pss->msglen = 0; - else - pss->msglen += (uint32_t) len; - - amsg.len = len; - /* notice we over-allocate by LWS_PRE */ - amsg.payload = malloc(LWS_PRE + len); - if (!amsg.payload) { - lwsl_user("OOM: dropping\n"); - break; - } - - memcpy((char *) amsg.payload + LWS_PRE, in, len); - if (!lws_ring_insert(pss->ring, &amsg, 1)) { - __minimal_destroy_message(&amsg); - lwsl_user("dropping!\n"); - break; - } - lws_callback_on_writable(wsi); - - if (n < 3 && !pss->flow_controlled) { - pss->flow_controlled = 1; - lws_rx_flow_control(wsi, 0); - } - break; - - case LWS_CALLBACK_CLOSED: - lwsl_user("LWS_CALLBACK_CLOSED\n"); - lws_ring_destroy(pss->ring); - - if (*vhd->options & 1) { - if (!*vhd->interrupted) - *vhd->interrupted = 1 + pss->completed; - lws_cancel_service(lws_get_context(wsi)); - } - break; - - default: - break; - } - - return 0; -} - -#define LWS_PLUGIN_PROTOCOL_MINIMAL_SERVER_ECHO \ - { \ - "lws-minimal-server-echo", \ - callback_minimal_server_echo, \ - sizeof(struct per_session_data__minimal_server_echo), \ - 1024, \ - 0, NULL, 0 \ - } -