Pārlūkot izejas kodu

Make client receive messages

With the help of the test client app, the real client is able to receive
messages which are transmitted through the server.
feature/data-structs
TheoryOfNekomata pirms 2 gadiem
vecāks
revīzija
74302fadcc
4 mainītis faili ar 293 papildinājumiem un 262 dzēšanām
  1. +280
    -29
      src/packages/game/IZ_app.c
  2. +0
    -2
      src/packages/game/network/IZ_websocket.c
  3. +9
    -231
      src/packages/game/network/IZ_wsclient.c
  4. +4
    -0
      src/packages/game/network/IZ_wsclient.h

+ 280
- 29
src/packages/game/IZ_app.c Parādīt failu

@@ -1,12 +1,6 @@
#include "IZ_app.h"

IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app) {
IZ_WSClientInitializeParams params = {
.address = "localhost",
.path = "/",
.port = 42069,
};

IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app, IZ_WSClientInitializeParams params) {
if (IZ_WSClientInitialize(&app->client, params)) {
printf("error\n");
return -1;
@@ -16,7 +10,14 @@ IZ_ProcedureResult IZ_AppWSClientInitialize(IZ_App* app) {

IZ_ProcedureResult IZ_AppConnect(void* app_raw) {
IZ_App* app = app_raw;
if (IZ_AppWSClientInitialize(app)) {
IZ_WSClientInitializeParams params = {
.address = "127.0.0.1",
.path = "/",
.port = 42069,
.userdata = app,
};

if (IZ_AppWSClientInitialize(app, params)) {
return -1;
}

@@ -148,15 +149,27 @@ void IZ_AppHandlePortMIDIEvents(IZ_App* app) {
}
}

IZ_ProcedureResult IZ_AppHandleEvents(IZ_App* app) {
if (IZ_AppHandleSDLEvents(app)) {
return 1;
void IZ_AppHandleNetworkingInboundBinaryEvents(IZ_App* app, void* binary_raw, size_t len) {
u8* binary = binary_raw;
size_t i;
printf("Binary");
for (i = 0; i < len; i += 1) {
printf("%c%x", i == 0 ? '(' : ' ', binary[i]);
}
IZ_AppHandlePortMIDIEvents(app);
return 0;
printf(")\n");
}

IZ_ProcedureResult IZ_AppRunNetworkingThread(void* ptr) {
void IZ_AppHandleNetworkingInboundTextEvents(IZ_App* app, const char* text, size_t len) {
printf("String(%s)\n", text);
}

IZ_ProcedureResult IZ_AppHandleInputEvents(IZ_App* app) {
i32 sdl_events_result = IZ_AppHandleSDLEvents(app);
if (sdl_events_result) {
return sdl_events_result;
}

IZ_AppHandlePortMIDIEvents(app);
return 0;
}

@@ -166,20 +179,6 @@ IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) {
return init_result;
}

// if (IZ_WSClientConnect(&app->client, (IZ_WSClientConnectParams) {
// .userdata = app,
// .path = "/",
// .protocol = NETWORK_PROTOCOL, // TODO handle ws protocol correctly
// .address = "localhost",
// .port = 42069,
// .callback = IZ_AppWSClientCallback,
// })) {
// return IZ_APP_RUN_NETWORKING_ERROR;
// }

//app->client_thread = SDL_CreateThread(IZ_AppRunNetworkingThread, "Networking", app);
//SDL_DetachThread(app->client_thread);

while (true) {
app->ticks = SDL_GetTicks64();

@@ -190,7 +189,7 @@ IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) {
//IZ_WSClientHandle(&app->client);
//}

if (IZ_AppHandleEvents(app)) {
if (IZ_AppHandleInputEvents(app)) {
break;
}

@@ -200,3 +199,255 @@ IZ_ProcedureResult IZ_AppRun(IZ_App* app, u8 argc, const char* argv[]) {
IZ_AppTeardown(app);
return IZ_APP_RUN_RESULT_OK;
}

void IZ_WSClientAttemptConnect(struct lws_sorted_usec_list *sul) {
IZ_WSClientVHostData* vhd = lws_container_of(sul, IZ_WSClientVHostData, sul);

vhd->i.context = vhd->context;
vhd->i.port = *vhd->port;
vhd->i.address = vhd->address;
vhd->i.path = vhd->path;
vhd->i.host = vhd->i.address;
vhd->i.origin = vhd->i.address;
vhd->i.ssl_connection = 0;

vhd->i.protocol = NETWORK_PROTOCOL;
vhd->i.pwsi = &vhd->client_wsi;

if (lws_client_connect_via_info(&vhd->i)) {
return;
}

lws_sul_schedule(
vhd->context,
0,
&vhd->sul,
IZ_WSClientAttemptConnect,
10 * LWS_US_PER_SEC
);
}

IZ_ProcedureResult IZ_WSClientProtocolInitialize(struct lws* wsi, void* in) {
IZ_WSClientVHostData* vhd_instance = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
IZ_WSClientVHostData** vhd = &vhd_instance;
*vhd = lws_protocol_vh_priv_zalloc(
lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(IZ_WSClientVHostData)
);
(*vhd)->context = lws_get_context(wsi);
(*vhd)->protocol = lws_get_protocol(wsi);
(*vhd)->vhost = lws_get_vhost(wsi);
(*vhd)->port = (u16*) lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"port"
)->value;
(*vhd)->address = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"address"
)->value;
(*vhd)->path = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"path"
)->value;
(*vhd)->app = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"app"
)->value;
(*vhd)->ring = lws_ring_create(
sizeof(IZ_WebsocketMessage),
RING_COUNT,
IZ_WebsocketDestroyMessage
);
if (!(*vhd)->ring) {
return -1;
}
IZ_WSClientAttemptConnect(&(*vhd)->sul);
return 0;
}

void IZ_WSClientProtocolTeardown(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

vhd->finished = true;
if (vhd->ring) {
lws_ring_destroy(vhd->ring);
}

lws_sul_cancel(&vhd->sul);
}

void IZ_WSClientConnectionError(struct lws* wsi, void* in) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)");
vhd->client_wsi = NULL;
lws_sul_schedule(
vhd->context,
0,
&vhd->sul,
IZ_WSClientAttemptConnect,
LWS_US_PER_SEC
);
}

IZ_ProcedureResult IZ_WSClientOnOpen(struct lws* wsi, IZ_WSClientSessionData* pss) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

vhd->established = true;
pss->ring = lws_ring_create(sizeof(IZ_WebsocketMessage), RING_COUNT,IZ_WebsocketDestroyMessage);
if (!pss->ring) {
return -1;
}
pss->tail = 0;
return 0;
}

void IZ_WSClientOnClose(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

vhd->client_wsi = NULL;
vhd->established = false;
lws_sul_schedule(
vhd->context,
0,
&vhd->sul,
IZ_WSClientAttemptConnect,
LWS_US_PER_SEC
);
}

IZ_ProcedureResult IZ_WSClientWritable(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

const IZ_WebsocketMessage* pmsg = lws_ring_get_element(vhd->ring, &vhd->tail);
if (!pmsg) {
return 0;
}

/* notice we allowed for LWS_PRE in the payload already */
i32 m = lws_write(
wsi,
((unsigned char*) pmsg->payload) + LWS_PRE,
pmsg->len,
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT
);

if (m < (i32)pmsg->len) {
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}

lws_ring_consume_single_tail(vhd->ring, &vhd->tail, 1);

/* more to do for us? */
if (lws_ring_get_element(vhd->ring, &vhd->tail)) {
/* come back as soon as we can write more */
lws_callback_on_writable(wsi);
}

return 0;
}

void IZ_WSClientOnReceive(struct lws* wsi, IZ_WSClientSessionData* pss, void* in, size_t len) {
lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n",
(int)len, (int)lws_remaining_packet_payload(wsi),
lws_is_first_fragment(wsi),
lws_is_final_fragment(wsi),
lws_frame_is_binary(wsi));

// lwsl_hexdump_notice(in, len);
IZ_WebsocketMessage amsg;
amsg.first = (char)lws_is_first_fragment(wsi);
amsg.final = (char)lws_is_final_fragment(wsi);
amsg.binary = (char)lws_frame_is_binary(wsi);
i32 n = (i32) lws_ring_get_count_free_elements(pss->ring);
if (!n) {
lwsl_user("dropping!\n");
return;
}

amsg.len = len;
/* notice we over-allocate by LWS_PRE */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
return;
}

memcpy((char *)amsg.payload + LWS_PRE, in, len);
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

IZ_App* app = (IZ_App*) vhd->app;
if (amsg.binary) {
IZ_AppHandleNetworkingInboundBinaryEvents(app, in, len);
} else {
IZ_AppHandleNetworkingInboundTextEvents(app, in, len);
}

if (!lws_ring_insert(pss->ring, &amsg, 1)) {
IZ_WebsocketDestroyMessage(&amsg);
lwsl_user("dropping!\n");
return;
}
lws_callback_on_writable(wsi);

if (!pss->flow_controlled && n < 3) {
pss->flow_controlled = 1;
lws_rx_flow_control(wsi, 0);
}
}

IZ_ProcedureResult IZ_WSClientCallback(
struct lws* wsi,
enum lws_callback_reasons reason,
void* user,
void* in,
size_t len
) {
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
return IZ_WSClientProtocolInitialize(wsi, in);
case LWS_CALLBACK_PROTOCOL_DESTROY:
IZ_WSClientProtocolTeardown(wsi);
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
IZ_WSClientConnectionError(wsi, in);
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
return IZ_WSClientOnOpen(wsi, user);
case LWS_CALLBACK_CLIENT_CLOSED:
IZ_WSClientOnClose(wsi);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
IZ_WSClientOnReceive(wsi, user, in, len);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
return IZ_WSClientWritable(wsi);
default:
break;
}

return lws_callback_http_dummy(wsi, reason, user, in, len);
}

+ 0
- 2
src/packages/game/network/IZ_websocket.c Parādīt failu

@@ -6,8 +6,6 @@ void IZ_WebsocketInitialize(IZ_Websocket* ws) {
}

IZ_ProcedureResult IZ_WebsocketHandle(IZ_Websocket* ws) {
// FIXME: https://libwebsockets.org/git/libwebsockets/tree/minimal-examples-lowlevel/http-server/minimal-http-server-eventlib-foreign
// return lws_service(ws->context, 0);
return lws_service_tsi(ws->context, -1, 0);
}



+ 9
- 231
src/packages/game/network/IZ_wsclient.c Parādīt failu

@@ -1,235 +1,5 @@
#include "IZ_wsclient.h"

void IZ_WSClientAttemptConnect(struct lws_sorted_usec_list *sul) {
IZ_WSClientVHostData* vhd = lws_container_of(sul, IZ_WSClientVHostData, sul);

printf("%s %s %u", vhd->address, vhd->path, *vhd->port);

vhd->i.context = vhd->context;
vhd->i.port = *vhd->port;
vhd->i.address = vhd->address;
vhd->i.path = vhd->path;
vhd->i.host = vhd->i.address;
vhd->i.origin = vhd->i.address;
vhd->i.ssl_connection = 0;

vhd->i.protocol = NETWORK_PROTOCOL;
vhd->i.pwsi = &vhd->client_wsi;

if (lws_client_connect_via_info(&vhd->i)) {
return;
}

lws_sul_schedule(
vhd->context,
0,
&vhd->sul,
IZ_WSClientAttemptConnect,
10 * LWS_US_PER_SEC
);
}

IZ_ProcedureResult IZ_WSClientProtocolInitialize(struct lws* wsi, void* in) {
IZ_WSClientVHostData* vhd_instance = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);
IZ_WSClientVHostData** vhd = &vhd_instance;
*vhd = lws_protocol_vh_priv_zalloc(
lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(IZ_WSClientVHostData)
);
(*vhd)->context = lws_get_context(wsi);
(*vhd)->protocol = lws_get_protocol(wsi);
(*vhd)->vhost = lws_get_vhost(wsi);
(*vhd)->port = (u16*) lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"port")->value;
(*vhd)->address = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"address")->value;
(*vhd)->path = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"path")->value;
(*vhd)->ring = lws_ring_create(
sizeof(IZ_WebsocketMessage),
RING_COUNT,
IZ_WebsocketDestroyMessage
);
if (!(*vhd)->ring) {
return 1;
}
IZ_WSClientAttemptConnect(&(*vhd)->sul);
return 0;
}

void IZ_WSClientProtocolTeardown(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

vhd->finished = true;
if (vhd->ring) {
lws_ring_destroy(vhd->ring);
}

lws_sul_cancel(&vhd->sul);
}

void IZ_WSClientConnectionError(struct lws* wsi, void* in) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)");
vhd->client_wsi = NULL;
lws_sul_schedule(
vhd->context,
0,
&vhd->sul,
IZ_WSClientAttemptConnect,
LWS_US_PER_SEC
);
}

void IZ_WSClientOnOpen(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

vhd->established = true;
}

IZ_ProcedureResult IZ_WSClientWritable(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

const IZ_WebsocketMessage* pmsg = lws_ring_get_element(vhd->ring, &vhd->tail);
if (!pmsg) {
return 0;
}

/* notice we allowed for LWS_PRE in the payload already */
i32 m = lws_write(
wsi,
((unsigned char*) pmsg->payload) + LWS_PRE,
pmsg->len,
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT
);

if (m < (i32)pmsg->len) {
lwsl_err("ERROR %d writing to ws socket\n", m);
return -1;
}

lws_ring_consume_single_tail(vhd->ring, &vhd->tail, 1);

/* more to do for us? */
if (lws_ring_get_element(vhd->ring, &vhd->tail)) {
/* come back as soon as we can write more */
lws_callback_on_writable(wsi);
}

return 0;
}

void IZ_WSClientOnClose(struct lws* wsi) {
IZ_WSClientVHostData* vhd = (IZ_WSClientVHostData*) lws_protocol_vh_priv_get(
lws_get_vhost(wsi),
lws_get_protocol(wsi)
);

vhd->client_wsi = NULL;
vhd->established = false;
lws_sul_schedule(
vhd->context,
0,
&vhd->sul,
IZ_WSClientAttemptConnect,
LWS_US_PER_SEC
);
}

void IZ_WSClientOnReceive(struct lws* wsi, IZ_WSClientSessionData* pss, void* in, size_t len) {
lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: %4d (rpp %5d, first %d, last %d, bin %d)\n",
(int)len, (int)lws_remaining_packet_payload(wsi),
lws_is_first_fragment(wsi),
lws_is_final_fragment(wsi),
lws_frame_is_binary(wsi));

// lwsl_hexdump_notice(in, len);
IZ_WebsocketMessage amsg;
amsg.first = (char)lws_is_first_fragment(wsi);
amsg.final = (char)lws_is_final_fragment(wsi);
amsg.binary = (char)lws_frame_is_binary(wsi);
i32 n = (int)lws_ring_get_count_free_elements(pss->ring);
if (!n) {
lwsl_user("dropping!\n");
return;
}

amsg.len = len;
/* notice we over-allocate by LWS_PRE */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
return;
}

memcpy((char *)amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(pss->ring, &amsg, 1)) {
IZ_WebsocketDestroyMessage(&amsg);
lwsl_user("dropping!\n");
return;
}
lws_callback_on_writable(wsi);

if (!pss->flow_controlled && n < 3) {
pss->flow_controlled = 1;
lws_rx_flow_control(wsi, 0);
}
}

IZ_ProcedureResult IZ_WSClientCallback(
struct lws* wsi,
enum lws_callback_reasons reason,
void* user,
void* in,
size_t len
) {
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
return IZ_WSClientProtocolInitialize(wsi, in);
case LWS_CALLBACK_PROTOCOL_DESTROY:
IZ_WSClientProtocolTeardown(wsi);
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
IZ_WSClientConnectionError(wsi, in);
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
IZ_WSClientOnOpen(wsi);
break;
case LWS_CALLBACK_CLIENT_CLOSED:
IZ_WSClientOnClose(wsi);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
IZ_WSClientOnReceive(wsi, user, in, len);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
return IZ_WSClientWritable(wsi);
default:
break;
}

return lws_callback_http_dummy(wsi, reason, user, in, len);
}

IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientInitializeParams params) {
struct lws_context_creation_info info;
memset(&info, 0, sizeof info);
@@ -273,9 +43,17 @@ IZ_ProcedureResult IZ_WSClientInitialize(IZ_WSClientState* state, IZ_WSClientIni
};
pvo_port.value = (void*) &params.port;

static struct lws_protocol_vhost_options pvo_app = {
&pvo_port,
NULL,
"app",
NULL,
};
pvo_app.value = params.userdata;

static const struct lws_protocol_vhost_options pvo = {
NULL, /* "next" pvo linked-list */
&pvo_port, /* "child" pvo linked-list */
&pvo_app, /* "child" pvo linked-list */
NETWORK_PROTOCOL, /* protocol name we belong to on this vhost */
"" /* ignored */
};


+ 4
- 0
src/packages/game/network/IZ_wsclient.h Parādīt failu

@@ -23,6 +23,7 @@ typedef struct {
const char* address;
const char* path;
u16* port;
const void* app;
} IZ_WSClientVHostData;

typedef struct {
@@ -37,6 +38,7 @@ typedef struct {
const char* address;
const char* path;
u16 port;
void* userdata;
} IZ_WSClientInitializeParams;

typedef struct {
@@ -51,4 +53,6 @@ void IZ_WSClientTeardown(IZ_WSClientState*);

void IZ_WSClientCancelService(IZ_WSClientState*);

IZ_ProcedureResult IZ_WSClientCallback(struct lws*, enum lws_callback_reasons, void*, void*, size_t);

#endif

Notiek ielāde…
Atcelt
Saglabāt