Browse Source

Update example

Use ring example.
feature/data-structs
TheoryOfNekomata 1 year ago
parent
commit
e789b7032f
4 changed files with 306 additions and 291 deletions
  1. +1
    -1
      CMakeLists.txt
  2. +38
    -71
      src/packages/server/main.c
  3. +267
    -0
      src/packages/server/protocol_lws_minimal.c
  4. +0
    -219
      src/packages/server/protocol_lws_minimal_server_echo.c

+ 1
- 1
CMakeLists.txt View File

@@ -173,7 +173,7 @@ add_executable(
src/packages/server/log/IZ_log.h
src/packages/server/log/IZ_log.c
src/packages/server/main.c
src/packages/server/protocol_lws_minimal_server_echo.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h)
src/packages/server/protocol_lws_minimal.c src/packages/server/IZ_app.c src/packages/server/IZ_app.h)

target_link_libraries(
server


+ 38
- 71
src/packages/server/main.c View File

@@ -6,46 +6,34 @@
#include "log/IZ_log.h"

#define LWS_PLUGIN_STATIC
#include "protocol_lws_minimal_server_echo.c"
#include "protocol_lws_minimal.c"

static struct lws_protocols protocols[] = {
LWS_PLUGIN_PROTOCOL_MINIMAL_SERVER_ECHO,
//{ "http", lws_callback_http_dummy, 0, 0, 0, NULL, 0 },
LWS_PLUGIN_PROTOCOL_MINIMAL,
LWS_PROTOCOL_LIST_TERM
};

static int interrupted, port = 7681, options;

/* pass pointers to shared vars to the protocol */

static const struct lws_protocol_vhost_options pvo_options = {
NULL,
NULL,
"options", /* pvo name */
(void *)&options /* pvo value */
};

static const struct lws_protocol_vhost_options pvo_interrupted = {
&pvo_options,
NULL,
"interrupted", /* pvo name */
(void *)&interrupted /* pvo value */
};

static const struct lws_protocol_vhost_options pvo = {
NULL, /* "next" pvo linked-list */
&pvo_interrupted, /* "child" pvo linked-list */
"lws-minimal-server-echo", /* protocol name we belong to on this vhost */
"" /* ignored */
};
static const struct lws_extension extensions[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
"; client_no_context_takeover"
"; client_max_window_bits"
},
{ NULL, NULL, NULL /* terminator */ }
static int interrupted;

static const struct lws_http_mount mount = {
/* .mount_next */ NULL, /* linked-list "next" */
/* .mountpoint */ "/", /* mountpoint URL */
/* .origin */ "./mount-origin", /* serve from dir */
/* .def */ "index.html", /* default filename */
/* .protocol */ NULL,
/* .cgienv */ NULL,
/* .extra_mimetypes */ NULL,
/* .interpret */ NULL,
/* .cgi_timeout */ 0,
/* .cache_max_age */ 0,
/* .auth_mask */ 0,
/* .cache_reusable */ 0,
/* .cache_revalidate */ 0,
/* .cache_intermediaries */ 0,
/* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
/* .mountpoint_len */ 1, /* char count */
/* .basic_auth_login_file */ NULL,
};

void sigint_handler(int sig)
@@ -53,10 +41,12 @@ void sigint_handler(int sig)
interrupted = 1;
}

IZ_ProcedureResult main(i32 argc, const char **argv)
int main(int argc, const char **argv)
{
const char *cmdline_buffer;
i32 logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
struct lws_context_creation_info info;
struct lws_context *context;
const char *p;
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
/* for LLL_ verbosity above NOTICE to be built into lws,
* lws must have been configured and built with
* -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
@@ -65,53 +55,30 @@ IZ_ProcedureResult main(i32 argc, const char **argv)
/* | LLL_DEBUG */;

signal(SIGINT, sigint_handler);
if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) {
logs = atoi(cmdline_buffer);
}

IZ_LogInterceptWSMessages(logs);
lwsl_user("LWS minimal ws client echo + permessage-deflate + multifragment bulk message\n");
lwsl_user(" lws-minimal-ws-client-echo [-n (no exts)] [-p port] [-o (once)]\n");

if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-p"))) {
port = atoi(cmdline_buffer);
}
if ((p = lws_cmdline_option(argc, argv, "-d")))
logs = atoi(p);

if (lws_cmdline_option(argc, argv, "-o")) {
// connect once
options |= 1;
}
IZ_LogInterceptWSMessages(logs);
lwsl_user("LWS minimal ws server (lws_ring) | visit http://localhost:7681\n");

struct lws_context_creation_info info;
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = port;
info.port = 42069;
info.mounts = &mount;
info.protocols = protocols;
info.pvo = &pvo;
if (!lws_cmdline_option(argc, argv, "-n"))
info.extensions = extensions;
info.pt_serv_buf_size = 32 * 1024;
info.options = LWS_SERVER_OPTION_VALIDATE_UTF8 |
LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;
info.options =
LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;

struct lws_context *context;
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}

i32 n;
while (!interrupted) {
while (n >= 0 && !interrupted)
n = lws_service(context, 0);
printf("%d\n", n);
if (n < 0) {
break;
}
}

lws_context_destroy(context);

lwsl_user("Completed %s\n", interrupted == 2 ? "OK" : "failed");

return interrupted != 2;
return 0;
}

+ 267
- 0
src/packages/server/protocol_lws_minimal.c View File

@@ -0,0 +1,267 @@
#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif

#include <string.h>

#define RING_COUNT 32
/* one of these created for each message */

struct msg {
void *payload; /* is malloc'd */
size_t len;
};

/* one of these is created for each client connecting to us */

struct per_session_data__minimal {
struct per_session_data__minimal *pss_list;
struct lws *wsi;
uint32_t tail;

unsigned int culled:1;
};

/* one of these is created for each vhost our protocol is used with */

struct per_vhost_data__minimal {
struct lws_context *context;
struct lws_vhost *vhost;
const struct lws_protocols *protocol;

struct per_session_data__minimal *pss_list; /* linked-list of live pss*/

struct lws_ring *ring; /* ringbuffer holding unsent messages */
};

static void
cull_lagging_clients(struct per_vhost_data__minimal *vhd)
{
uint32_t oldest_tail = lws_ring_get_oldest_tail(vhd->ring);
struct per_session_data__minimal *old_pss = NULL;
int most = 0, before = (int)lws_ring_get_count_waiting_elements(vhd->ring,
&oldest_tail), m;

/*
* At least one guy with the oldest tail has lagged too far, filling
* the ringbuffer with stuff waiting for them, while new stuff is
* coming in, and they must close, freeing up ringbuffer entries.
*/

lws_start_foreach_llp_safe(struct per_session_data__minimal **,
ppss, vhd->pss_list, pss_list) {

if ((*ppss)->tail == oldest_tail) {
old_pss = *ppss;

lwsl_user("Killing lagging client %p\n", (*ppss)->wsi);

lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING,
/*
* we may kill the wsi we came in on,
* so the actual close is deferred
*/
LWS_TO_KILL_ASYNC);

/*
* We might try to write something before we get a
* chance to close. But this pss is now detached
* from the ring buffer. Mark this pss as culled so we
* don't try to do anything more with it.
*/

(*ppss)->culled = 1;

/*
* Because we can't kill it synchronously, but we
* know it's closing momentarily and don't want its
* participation any more, remove its pss from the
* vhd pss list early. (This is safe to repeat
* uselessly later in the close flow).
*
* Notice this changes *ppss!
*/

lws_ll_fwd_remove(struct per_session_data__minimal,
pss_list, (*ppss), vhd->pss_list);

/* use the changed *ppss so we won't skip anything */

continue;

} else {
/*
* so this guy is a survivor of the cull. Let's track
* what is the largest number of pending ring elements
* for any survivor.
*/
m = (int)lws_ring_get_count_waiting_elements(vhd->ring,
&((*ppss)->tail));
if (m > most)
most = m;
}

} lws_end_foreach_llp_safe(ppss);

/* it would mean we lost track of oldest... but Coverity insists */
if (!old_pss)
return;

/*
* Let's recover (ie, free up) all the ring slots between the
* original oldest's last one and the "worst" survivor.
*/

lws_ring_consume_and_update_oldest_tail(vhd->ring,
struct per_session_data__minimal, &old_pss->tail, (size_t)(before - most),
vhd->pss_list, tail, pss_list);

lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most);
}

/* destroys the message when everyone has had a copy of it */

static void
__minimal_destroy_message(void *_msg)
{
struct msg *msg = _msg;

free(msg->payload);
msg->payload = NULL;
msg->len = 0;
}

static int
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct per_session_data__minimal *pss =
(struct per_session_data__minimal *)user;
struct per_vhost_data__minimal *vhd =
(struct per_vhost_data__minimal *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
const struct msg *pmsg;
struct msg amsg;
int n, m;

switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct per_vhost_data__minimal));
vhd->context = lws_get_context(wsi);
vhd->protocol = lws_get_protocol(wsi);
vhd->vhost = lws_get_vhost(wsi);

vhd->ring = lws_ring_create(sizeof(struct msg), RING_COUNT,
__minimal_destroy_message);
if (!vhd->ring)
return 1;
break;

case LWS_CALLBACK_PROTOCOL_DESTROY:
lws_ring_destroy(vhd->ring);
break;

case LWS_CALLBACK_ESTABLISHED:
/* add ourselves to the list of live pss held in the vhd */
lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi);
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
pss->tail = lws_ring_get_oldest_tail(vhd->ring);
pss->wsi = wsi;
break;

case LWS_CALLBACK_CLOSED:
lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi);
/* remove our closing pss from the list of live pss */
lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
pss, vhd->pss_list);
break;

case LWS_CALLBACK_SERVER_WRITEABLE:
if (pss->culled)
break;
pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
if (!pmsg)
break;

/* notice we allowed for LWS_PRE in the payload already */
m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
LWS_PRE, pmsg->len, LWS_WRITE_TEXT);
if (m < (int)pmsg->len) {
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}

lws_ring_consume_and_update_oldest_tail(
vhd->ring, /* lws_ring object */
struct per_session_data__minimal, /* type of objects with tails */
&pss->tail, /* tail of guy doing the consuming */
1, /* number of payload objects being consumed */
vhd->pss_list, /* head of list of objects with tails */
tail, /* member name of tail in objects with tails */
pss_list /* member name of next object in objects with tails */
);

/* more to do for us? */
if (lws_ring_get_element(vhd->ring, &pss->tail))
/* come back as soon as we can write more */
lws_callback_on_writable(pss->wsi);
break;

case LWS_CALLBACK_RECEIVE:
n = (int)lws_ring_get_count_free_elements(vhd->ring);
if (!n) {
/* forcibly make space */
cull_lagging_clients(vhd);
n = (int)lws_ring_get_count_free_elements(vhd->ring);
}
if (!n)
break;

lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n);

amsg.len = len;
/* notice we over-allocate by LWS_PRE... */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
break;
}

/* ...and we copy the payload in at +LWS_PRE */
memcpy((char *)amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
__minimal_destroy_message(&amsg);
lwsl_user("dropping!\n");
break;
}

/*
* let everybody know we want to write something on them
* as soon as they are ready
*/
lws_start_foreach_llp(struct per_session_data__minimal **,
ppss, vhd->pss_list) {
lws_callback_on_writable((*ppss)->wsi);
} lws_end_foreach_llp(ppss, pss_list);
break;

default:
break;
}

return 0;
}

#define LWS_PLUGIN_PROTOCOL_MINIMAL \
{ \
"lws-minimal", \
callback_minimal, \
sizeof(struct per_session_data__minimal), \
0, \
0, NULL, 0 \
}

+ 0
- 219
src/packages/server/protocol_lws_minimal_server_echo.c View File

@@ -1,219 +0,0 @@
#ifndef LWS_PLUGIN_STATIC
#define LWS_DLL
#define LWS_INTERNAL

#include <libwebsockets.h>
#endif

#include <string.h>

#define RING_DEPTH 4096

/* one of these created for each message */

struct msg {
void *payload; /* is malloc'd */
size_t len;
char binary;
char first;
char final;
};

struct per_session_data__minimal_server_echo {
struct lws_ring *ring;
uint32_t msglen;
uint32_t tail;
uint8_t completed: 1;
uint8_t flow_controlled: 1;
uint8_t write_consume_pending: 1;
};

struct vhd_minimal_server_echo {
struct lws_context *context;
struct lws_vhost *vhost;

int *interrupted;
int *options;
};

static void
__minimal_destroy_message(void *_msg) {
struct msg *msg = _msg;

free(msg->payload);
msg->payload = NULL;
msg->len = 0;
}

static int
callback_minimal_server_echo(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len) {
struct per_session_data__minimal_server_echo *pss =
(struct per_session_data__minimal_server_echo *) user;
struct vhd_minimal_server_echo *vhd = (struct vhd_minimal_server_echo *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
const struct msg *pmsg;
struct msg amsg;
int m, n, flags;

switch (reason) {

case LWS_CALLBACK_PROTOCOL_INIT:
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct vhd_minimal_server_echo));
if (!vhd)
return -1;

vhd->context = lws_get_context(wsi);
vhd->vhost = lws_get_vhost(wsi);

/* get the pointers we were passed in pvo */

vhd->interrupted = (int *) lws_pvo_search(
(const struct lws_protocol_vhost_options *) in,
"interrupted")->value;
vhd->options = (int *) lws_pvo_search(
(const struct lws_protocol_vhost_options *) in,
"options")->value;
break;

case LWS_CALLBACK_ESTABLISHED:
/* generate a block of output before travis times us out */
lwsl_warn("LWS_CALLBACK_ESTABLISHED\n");
pss->ring = lws_ring_create(sizeof(struct msg), RING_DEPTH,
__minimal_destroy_message);
if (!pss->ring)
return 1;
pss->tail = 0;
break;

case LWS_CALLBACK_SERVER_WRITEABLE:

lwsl_user("LWS_CALLBACK_SERVER_WRITEABLE\n");

if (pss->write_consume_pending) {
/* perform the deferred fifo consume */
lws_ring_consume_single_tail(pss->ring, &pss->tail, 1);
pss->write_consume_pending = 0;
}

pmsg = lws_ring_get_element(pss->ring, &pss->tail);
if (!pmsg) {
lwsl_user(" (nothing in ring)\n");
break;
}

flags = lws_write_ws_flags(
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
pmsg->first, pmsg->final);

/* notice we allowed for LWS_PRE in the payload already */
m = lws_write(wsi, ((unsigned char *) pmsg->payload) +
LWS_PRE, pmsg->len, (enum lws_write_protocol) flags);
if (m < (int) pmsg->len) {
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}

lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
m, flags, pmsg->first, pmsg->final);
/*
* Workaround deferred deflate in pmd extension by only
* consuming the fifo entry when we are certain it has been
* fully deflated at the next WRITABLE callback. You only need
* this if you're using pmd.
*/
pss->write_consume_pending = 1;
lws_callback_on_writable(wsi);

if (pss->flow_controlled &&
(int) lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) {
lws_rx_flow_control(wsi, 1);
pss->flow_controlled = 0;
}

if ((*vhd->options & 1) && pmsg && pmsg->final)
pss->completed = 1;

break;

case LWS_CALLBACK_RECEIVE:

lwsl_user("LWS_CALLBACK_RECEIVE: %4d (rpp %5d, first %d, "
"last %d, bin %d, msglen %d (+ %d = %d))\n",
(int) len, (int) lws_remaining_packet_payload(wsi),
lws_is_first_fragment(wsi),
lws_is_final_fragment(wsi),
lws_frame_is_binary(wsi), pss->msglen, (int) len,
(int) pss->msglen + (int) len);

if (len) { ;
//puts((const char *)in);
//lwsl_hexdump_notice(in, len);
}

amsg.first = (char) lws_is_first_fragment(wsi);
amsg.final = (char) lws_is_final_fragment(wsi);
amsg.binary = (char) lws_frame_is_binary(wsi);
n = (int) lws_ring_get_count_free_elements(pss->ring);
if (!n) {
lwsl_user("dropping!\n");
break;
}

if (amsg.final)
pss->msglen = 0;
else
pss->msglen += (uint32_t) len;

amsg.len = len;
/* notice we over-allocate by LWS_PRE */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
break;
}

memcpy((char *) amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(pss->ring, &amsg, 1)) {
__minimal_destroy_message(&amsg);
lwsl_user("dropping!\n");
break;
}
lws_callback_on_writable(wsi);

if (n < 3 && !pss->flow_controlled) {
pss->flow_controlled = 1;
lws_rx_flow_control(wsi, 0);
}
break;

case LWS_CALLBACK_CLOSED:
lwsl_user("LWS_CALLBACK_CLOSED\n");
lws_ring_destroy(pss->ring);

if (*vhd->options & 1) {
if (!*vhd->interrupted)
*vhd->interrupted = 1 + pss->completed;
lws_cancel_service(lws_get_context(wsi));
}
break;

default:
break;
}

return 0;
}

#define LWS_PLUGIN_PROTOCOL_MINIMAL_SERVER_ECHO \
{ \
"lws-minimal-server-echo", \
callback_minimal_server_echo, \
sizeof(struct per_session_data__minimal_server_echo), \
1024, \
0, NULL, 0 \
}


Loading…
Cancel
Save