About to make the processing queue lock properly

This commit is contained in:
2025-04-10 17:14:25 -05:00
parent 1b336ff559
commit 697b5f7ee2
25 changed files with 309 additions and 22 deletions

View File

@ -27,5 +27,6 @@ add_subdirectory(assert)
add_subdirectory(client)
add_subdirectory(console)
add_subdirectory(error)
add_subdirectory(packet)
add_subdirectory(server)
add_subdirectory(util)

View File

@ -75,6 +75,12 @@ void clientDisconnect() {
default:
assertUnreachable("Invalid client type");
}
consolePrint("Client disconnected");
}
void clientProcess() {
if(CLIENT.state == CLIENT_STATE_DISCONNECTED) return;
}
void clientDispose() {

View File

@ -7,6 +7,7 @@
#pragma once
#include "client/networked/networkedclient.h"
#include "packet/packetqueue.h"
typedef enum {
CLIENT_TYPE_NETWORKED,
@ -30,6 +31,7 @@ typedef struct clientconnect_s {
typedef struct client_s {
clientstate_t state;
clienttype_t type;
packetqueue_t packetQueue;
union {
networkedclient_t networked;

View File

@ -96,7 +96,7 @@ errorret_t networkedClientConnect(
ret = pthread_create(
&client->networked.thread,
NULL,
clientThread,
networkedClientThread,
client
);
@ -123,9 +123,9 @@ void networkedClientDisconnect(client_t *client) {
int32_t maxAttempts = 0;
while(client->state == CLIENT_STATE_DISCONNECTING) {
usleep(1000);
usleep(100 * 1000);// Sleep for 100ms
maxAttempts++;
if(maxAttempts > 15) {
if(maxAttempts > 5) {
consolePrint("Client disconnect timed out, force closing");
break;
}
@ -142,8 +142,6 @@ void networkedClientDisconnect(client_t *client) {
close(client->networked.socket);
client->networked.socket = 0;
}
consolePrint("Client disconnected");
}
errorret_t networkedClientWrite(
@ -237,10 +235,11 @@ errorret_t networkedClientReadPacket(
return ERROR_OK;
}
void * clientThread(void *arg) {
void * networkedClientThread(void *arg) {
assertNotNull(arg, "Client thread argument is NULL");
assertNotMainThread("Client thread must not be on main thread");
packet_t packet;
client_t *client = (client_t *)arg;
assertTrue(
client->type == CLIENT_TYPE_NETWORKED,
@ -254,8 +253,14 @@ void * clientThread(void *arg) {
client->state = CLIENT_STATE_CONNECTED;
while(client->state == CLIENT_STATE_CONNECTED) {
usleep(1000 * 1000);
usleep(1000*1000);// Sleep for 1 second
packetPingCreate(&packet);
packetQueuePushOutbound(
&client->packetQueue,
&packet
);
}
printf("Client thread exiting\n");
client->state = CLIENT_STATE_DISCONNECTED;
}

View File

@ -8,7 +8,7 @@
#pragma once
#include "error/error.h"
#include <arpa/inet.h>
#include "server/packet/packet.h"
#include "packet/packet.h"
typedef struct client_s client_t;
typedef struct clientconnect_s clientconnect_t;

View File

@ -8,6 +8,7 @@
#include "console/console.h"
#include "client/client.h"
#include "server/server.h"
#include "util/random.h"
#include "util/string.h"
#include "assert/assert.h"
@ -20,6 +21,7 @@ void cmdExit(const consolecmdexec_t *exec) {
int main(void) {
assertInit();
consoleInit();
randomInit();
clientInit();
serverInit();

View File

@ -9,4 +9,6 @@ target_sources(${DUSK_TARGET_NAME}
packet.c
packetwelcome.c
packetdisconnect.c
packetqueue.c
packetping.c
)

View File

@ -9,16 +9,19 @@
#include "dusk.h"
#include "packetwelcome.h"
#include "packetdisconnect.h"
#include "packetping.h"
typedef enum {
PACKET_TYPE_INVALID = 0,
PACKET_TYPE_WELCOME = 1,
PACKET_TYPE_DISCONNECT = 2,
PACKET_TYPE_PING = 3,
} packettype_t;
typedef union {
packetwelcome_t welcome;
packetdisconnect_t disconnect;
packetping_t ping;
} packetdata_t;
typedef struct packet_s {

26
src/packet/packetping.c Normal file
View File

@ -0,0 +1,26 @@
/**
* Copyright (c) 2025 Dominic Masters
*
* This software is released under the MIT License.
* https://opensource.org/licenses/MIT
*/
#include "packet.h"
#include "util/memory.h"
#include "assert/assert.h"
void packetPingCreate(packet_t *packet) {
packetInit(packet, PACKET_TYPE_PING, sizeof(packetping_t));
packet->data.ping.number = GetRandomValue(0, INT32_MAX);
}
errorret_t packetPingClient(packet_t *packet) {
assertNotNull(packet, "Packet is NULL");
assertTrue(packet->type == PACKET_TYPE_PING, "Packet type is not PING");
if(packet->length != sizeof(packetping_t)) {
return error("Ping packet length is not %d", sizeof(packetping_t));
}
return ERROR_OK;
}

28
src/packet/packetping.h Normal file
View File

@ -0,0 +1,28 @@
/**
* Copyright (c) 2025 Dominic Masters
*
* This software is released under the MIT License.
* https://opensource.org/licenses/MIT
*/
#pragma once
#include "error/error.h"
typedef struct {
int32_t number;
} packetping_t;
/**
* Creates a ping packet.
*
* @param packet Pointer to the packet structure to initialize.
*/
void packetPingCreate(packet_t *packet);
/**
* Handles a ping packet received FROM a server INTO a client.
*
* @param packet Pointer to the packet structure to handle.
* @return ERROR_OK on success, or an error code on failure.
*/
errorret_t packetPingClient(packet_t *packet);

35
src/packet/packetqueue.c Normal file
View File

@ -0,0 +1,35 @@
/**
* Copyright (c) 2025 Dominic Masters
*
* This software is released under the MIT License.
* https://opensource.org/licenses/MIT
*/
#include "packetqueue.h"
#include "assert/assert.h"
#include "util/memory.h"
void packetQueueInit(packetqueue_t *queue) {
assertNotNull(queue, "Packet queue is NULL");
memoryZero(queue, sizeof(packetqueue_t));
}
void packetQueuePushIn(packetqueue_t *queue, const packet_t *packet) {
assertNotNull(queue, "Packet queue is NULL");
assertNotNull(packet, "Packet is NULL");
assertTrue(
queue->packetsInCount < PACKET_QUEUE_MAX_SIZE, "Packet queue is full"
);
queue->packetsIn[queue->packetsInCount++] = *packet;
}
void packetQueuePushOut(packetqueue_t *queue, const packet_t *packet) {
assertNotNull(queue, "Packet queue is NULL");
assertNotNull(packet, "Packet is NULL");
assertTrue(
queue->packetsOutCount < PACKET_QUEUE_MAX_SIZE, "Packet queue is full"
);
queue->packetsOut[queue->packetsOutCount++] = *packet;
}

49
src/packet/packetqueue.h Normal file
View File

@ -0,0 +1,49 @@
/**
* Copyright (c) 2025 Dominic Masters
*
* This software is released under the MIT License.
* https://opensource.org/licenses/MIT
*/
#pragma once
#include "packet/packet.h"
#define PACKET_QUEUE_MAX_SIZE 512
typedef struct {
packet_t packetsIn[PACKET_QUEUE_MAX_SIZE];
uint32_t packetsInCount;
packet_t packetsOut[PACKET_QUEUE_MAX_SIZE];
uint32_t packetsOutCount;
// Mutex for thread safety
pthread_mutex_t mutex;
// Condition variables for signaling
pthread_cond_t condIn;
pthread_cond_t condOut;
// Thread ID of the thread that owns the queue
} packetqueue_t;
/**
* Initializes the packet queue.
*
* @param queue Pointer to the packet queue structure.
*/
void packetQueueInit(packetqueue_t *queue);
/**
* Pushes a packet into the inbound packet queue.
*
* @param queue Pointer to the packet queue structure.
* @param packet Pointer to the packet to be pushed.
*/
void packetQueuePushIn(packetqueue_t *queue, const packet_t *packet);
/**
* Pushes a packet from the outbound packet queue.
*
* @param queue Pointer to the packet queue structure.
* @param packet Pointer to the packet to be pushed.
*/
void packetQueuePushOut(packetqueue_t *queue, const packet_t *packet);

View File

@ -11,6 +11,5 @@ target_sources(${DUSK_TARGET_NAME}
)
# Subdirs
add_subdirectory(packet)
add_subdirectory(networked)
add_subdirectory(singleplayer)

View File

@ -113,7 +113,7 @@ void networkedServerClientCloseOnThread(
}
ssize_t networkedServerClientRead(
serverclient_t * client,
const serverclient_t * client,
uint8_t *buffer,
const size_t len
) {
@ -136,6 +136,60 @@ ssize_t networkedServerClientRead(
return recv(client->networked.socket, buffer, len, 0);
}
errorret_t networkedServerClientReadPacket(
const serverclient_t * client,
packet_t *packet
) {
uint8_t buffer[sizeof(packet_t)];
ssize_t read;
assertNotNull(client, "Client is NULL");
assertNotNull(packet, "Packet is NULL");
assertTrue(
client->server->type == SERVER_TYPE_NETWORKED,
"Server is not networked"
);
// Read packet ID
read = networkedServerClientRead(client, buffer, sizeof(packettype_t));
if(read != sizeof(packettype_t)) {
return error("Failed to read packet ID");
}
packet->type = *(packettype_t *)buffer;
if(packet->type == PACKET_TYPE_INVALID) {
return error("Invalid packet type");
}
// Read length
read = networkedServerClientRead(
client,
buffer,
sizeof(uint32_t)
);
if(read != sizeof(uint32_t)) {
return error("Failed to read packet length");
}
packet->length = *(uint32_t *)buffer;
if(packet->length > sizeof(packetdata_t)) {
return error("Packet length is too large");
}
// Read data
read = networkedServerClientRead(
client,
(uint8_t *)&packet->data,
packet->length
);
if(read != packet->length) {
return error("Failed to read packet data");
}
return ERROR_OK;
}
errorret_t networkedServerClientWrite(
serverclient_t * client,
const uint8_t *data,
@ -225,18 +279,17 @@ void * networkedServerClientThread(void *arg) {
return NULL;
}
// Start listening for packets.
client->state = SERVER_CLIENT_STATE_CONNECTED;
while(client->state == SERVER_CLIENT_STATE_CONNECTED) {
read = networkedServerClientRead(client, buffer, sizeof(buffer));
if(read <= 0) {
networkedServerClientCloseOnThread(client, "Failed to receive data");
err = networkedServerClientReadPacket(client, &packet);
if(err != ERROR_OK) {
errorPrint();
networkedServerClientCloseOnThread(client, "Failed to read packet");
break;
}
buffer[read] = '\0'; // Null-terminate the string
consolePrint("Received: %s", buffer);
printf("Received packet type %d\n", packet.type);
if(SERVER.state != SERVER_STATE_RUNNING) {
packetDisconnectCreate(

View File

@ -60,7 +60,7 @@ void networkedServerClientCloseOnThread(
* @return Number of bytes received. 0 or less indicates an error.
*/
ssize_t networkedServerClientRead(
serverclient_t * client,
const serverclient_t * client,
uint8_t *buffer,
const size_t len
);

View File

@ -20,10 +20,12 @@ errorret_t serverClientAccept(
assertNotMainThread("Server client accept must not be main thread");
client->server = accept.server;
packetQueueInit(&client->packetQueue);
switch(accept.server->type) {
case SERVER_TYPE_NETWORKED:
ret = networkedServerClientAccept(client, accept);
break;
default:
assertUnreachable("Unknown server type");

View File

@ -6,8 +6,9 @@
*/
#pragma once
#include "server/packet/packet.h"
#include "packet/packet.h"
#include "server/networked/networkedserverclient.h"
#include "packet/packetqueue.h"
typedef struct server_s server_t;
@ -28,6 +29,8 @@ typedef struct serverclientaccept_s {
typedef struct serverclient_s {
server_t *server;
serverclientstate_t state;
packetqueue_t packetQueue;
union {
networkedserverclient_t networked;
};

View File

@ -8,4 +8,5 @@ target_sources(${DUSK_TARGET_NAME}
PRIVATE
memory.c
string.c
random.c
)

27
src/util/random.c Normal file
View File

@ -0,0 +1,27 @@
/**
* Copyright (c) 2025 Dominic Masters
*
* This software is released under the MIT License.
* https://opensource.org/licenses/MIT
*/
#include "random.h"
#include "assert/assert.h"
void randomInit() {
randomSeed(time(NULL));
}
void randomSeed(const uint32_t seed) {
srand(seed);
}
int32_t randomI32(const int32_t min, const int32_t max) {
assertTrue(min < max, "Min is not less than max");
return (rand() % (max - min)) + min;
}
float_t randomF32(const float_t min, const float_t max) {
assertTrue(min < max, "Min is not less than max");
return ((float_t)rand() / (float_t)RAND_MAX) * (max - min) + min;
}

43
src/util/random.h Normal file
View File

@ -0,0 +1,43 @@
/**
* Copyright (c) 2025 Dominic Masters
*
* This software is released under the MIT License.
* https://opensource.org/licenses/MIT
*/
#pragma once
#include "dusk.h"
/**
* Initializes the random number generator with a random seed.
*
* This function should be called once at the start of the program to
* initialize the random number generator with a random seed. It uses
* the current time to generate a seed.
*/
void randomInit();
/**
* Sets the random seed for the random number generator.
*
* @param seed The seed to set.
*/
void randomSeed(const uint32_t seed);
/**
* Generates a random integer between min and max.
*
* @param min The minimum value (inclusive).
* @param max The maximum value (exclusive).
* @return A random integer between min and max.
*/
int32_t randomI32(const int32_t min, const int32_t max);
/**
* Generates a random float between min and max.
*
* @param min The minimum value (inclusive).
* @param max The maximum value (exclusive).
* @return A random float between min and max.
*/
float_t randomF32(const float_t min, const float_t max);