Browse Source

Add write logic from client

Connect client to server, save connection details.
feature/data-structs
TheoryOfNekomata 2 years ago
parent
commit
67036afd8a
12 changed files with 492 additions and 398 deletions
  1. +2
    -0
      CMakeLists.txt
  2. +65
    -70
      src/packages/game/IZ_app.c
  3. +1
    -1
      src/packages/game/IZ_config.c
  4. +1
    -0
      src/packages/game/network/IZ_websocket.h
  5. +38
    -5
      src/packages/game/network/IZ_wsclient.c
  6. +19
    -8
      src/packages/game/network/IZ_wsclient.h
  7. +268
    -25
      src/packages/server/IZ_app.c
  8. +1
    -15
      src/packages/server/IZ_app.h
  9. +8
    -0
      src/packages/server/IZ_config.c
  10. +10
    -0
      src/packages/server/IZ_config.h
  11. +49
    -258
      src/packages/server/network/IZ_wsserver.c
  12. +30
    -16
      src/packages/server/network/IZ_wsserver.h

+ 2
- 0
CMakeLists.txt View File

@@ -186,6 +186,8 @@ add_executable(
src/packages/server/main.c
src/packages/server/IZ_app.c
src/packages/server/IZ_app.h
src/packages/server/IZ_config.c
src/packages/server/IZ_config.h
src/packages/server/network/IZ_wsserver.c
src/packages/server/network/IZ_wsserver.h
src/packages/server/network/IZ_websocket.h


+ 65
- 70
src/packages/game/IZ_app.c View File

@@ -1,7 +1,7 @@
#include "IZ_app.h"

IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app, IZ_WSClientInitializeParams params) {
if (IZ_WSClientInitialize(&app->client, params)) {
IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app) {
if (IZ_WSClientInitialize(&app->client)) {
printf("error\n");
return -1;
}
@@ -10,14 +10,7 @@ IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app, IZ_WSClientInitializePa

IZ_ProcedureResult IZ_AppConnect(void* app_raw) {
IZ_App* app = app_raw;
IZ_WSClientInitializeParams params = {
.address = "127.0.0.1",
.path = "/",
.port = 42069,
.userdata = app,
};

if (IZ_AppWSClientInitialize(app, params)) {
if (IZ_AppWSClientInitialize(app)) {
return -1;
}

@@ -37,11 +30,29 @@ IZ_ProcedureResult IZ_AppConnect(void* app_raw) {
return result;
}

void IZ_AppEstablishConnection(IZ_App* app) {
void IZ_AppHandleNetworkingOutboundEvents(IZ_App* app) {
struct lws* wsi = app->client.ws.connection;
printf("%p\n", wsi);
if (!wsi) {
return;
}
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

IZ_WebsocketMessage* amsg = IZ_WSClientCreateMessage(wsi, false, "hello", 5);
lws_ring_insert(vhd->ring, amsg, 1);
lws_callback_on_writable(wsi);
}

void IZ_AppEstablishConnection(IZ_App* app, IZ_WSClientInitializeParams params) {
if (app->client.ws.context) {
return;
}

app->client.params = params;
app->client.userdata = app;
app->client_thread = SDL_CreateThread(IZ_AppConnect, "networking", app);
SDL_DetachThread(app->client_thread);
}
@@ -108,9 +119,15 @@ IZ_ProcedureResult IZ_AppHandleSDLEvents(IZ_App* app) {

if (e.type == SDL_KEYDOWN) {
if (e.key.keysym.sym == SDLK_PAGEUP) {
IZ_AppEstablishConnection(app);
IZ_AppEstablishConnection(app, (IZ_WSClientInitializeParams){
.address = "127.0.0.1",
.path = "/",
.port = 42069,
});
} else if (e.key.keysym.sym == SDLK_PAGEDOWN) {
IZ_AppCloseConnection(app);
} else if (e.key.keysym.sym == SDLK_INSERT) {
IZ_AppHandleNetworkingOutboundEvents(app);
}
}

@@ -184,15 +201,12 @@ IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) {

// TODO do audio processing
// TODO do networking
//if (app->client.connection) {
// FIXME stuck in infinite loop
//IZ_WSClientHandle(&app->client);
//}

if (IZ_AppHandleInputEvents(app)) {
break;
}


IZ_VideoUpdate(&app->video_state, app->ticks, &app->input_state);
}

@@ -275,7 +289,6 @@ void IZ_WSClientProtocolTeardown(struct lws* wsi) {
lws_get_protocol(wsi)
);

vhd->finished = true;
if (vhd->ring) {
lws_ring_destroy(vhd->ring);
}
@@ -290,6 +303,8 @@ void IZ_WSClientConnectionError(struct lws* wsi, void* in) {
);

lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)");
IZ_App* app = (IZ_App*) vhd->app;
app->client.ws.connection = NULL;
vhd->client_wsi = NULL;
lws_sul_schedule(
vhd->context,
@@ -306,7 +321,8 @@ IZ_ProcedureResult IZ_WSClientOnOpen(struct lws* wsi, IZ_WSClientSessionData* ps
lws_get_protocol(wsi)
);

vhd->established = true;
IZ_App* app = (IZ_App*) vhd->app;
app->client.ws.connection = wsi;
pss->ring = lws_ring_create(sizeof(IZ_WebsocketMessage), RING_COUNT,IZ_WebsocketDestroyMessage);
if (!pss->ring) {
return -1;
@@ -321,8 +337,9 @@ void IZ_WSClientOnClose(struct lws* wsi) {
lws_get_protocol(wsi)
);

IZ_App* app = (IZ_App*) vhd->app;
app->client.ws.connection = NULL;
vhd->client_wsi = NULL;
vhd->established = false;
lws_sul_schedule(
vhd->context,
0,
@@ -367,87 +384,65 @@ IZ_ProcedureResult IZ_WSClientWritable(struct lws* wsi) {
return 0;
}

void IZ_WSClientOnReceive(struct lws* wsi, IZ_WSClientSessionData* pss, void* in, size_t len) {
lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n",
(int)len, (int)lws_remaining_packet_payload(wsi),
lws_is_first_fragment(wsi),
lws_is_final_fragment(wsi),
lws_frame_is_binary(wsi));

// lwsl_hexdump_notice(in, len);
IZ_WebsocketMessage* IZ_WSClientCreateMessage(struct lws* wsi, bool binary, void* in, size_t len) {
IZ_WebsocketMessage amsg;
amsg.first = (char)lws_is_first_fragment(wsi);
amsg.final = (char)lws_is_final_fragment(wsi);
amsg.binary = (char)lws_frame_is_binary(wsi);
amsg.binary = binary;

amsg.len = len;
/* notice we over-allocate by LWS_PRE */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
return NULL;
}

memcpy((char*) amsg.payload + LWS_PRE, in, len);
return &amsg;
}

void IZ_WSClientOnReceive(struct lws* wsi, IZ_WSClientSessionData* pss, void* in, size_t len) {
i32 n = (i32) lws_ring_get_count_free_elements(pss->ring);
if (!n) {
lwsl_user("dropping!\n");
return;
}

amsg.len = len;
/* notice we over-allocate by LWS_PRE */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n",
(int)len, (int)lws_remaining_packet_payload(wsi),
lws_is_first_fragment(wsi),
lws_is_final_fragment(wsi),
lws_frame_is_binary(wsi));

// lwsl_hexdump_notice(in, len);

IZ_WebsocketMessage* amsg = IZ_WSClientCreateMessage(wsi, (bool) lws_frame_is_binary(wsi), in, len);
if (!amsg) {
lwsl_user("OOM: dropping\n");
return;
}

memcpy((char *)amsg.payload + LWS_PRE, in, len);
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

IZ_App* app = (IZ_App*) vhd->app;
if (amsg.binary) {
if (amsg->binary) {
IZ_AppHandleNetworkingInboundBinaryEvents(app, in, len);
} else {
IZ_AppHandleNetworkingInboundTextEvents(app, in, len);
}

if (!lws_ring_insert(pss->ring, &amsg, 1)) {
IZ_WebsocketDestroyMessage(&amsg);
if (!lws_ring_insert(pss->ring, amsg, 1)) {
IZ_WebsocketDestroyMessage(amsg);
lwsl_user("dropping!\n");
return;
}
lws_callback_on_writable(wsi);

if (!pss->flow_controlled && n < 3) {
pss->flow_controlled = 1;
pss->flow_controlled = true;
lws_rx_flow_control(wsi, 0);
}
}

IZ_ProcedureResult IZ_WSClientCallback(
struct lws* wsi,
enum lws_callback_reasons reason,
void* user,
void* in,
size_t len
) {
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
return IZ_WSClientProtocolInitialize(wsi, in);
case LWS_CALLBACK_PROTOCOL_DESTROY:
IZ_WSClientProtocolTeardown(wsi);
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
IZ_WSClientConnectionError(wsi, in);
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
return IZ_WSClientOnOpen(wsi, user);
case LWS_CALLBACK_CLIENT_CLOSED:
IZ_WSClientOnClose(wsi);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
IZ_WSClientOnReceive(wsi, user, in, len);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
return IZ_WSClientWritable(wsi);
default:
break;
}

return lws_callback_http_dummy(wsi, reason, user, in, len);
}

+ 1
- 1
src/packages/game/IZ_config.c View File

@@ -4,5 +4,5 @@ void IZ_ConfigGetDefaultPath(const char* config_path, size_t string_size) {
//const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME);
const char* config_path_dir = SDL_GetBasePath();
memcpy_s(config_path, string_size, config_path_dir, 128);
strcat_s(config_path, string_size, "config.ini");
strcat_s(config_path, string_size, "config-game.ini");
}

+ 1
- 0
src/packages/game/network/IZ_websocket.h View File

@@ -18,6 +18,7 @@ typedef struct {

typedef struct {
struct lws_context* context;
struct lws* connection;
u8 interrupted: 1;
} IZ_Websocket;



+ 38
- 5
src/packages/game/network/IZ_wsclient.c View File

@@ -1,6 +1,39 @@
#include "IZ_wsclient.h"

IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientInitializeParams params) {
IZ_ProcedureResult IZ_WSClientCallback(
struct lws* wsi,
enum lws_callback_reasons reason,
void* user,
void* in,
size_t len
) {
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
return IZ_WSClientProtocolInitialize(wsi, in);
case LWS_CALLBACK_PROTOCOL_DESTROY:
IZ_WSClientProtocolTeardown(wsi);
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
IZ_WSClientConnectionError(wsi, in);
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
return IZ_WSClientOnOpen(wsi, user);
case LWS_CALLBACK_CLIENT_CLOSED:
IZ_WSClientOnClose(wsi);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
IZ_WSClientOnReceive(wsi, user, in, len);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
return IZ_WSClientWritable(wsi);
default:
break;
}

return lws_callback_http_dummy(wsi, reason, user, in, len);
}

IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state) {
struct lws_context_creation_info info;
memset(&info, 0, sizeof info);
info.port = CONTEXT_PORT_NO_LISTEN;
@@ -25,7 +58,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni
"address", /* pvo name */
"localhost" /* pvo value */
};
pvo_address.value = params.address;
pvo_address.value = state->params.address;

static struct lws_protocol_vhost_options pvo_path = {
&pvo_address,
@@ -33,7 +66,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni
"path", /* pvo name */
"/" /* pvo value */
};
pvo_path.value = params.path;
pvo_path.value = state->params.path;

static struct lws_protocol_vhost_options pvo_port = {
&pvo_path,
@@ -41,7 +74,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni
"port", /* pvo name */
NULL /* pvo value */
};
pvo_port.value = (void*) &params.port;
pvo_port.value = (void*) &state->params.port;

static struct lws_protocol_vhost_options pvo_app = {
&pvo_port,
@@ -49,7 +82,7 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni
"app",
NULL,
};
pvo_app.value = params.userdata;
pvo_app.value = state->userdata;

static const struct lws_protocol_vhost_options pvo = {
NULL, /* "next" pvo linked-list */


+ 19
- 8
src/packages/game/network/IZ_wsclient.h View File

@@ -18,8 +18,6 @@ typedef struct {
struct lws_client_connect_info i;
struct lws *client_wsi;

u8 finished: 1;
u8 established: 1;
const char* address;
const char* path;
u16* port;
@@ -29,23 +27,22 @@ typedef struct {
typedef struct {
struct lws_ring *ring;
u32 tail;
char flow_controlled;
u8 completed: 1;
u8 write_consume_pending: 1;
unsigned char flow_controlled: 1;
} IZ_WSClientSessionData;

typedef struct {
const char* address;
const char* path;
u16 port;
void* userdata;
} IZ_WSClientInitializeParams;

typedef struct {
IZ_Websocket ws;
void* userdata;
IZ_WSClientInitializeParams params;
} IZ_WSClientState;

IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState*, IZ_WSClientInitializeParams);
IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState*);

IZ_ProcedureResult IZ_WSClientHandle(IZ_WSClientState*);

@@ -53,6 +50,20 @@ void IZ_WSClientTeardown(IZ_WSClientState*);

void IZ_WSClientCancelService(IZ_WSClientState*);

IZ_ProcedureResult IZ_WSClientCallback(struct lws*, enum lws_callback_reasons, void*, void*, size_t);
IZ_ProcedureResult IZ_WSClientProtocolInitialize(struct lws*, void*);

void IZ_WSClientProtocolTeardown(struct lws*);

void IZ_WSClientConnectionError(struct lws*, void*);

IZ_ProcedureResult IZ_WSClientOnOpen(struct lws*, IZ_WSClientSessionData*);

void IZ_WSClientOnClose(struct lws*);

IZ_ProcedureResult IZ_WSClientWritable(struct lws*);

void IZ_WSClientOnReceive(struct lws*, IZ_WSClientSessionData*, void*, size_t);

IZ_WebsocketMessage* IZ_WSClientCreateMessage(struct lws*, bool, void*, size_t);

#endif

+ 268
- 25
src/packages/server/IZ_app.c View File

@@ -1,42 +1,33 @@
#include "IZ_app.h"

static IZ_App* global_app;

void IZ_AppHandleSignal(i32 _signal) {
global_app->server.ws.interrupted = true;
IZ_WSServerCancelService(&global_app->server);
}

// TODO move to each subsystem
// TODO unify loading of config from cmdline and config file
void IZ_AppLoadConfig(IZ_App *app, u8 argc, const char **argv) {
memcpy_s(app, sizeof(IZ_App), &IZ_APP_DEFAULT_STATE, sizeof(IZ_App));
IZ_ProcedureResult IZ_AppInitialize(IZ_App *app, u8 argc, const char **argv) {
global_app = app;
memset(app, 0, sizeof(IZ_App));
signal(SIGINT, IZ_AppHandleSignal);

const char *cmdline_buffer;
if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) {
app->config.log_level = atoi(cmdline_buffer);
}
// IZ_LogInterceptWSMessages(app->config.log_level);
IZ_LogInterceptWSMessages(LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE);

if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-p"))) {
app->config.port = atoi(cmdline_buffer);
const char* cmdline_buffer;
char config_path[128];
// TODO abstract command line args parsing
if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-c"))) {
memcpy_s(config_path, 128, cmdline_buffer, 128);
} else {
IZ_ConfigGetDefaultPath(config_path, 128);
}
}

IZ_ProcedureResult IZ_AppInitialize(IZ_App *app, u8 argc, const char **argv) {
signal(SIGINT, IZ_AppHandleSignal);
signal(9, IZ_AppHandleSignal);
signal(SIGTERM, IZ_AppHandleSignal);

IZ_AppLoadConfig(app, argc, argv);
IZ_LogInterceptWSMessages(app->config.log_level);
if (IZ_WSServerInitialize(&app->server, (IZ_WSServerInitializeParams) {
.protocol = NULL,
.port = app->config.port,
.default_filename = NULL,
.origin = NULL,
})) {
if (IZ_WSServerInitialize(&app->server, app, config_path, argc, argv)) {
return -1;
}

global_app = app;
return 0;
}

@@ -61,3 +52,255 @@ IZ_ProcedureResult IZ_AppRun(IZ_App *app, u8 argc, const char **argv) {
lwsl_user("Server closed. Bye!\n");
return result;
}

void IZ_WSServerCullLaggingClients(IZ_WSServerVHostData *vhd) {
u32 oldest_tail = lws_ring_get_oldest_tail(vhd->ring);
IZ_WSServerSessionData *old_pss = NULL;
i32 most = 0;
i32 before = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &oldest_tail);
i32 m;

/*
* At least one guy with the oldest tail has lagged too far, filling
* the ringbuffer with stuff waiting for them, while new stuff is
* coming in, and they must close, freeing up ringbuffer entries.
*/

lws_start_foreach_llp_safe(
IZ_WSServerSessionData**,
ppss,
vhd->pss_list,
pss_list
) {
if ((*ppss)->tail == oldest_tail) {
old_pss = *ppss;

lwsl_user("Killing lagging client %p\n", (*ppss)->wsi);

lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING,
/*
* we may kill the wsi we came in on,
* so the actual close is deferred
*/
LWS_TO_KILL_ASYNC);

/*
* We might try to write something before we get a
* chance to close. But this pss is now detached
* from the ring buffer. Mark this pss as culled so we
* don't try to do anything more with it.
*/

(*ppss)->culled = true;

/*
* Because we can't kill it synchronously, but we
* know it's closing momentarily and don't want its
* participation any more, remove its pss from the
* vhd pss list early. (This is safe to repeat
* uselessly later in the close flow).
*
* Notice this changes *ppss!
*/

lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, (*ppss), vhd->pss_list);

/* use the changed *ppss so we won't skip anything */

continue;
}

/*
* so this guy is a survivor of the cull. Let's track
* what is the largest number of pending ring elements
* for any survivor.
*/
m = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &((*ppss)->tail));
if (m > most) {
most = m;
}
} lws_end_foreach_llp_safe(ppss);

/* it would mean we lost track of oldest... but Coverity insists */
if (!old_pss) {
return;
}

/*
* Let's recover (ie, free up) all the ring slots between the
* original oldest's last one and the "worst" survivor.
*/

lws_ring_consume_and_update_oldest_tail(
vhd->ring,
IZ_WSServerSessionData,
&old_pss->tail,
(size_t) (before - most),
vhd->pss_list,
tail,
pss_list
);

lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most);
}

/* destroys the message when everyone has had a copy of it */

IZ_ProcedureResult IZ_WSServerProtocolInitialize(struct lws* wsi, void* in) {
IZ_WSServerVHostData* vhd_instance = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
IZ_WSServerVHostData** vhd = &vhd_instance;
*vhd = lws_protocol_vh_priv_zalloc(
lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(IZ_WSServerVHostData)
);
(*vhd)->context = lws_get_context(wsi);
(*vhd)->protocol = lws_get_protocol(wsi);
(*vhd)->vhost = lws_get_vhost(wsi);
(*vhd)->port = (u16*) lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"port"
)->value;
(*vhd)->app = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"app"
)->value;
(*vhd)->ring = lws_ring_create(
sizeof(IZ_WebsocketMessage),
RING_COUNT,
IZ_WebsocketDestroyMessage
);
if (!(*vhd)->ring) {
return -1;
}
return 0;
}

void IZ_WSServerProtocolTeardown(struct lws* wsi) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
lws_ring_destroy(vhd->ring);
}

void IZ_WSServerOnOpen(struct lws* wsi, IZ_WSServerSessionData* pss) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

/* add ourselves to the list of live pss held in the vhd */
lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi);
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
pss->tail = lws_ring_get_oldest_tail(vhd->ring);
pss->wsi = wsi;
}

void IZ_WSServerOnClose(struct lws* wsi, IZ_WSServerSessionData* pss) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi);
/* remove our closing pss from the list of live pss */
lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, pss, vhd->pss_list);
}

IZ_ProcedureResult IZ_WSServerWritable(struct lws* wsi, IZ_WSServerSessionData* pss) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

if (pss->culled) {
return 0;
}
const IZ_WebsocketMessage* pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
if (!pmsg) {
return 0;
}

/* notice we allowed for LWS_PRE in the payload already */
i32 m = lws_write(
wsi,
((unsigned char*) pmsg->payload) + LWS_PRE,
pmsg->len,
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT
);

if (m < (i32) pmsg->len) {
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}

lws_ring_consume_and_update_oldest_tail(
vhd->ring, /* lws_ring object */
IZ_WSServerSessionData, /* type of objects with tails */
&pss->tail, /* tail of guy doing the consuming */
1, /* number of payload objects being consumed */
vhd->pss_list, /* head of list of objects with tails */
tail, /* member name of tail in objects with tails */
pss_list /* member name of next object in objects with tails */
);

/* more to do for us? */
if (lws_ring_get_element(vhd->ring, &pss->tail)) {
/* come back as soon as we can write more */
lws_callback_on_writable(pss->wsi);
}

return 0;
}

IZ_ProcedureResult IZ_WSServerOnReceive(struct lws* wsi, void* in, size_t len) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
i32 n = (i32) lws_ring_get_count_free_elements(vhd->ring);
if (!n) {
/* forcibly make space */
IZ_WSServerCullLaggingClients(vhd);
n = (i32) lws_ring_get_count_free_elements(vhd->ring);
}

if (!n) {
return 0;
}

lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n);

IZ_WebsocketMessage amsg;
amsg.len = len;
/* notice we over-allocate by LWS_PRE... */
amsg.payload = malloc(LWS_PRE + len);
amsg.binary = (u8) lws_frame_is_binary(wsi);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
return 1;
}

/* ...and we copy the payload in at +LWS_PRE */
memcpy((char *) amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
IZ_WebsocketDestroyMessage(&amsg);
lwsl_user("dropping!\n");
return 1;
}

/*
* let everybody know we want to write something on them
* as soon as they are ready
*/
lws_start_foreach_llp(IZ_WSServerSessionData**, ppss, vhd->pss_list) {
lws_callback_on_writable((*ppss)->wsi);
}

lws_end_foreach_llp(ppss, pss_list);
return 0;
}

+ 1
- 15
src/packages/server/IZ_app.h View File

@@ -6,26 +6,12 @@
#include "network/IZ_wsserver.h"
#include "log/IZ_log.h"
#include "IZ_common.h"
#include "IZ_config.h"

typedef struct {
i32 log_level;
u16 port;
} IZ_AppConfig;

typedef struct {
IZ_AppConfig config;
IZ_WSServerState server;
} IZ_App;

static const IZ_App IZ_APP_DEFAULT_STATE = {
.config = {
.log_level = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE,
.port = 42069,
},
};

static IZ_App* global_app;

IZ_ProcedureResult IZ_AppRun(IZ_App*, u8, const char**);

#endif

+ 8
- 0
src/packages/server/IZ_config.c View File

@@ -0,0 +1,8 @@
#include "IZ_config.h"

void IZ_ConfigGetDefaultPath(const char* config_path, size_t string_size) {
//const char* config_path_dir = SDL_GetPrefPath("Modal Studios", APP_NAME);
const char* config_path_dir = SDL_GetBasePath();
memcpy_s(config_path, string_size, config_path_dir, 128);
strcat_s(config_path, string_size, "config-server.ini");
}

+ 10
- 0
src/packages/server/IZ_config.h View File

@@ -0,0 +1,10 @@
#ifndef IZ_CONFIG_H
#define IZ_CONFIG_H

#include <SDL_filesystem.h>
#include <string.h>

// TODO unify loading of config from cmdline and config file
void IZ_ConfigGetDefaultPath(const char*, size_t);

#endif

+ 49
- 258
src/packages/server/network/IZ_wsserver.c View File

@@ -1,248 +1,5 @@
#include "IZ_wsserver.h"

void IZ_ProtocolCullLaggingClients(IZ_WSServerVHostData *vhd) {
u32 oldest_tail = lws_ring_get_oldest_tail(vhd->ring);
IZ_WSServerSessionData *old_pss = NULL;
i32 most = 0;
i32 before = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &oldest_tail);
i32 m;

/*
* At least one guy with the oldest tail has lagged too far, filling
* the ringbuffer with stuff waiting for them, while new stuff is
* coming in, and they must close, freeing up ringbuffer entries.
*/

lws_start_foreach_llp_safe(
IZ_WSServerSessionData**,
ppss,
vhd->pss_list,
pss_list
) {
if ((*ppss)->tail == oldest_tail) {
old_pss = *ppss;

lwsl_user("Killing lagging client %p\n", (*ppss)->wsi);

lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING,
/*
* we may kill the wsi we came in on,
* so the actual close is deferred
*/
LWS_TO_KILL_ASYNC);

/*
* We might try to write something before we get a
* chance to close. But this pss is now detached
* from the ring buffer. Mark this pss as culled so we
* don't try to do anything more with it.
*/

(*ppss)->culled = true;

/*
* Because we can't kill it synchronously, but we
* know it's closing momentarily and don't want its
* participation any more, remove its pss from the
* vhd pss list early. (This is safe to repeat
* uselessly later in the close flow).
*
* Notice this changes *ppss!
*/

lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, (*ppss), vhd->pss_list);

/* use the changed *ppss so we won't skip anything */

continue;
}

/*
* so this guy is a survivor of the cull. Let's track
* what is the largest number of pending ring elements
* for any survivor.
*/
m = (i32) lws_ring_get_count_waiting_elements(vhd->ring, &((*ppss)->tail));
if (m > most) {
most = m;
}
} lws_end_foreach_llp_safe(ppss);

/* it would mean we lost track of oldest... but Coverity insists */
if (!old_pss) {
return;
}

/*
* Let's recover (ie, free up) all the ring slots between the
* original oldest's last one and the "worst" survivor.
*/

lws_ring_consume_and_update_oldest_tail(
vhd->ring,
IZ_WSServerSessionData,
&old_pss->tail,
(size_t) (before - most),
vhd->pss_list,
tail,
pss_list
);

lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most);
}

/* destroys the message when everyone has had a copy of it */

IZ_ProcedureResult IZ_WSServerProtocolInitialize(struct lws* wsi) {
IZ_WSServerVHostData* vhd_instance = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
IZ_WSServerVHostData** vhd = &vhd_instance;
*vhd = lws_protocol_vh_priv_zalloc(
lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(IZ_WSServerVHostData)
);
(*vhd)->context = lws_get_context(wsi);
(*vhd)->protocol = lws_get_protocol(wsi);
(*vhd)->vhost = lws_get_vhost(wsi);
(*vhd)->ring = lws_ring_create(
sizeof(IZ_WebsocketMessage),
RING_COUNT,
IZ_WebsocketDestroyMessage
);
if (!(*vhd)->ring) {
return 1;
}
return 0;
}

void IZ_WSServerProtocolTeardown(struct lws* wsi) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
lws_ring_destroy(vhd->ring);
}

void IZ_WSServerOnOpen(struct lws* wsi, IZ_WSServerSessionData* pss) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

/* add ourselves to the list of live pss held in the vhd */
lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi);
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
pss->tail = lws_ring_get_oldest_tail(vhd->ring);
pss->wsi = wsi;
}

void IZ_WSServerOnClose(struct lws* wsi, IZ_WSServerSessionData* pss) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi);
/* remove our closing pss from the list of live pss */
lws_ll_fwd_remove(IZ_WSServerSessionData, pss_list, pss, vhd->pss_list);
}

IZ_ProcedureResult IZ_WSServerWritable(struct lws* wsi, IZ_WSServerSessionData* pss) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

if (pss->culled) {
return 0;
}
const IZ_WebsocketMessage* pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
if (!pmsg) {
return 0;
}

/* notice we allowed for LWS_PRE in the payload already */
i32 m = lws_write(
wsi,
((unsigned char*) pmsg->payload) + LWS_PRE,
pmsg->len,
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT
);

if (m < (i32) pmsg->len) {
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}

lws_ring_consume_and_update_oldest_tail(
vhd->ring, /* lws_ring object */
IZ_WSServerSessionData, /* type of objects with tails */
&pss->tail, /* tail of guy doing the consuming */
1, /* number of payload objects being consumed */
vhd->pss_list, /* head of list of objects with tails */
tail, /* member name of tail in objects with tails */
pss_list /* member name of next object in objects with tails */
);

/* more to do for us? */
if (lws_ring_get_element(vhd->ring, &pss->tail)) {
/* come back as soon as we can write more */
lws_callback_on_writable(pss->wsi);
}

return 0;
}

void IZ_WSServerOnReceive(struct lws* wsi, void* in, size_t len) {
IZ_WSServerVHostData* vhd = (IZ_WSServerVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
IZ_WebsocketMessage amsg;
i32 n = (i32) lws_ring_get_count_free_elements(vhd->ring);
if (!n) {
/* forcibly make space */
IZ_ProtocolCullLaggingClients(vhd);
n = (i32) lws_ring_get_count_free_elements(vhd->ring);
}

if (!n) {
return;
}

lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n);

amsg.len = len;
/* notice we over-allocate by LWS_PRE... */
amsg.payload = malloc(LWS_PRE + len);
amsg.binary = (u8) lws_frame_is_binary(wsi);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
return;
}

/* ...and we copy the payload in at +LWS_PRE */
memcpy((char *) amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
IZ_WebsocketDestroyMessage(&amsg);
lwsl_user("dropping!\n");
return;
}

/*
* let everybody know we want to write something on them
* as soon as they are ready
*/
lws_start_foreach_llp(IZ_WSServerSessionData**, ppss, vhd->pss_list) {
lws_callback_on_writable((*ppss)->wsi);
}

lws_end_foreach_llp(ppss, pss_list);
}

IZ_ProcedureResult IZ_WSServerCallback(
struct lws* wsi,
enum lws_callback_reasons reason,
@@ -252,7 +9,7 @@ IZ_ProcedureResult IZ_WSServerCallback(
) {
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
return IZ_WSServerProtocolInitialize(wsi);
return IZ_WSServerProtocolInitialize(wsi, in);
case LWS_CALLBACK_PROTOCOL_DESTROY:
IZ_WSServerProtocolTeardown(wsi);
break;
@@ -265,8 +22,7 @@ IZ_ProcedureResult IZ_WSServerCallback(
case LWS_CALLBACK_SERVER_WRITEABLE:
return IZ_WSServerWritable(wsi, user);
case LWS_CALLBACK_RECEIVE:
IZ_WSServerOnReceive(wsi, in, len);
break;
return IZ_WSServerOnReceive(wsi, in, len);
default:
break;
}
@@ -274,10 +30,27 @@ IZ_ProcedureResult IZ_WSServerCallback(
return 0;
}

IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, IZ_WSServerInitializeParams params) {
void IZ_WSServerLoadConfig(IZ_WSServerState* state, const char* config_path, u8 argc, const char* argv[]) {
// TODO unify loading of config from cmdline and config file
memcpy_s(state, sizeof(IZ_WSServerState), &IZ_DEFAULT_STATE, sizeof(IZ_WSServerState));

const char *cmdline_buffer;
// if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-d"))) {
// state->config.log_level = atoi(cmdline_buffer);
// }

if ((cmdline_buffer = lws_cmdline_option(argc, argv, "-p"))) {
state->config.port = atoi(cmdline_buffer);
}
}

IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, void* userdata, const char* config_path, u8 argc, const char* argv[]) {
IZ_WSServerLoadConfig(state, config_path, argc, argv);
state->userdata = userdata;

struct lws_context_creation_info info;
memset(&info, 0, sizeof info);
info.port = params.port;
info.port = state->config.port;

static struct lws_http_mount mount = {
.mount_next = NULL, /* linked-list "next" */
@@ -298,17 +71,11 @@ IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, IZ_WSServerIni
.mountpoint_len = 1, /* char count */
.basic_auth_login_file = NULL,
};

if (params.origin) {
mount.origin = params.origin;
if (state->config.origin) {
mount.origin = state->config.origin;
}

if (params.default_filename) {
mount.def = params.default_filename;
}

if (params.protocol) {
mount.protocol = params.protocol;
if (state->config.default_filename) {
mount.def = state->config.default_filename;
}
info.mounts = &mount;

@@ -334,6 +101,30 @@ IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState* state, IZ_WSServerIni
LWS_PROTOCOL_LIST_TERM,
};
info.protocols = protocols;

static struct lws_protocol_vhost_options pvo_port = {
NULL,
NULL,
"port", /* pvo name */
NULL /* pvo value */
};
pvo_port.value = (void*) &state->config.port;

static struct lws_protocol_vhost_options pvo_app = {
&pvo_port,
NULL,
"app",
NULL,
};
pvo_app.value = state->userdata;

static const struct lws_protocol_vhost_options pvo = {
NULL, /* "next" pvo linked-list */
&pvo_app, /* "child" pvo linked-list */
NETWORK_PROTOCOL, /* protocol name we belong to on this vhost */
"" /* ignored */
};
info.pvo = &pvo;
info.options = (
LWS_SERVER_OPTION_VALIDATE_UTF8
| LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE


+ 30
- 16
src/packages/server/network/IZ_wsserver.h View File

@@ -21,34 +21,36 @@ typedef struct {
const struct lws_protocols *protocol;
IZ_WSServerSessionData *pss_list; /* linked-list of live pss*/
struct lws_ring *ring; /* ringbuffer holding unsent messages */
u16* port;
const void* app;
} IZ_WSServerVHostData;

IZ_ProcedureResult IZ_WSServerCallback(
struct lws*,
enum lws_callback_reasons,
void*,
void*,
size_t
);

typedef struct {
u16 port;
const char* origin;
const char* default_filename;
const char* protocol;
} IZ_WSServerInitializeParams;

typedef struct {
i32 log_level;
u16 port;
} IZ_WSServerConfig;

typedef struct {
IZ_WSServerConfig config;
IZ_WSServerInitializeParams config;
void* userdata;
IZ_Websocket ws;
} IZ_WSServerState;

IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState*, IZ_WSServerInitializeParams);
static IZ_WSServerState IZ_DEFAULT_STATE = {
.config = {
.port = 42069,
.origin = "./public",
.default_filename = "index.html",
},
.userdata = NULL,
.ws = {
.interrupted = false,
.context = NULL,
},
};

IZ_ProcedureResult IZ_WSServerInitialize(IZ_WSServerState*, void*, const char*, u8, const char**);

IZ_ProcedureResult IZ_WSServerHandle(IZ_WSServerState*);

@@ -56,4 +58,16 @@ void IZ_WSServerTeardown(IZ_WSServerState*);

void IZ_WSServerCancelService(IZ_WSServerState*);

IZ_ProcedureResult IZ_WSServerProtocolInitialize(struct lws*, void*);

void IZ_WSServerProtocolTeardown(struct lws*);

IZ_ProcedureResult IZ_WSServerWritable(struct lws*, IZ_WSServerSessionData*);

IZ_ProcedureResult IZ_WSServerOnReceive(struct lws*, void*, size_t);

void IZ_WSServerOnOpen(struct lws*, IZ_WSServerSessionData*);

void IZ_WSServerOnClose(struct lws*, IZ_WSServerSessionData*);

#endif

Loading…
Cancel
Save