Setup server client to have two threads.

This commit is contained in:
2025-04-21 20:18:10 -05:00
parent 8108a2bb0f
commit 923726902b
6 changed files with 231 additions and 88 deletions

View File

@ -67,6 +67,10 @@ errorret_t networkedClientConnect(
pthread_mutex_init(&client->networked.lock, NULL);
pthread_cond_init(&client->networked.cond, NULL);
// Initialize read and write locks
pthread_mutex_init(&client->networked.readLock, NULL);
pthread_mutex_init(&client->networked.writeLock, NULL);
// Send the version
{
const char_t *message = "DUSK|"DUSK_VERSION;
@ -96,22 +100,41 @@ errorret_t networkedClientConnect(
return error("Server did not send welcome message.");
}
// Connection was established, hand off to thread
// Connection was established, hand off to read thread
ret = pthread_create(
&client->networked.thread,
&client->networked.readThread,
NULL,
networkedClientThread,
networkedClientReadThread,
client
);
if(ret != 0) {
close(client->networked.socket);
return error("Failed to create client thread %s", strerror(errno));
pthread_mutex_destroy(&client->networked.readLock);
pthread_mutex_destroy(&client->networked.writeLock);
return error("Failed to create client read thread %s", strerror(errno));
}
// Wait for the thread to start
while(client->state == CLIENT_STATE_CONNECTING) {
usleep(1000);
// Wait for the read thread to signal that it has started
pthread_mutex_lock(&client->networked.lock);
while (client->state == CLIENT_STATE_CONNECTING) {
pthread_cond_wait(&client->networked.cond, &client->networked.lock);
}
pthread_mutex_unlock(&client->networked.lock);
// Start the write thread after the handshake
ret = pthread_create(
&client->networked.writeThread,
NULL,
networkedClientWriteThread, // Renamed from networkedClientRightThread
client
);
if(ret != 0) {
close(client->networked.socket);
pthread_mutex_destroy(&client->networked.readLock);
pthread_mutex_destroy(&client->networked.writeLock);
return error("Failed to create client write thread %s", strerror(errno)); // Updated error message
}
return ERROR_OK;
@ -131,7 +154,11 @@ void networkedClientDisconnect(client_t *client) {
timeout.tv_sec += 1; // Wait for up to 1 second
while (client->state == CLIENT_STATE_DISCONNECTING) {
if (pthread_cond_timedwait(&client->networked.cond, &client->networked.lock, &timeout) == ETIMEDOUT) {
if(pthread_cond_timedwait(
&client->networked.cond,
&client->networked.lock,
&timeout
) == ETIMEDOUT) {
consolePrint("Client disconnect timed out, force closing");
break;
}
@ -140,11 +167,22 @@ void networkedClientDisconnect(client_t *client) {
client->state = CLIENT_STATE_DISCONNECTED;
if (client->networked.thread) {
pthread_join(client->networked.thread, NULL);
client->networked.thread = 0;
// Wait for the threads to finish
if (client->networked.readThread) {
pthread_join(client->networked.readThread, NULL);
client->networked.readThread = 0;
}
if (client->networked.writeThread) {
pthread_join(client->networked.writeThread, NULL);
client->networked.writeThread = 0;
}
// Destroy read and write locks
pthread_mutex_destroy(&client->networked.readLock);
pthread_mutex_destroy(&client->networked.writeLock);
// Close the socket
if (client->networked.socket) {
shutdown(client->networked.socket, SHUT_RDWR);
close(client->networked.socket);
@ -247,7 +285,7 @@ errorret_t networkedClientReadPacket(
return ERROR_OK;
}
void * networkedClientThread(void *arg) {
void * networkedClientReadThread(void *arg) {
assertNotNull(arg, "Client thread argument is NULL");
assertNotMainThread("Client thread must not be on main thread");
@ -262,18 +300,27 @@ void * networkedClientThread(void *arg) {
"Client thread argument is not connecting"
);
// Notify the main thread that we are connected
pthread_mutex_lock(&client->networked.lock);
client->state = CLIENT_STATE_CONNECTED;
pthread_cond_signal(&client->networked.cond); // Notify the main thread
pthread_cond_signal(&client->networked.cond);
pthread_mutex_unlock(&client->networked.lock);
// Main loop
while(client->state == CLIENT_STATE_CONNECTED) {
// errorret_t err = networkedClientReadPacket(client, &packet);
// if(err) {
// consolePrint("Error reading packet: %s", err.message);
// break;
// }
pthread_mutex_lock(&client->networked.readLock);
// Read a packet from the server
errorret_t err = networkedClientReadPacket(client, &packet);
if(err) {
consolePrint("Failed to read packet %s", errorString());
errorFlush();
break;
}
printf("Received packet type: %d, length: %d\n", packet.type, packet.length);
pthread_mutex_unlock(&client->networked.readLock);
}
client->state = CLIENT_STATE_DISCONNECTED;
@ -283,3 +330,27 @@ void * networkedClientThread(void *arg) {
pthread_cond_signal(&client->networked.cond); // Notify the main thread
pthread_mutex_unlock(&client->networked.lock);
}
void * networkedClientWriteThread(void *arg) { // Renamed from networkedClientRightThread
assertNotNull(arg, "Write thread argument is NULL");
assertNotMainThread("Write thread must not be on main thread");
client_t *client = (client_t *)arg;
assertTrue(
client->type == CLIENT_TYPE_NETWORKED,
"Write thread argument is not networked"
);
// Perform additional operations
while(client->state == CLIENT_STATE_CONNECTED) {
pthread_mutex_lock(&client->networked.writeLock); // Lock before writing
// Add logic for the write thread here
// Example: Writing packets to the client
// networkedClientWritePacket(client, &somePacket);
pthread_mutex_unlock(&client->networked.writeLock); // Unlock after writing
}
return NULL;
}

View File

@ -20,9 +20,12 @@ typedef struct {
typedef struct {
int32_t socket;
struct sockaddr_in address;
pthread_t thread;
pthread_t readThread;
pthread_t writeThread;
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_mutex_t readLock;
pthread_mutex_t writeLock;
} networkedclient_t;
/**
@ -84,4 +87,12 @@ errorret_t networkedClientReadPacket(const client_t *client, packet_t *packet);
* @param arg Pointer to the client structure.
* @return NULL.
*/
void * networkedClientThread(void *arg);
void * networkedClientReadThread(void *arg); // Renamed from networkedClientThread
/**
* Thread function for handling additional client operations.
*
* @param arg Pointer to the client structure.
* @return NULL.
*/
void * networkedClientWriteThread(void *arg); // Renamed from networkedClientRightThread

View File

@ -18,7 +18,7 @@ void packetQueuePushIn(packetqueue_t *queue, const packet_t *packet) {
assertNotNull(queue, "Packet queue is NULL");
assertNotNull(packet, "Packet is NULL");
assertTrue(
queue->packetsInCount < PACKET_QUEUE_MAX_SIZE, "Packet queue is full"
queue->packetsInCount < PACKET_QUEUE_MAX_SIZE, "Inbound packet queue is full"
);
queue->packetsIn[queue->packetsInCount++] = *packet;
@ -28,8 +28,36 @@ void packetQueuePushOut(packetqueue_t *queue, const packet_t *packet) {
assertNotNull(queue, "Packet queue is NULL");
assertNotNull(packet, "Packet is NULL");
assertTrue(
queue->packetsOutCount < PACKET_QUEUE_MAX_SIZE, "Packet queue is full"
queue->packetsOutCount < PACKET_QUEUE_MAX_SIZE, "Outbound packet queue is full"
);
queue->packetsOut[queue->packetsOutCount++] = *packet;
}
int packetQueuePopIn(packetqueue_t *queue, packet_t *packet) {
assertNotNull(queue, "Packet queue is NULL");
assertNotNull(packet, "Packet is NULL");
if(queue->packetsInCount == 0) return 0;
*packet = queue->packetsIn[0];
for(uint32_t i = 1; i < queue->packetsInCount; i++) {
queue->packetsIn[i - 1] = queue->packetsIn[i];
}
queue->packetsInCount--;
return 1;
}
int packetQueuePopOut(packetqueue_t *queue, packet_t *packet) {
assertNotNull(queue, "Packet queue is NULL");
assertNotNull(packet, "Packet is NULL");
if(queue->packetsOutCount == 0) return 0;
*packet = queue->packetsOut[0];
for(uint32_t i = 1; i < queue->packetsOutCount; i++) {
queue->packetsOut[i - 1] = queue->packetsOut[i];
}
queue->packetsOutCount--;
return 1;
}

View File

@ -15,14 +15,6 @@ typedef struct {
uint32_t packetsInCount;
packet_t packetsOut[PACKET_QUEUE_MAX_SIZE];
uint32_t packetsOutCount;
// Mutex for thread safety
pthread_mutex_t mutex;
// Condition variables for signaling
pthread_cond_t condIn;
pthread_cond_t condOut;
// Thread ID of the thread that owns the queue
} packetqueue_t;
/**
@ -41,9 +33,27 @@ void packetQueueInit(packetqueue_t *queue);
void packetQueuePushIn(packetqueue_t *queue, const packet_t *packet);
/**
* Pushes a packet from the outbound packet queue.
* Pushes a packet into the outbound packet queue.
*
* @param queue Pointer to the packet queue structure.
* @param packet Pointer to the packet to be pushed.
*/
void packetQueuePushOut(packetqueue_t *queue, const packet_t *packet);
/**
* Pops a packet from the inbound packet queue.
*
* @param queue Pointer to the packet queue structure.
* @param packet Pointer to the packet to store the popped packet.
* @return 1 if a packet was popped, 0 otherwise.
*/
int packetQueuePopIn(packetqueue_t *queue, packet_t *packet);
/**
* Pops a packet from the outbound packet queue.
*
* @param queue Pointer to the packet queue structure.
* @param packet Pointer to the packet to store the popped packet.
* @return 1 if a packet was popped, 0 otherwise.
*/
int packetQueuePopOut(packetqueue_t *queue, packet_t *packet);

View File

@ -29,20 +29,20 @@ errorret_t networkedServerClientAccept(
client->networked.timeout.tv_sec = 8;
client->networked.timeout.tv_usec = 0;
// Initialize mutex and condition variable
pthread_mutex_init(&client->networked.lock, NULL);
pthread_cond_init(&client->networked.cond, NULL);
// Initialize mutexs
pthread_mutex_init(&client->networked.readLock, NULL);
pthread_mutex_init(&client->networked.writeLock, NULL);
// Create a thread for the client
// Create a read thread for the client
int32_t ret = pthread_create(
&client->networked.thread,
&client->networked.readThread,
NULL,
networkedServerClientThread,
networkedServerClientReadThread,
client
);
if(ret != 0) {
client->state = SERVER_CLIENT_STATE_DISCONNECTED;
return error("Failed to create client thread");
return error("Failed to create client read thread");
}
// Set socket timeout
@ -69,20 +69,21 @@ void networkedServerClientClose(serverclient_t *client) {
);
// Mark client as disconnecting
pthread_mutex_lock(&client->networked.lock);
client->state = SERVER_CLIENT_STATE_DISCONNECTING;
// Signal the condition variable to wake up the thread
pthread_cond_signal(&client->networked.cond);
pthread_mutex_unlock(&client->networked.lock);
// Wait for the read thread to finish
pthread_mutex_lock(&client->networked.readLock);
pthread_mutex_unlock(&client->networked.readLock);
pthread_join(client->networked.readThread, NULL);
client->networked.readThread = 0;
pthread_mutex_destroy(&client->networked.readLock);
// Wait for the thread to finish
pthread_join(client->networked.thread, NULL);
client->networked.thread = 0;
// Destroy mutex and condition variable
pthread_mutex_destroy(&client->networked.lock);
pthread_cond_destroy(&client->networked.cond);
// Signal and wait for the write thread to finish
pthread_mutex_lock(&client->networked.writeLock);
pthread_mutex_unlock(&client->networked.writeLock);
pthread_join(client->networked.writeThread, NULL);
client->networked.writeThread = 0;
pthread_mutex_destroy(&client->networked.writeLock);
// Close the socket
if(client->networked.socket != -1) {
@ -106,17 +107,12 @@ void networkedServerClientCloseOnThread(
);
assertNotMainThread("Client close must not be main thread");
pthread_mutex_lock(&client->networked.lock);
client->state = SERVER_CLIENT_STATE_DISCONNECTING;
// Signal the condition variable to wake up the thread
pthread_cond_signal(&client->networked.cond);
pthread_mutex_unlock(&client->networked.lock);
// Terminate the socket
close(client->networked.socket);
client->networked.socket = -1;
client->networked.thread = 0;
client->networked.readThread = 0;
consolePrint("Client %d disconnected: %s", client->networked.socket, reason);
// Mark this client as disconnected so it can be used again.
@ -241,7 +237,7 @@ errorret_t networkedServerClientWritePacket(
return networkedServerClientWrite(client, (const uint8_t *)packet, fullSize);
}
void * networkedServerClientThread(void *arg) {
void * networkedServerClientReadThread(void *arg) {
assertNotNull(arg, "Client is NULL");
assertNotMainThread("Client thread must not be main thread");
@ -273,7 +269,6 @@ void * networkedServerClientThread(void *arg) {
return NULL;
}
buffer[read] = '\0'; // Null-terminate the string
if(strncmp(buffer, expecting, strlen(expecting)) != 0) {
packetDisconnectCreate(&packet, PACKET_DISCONNECT_REASON_INVALID_VERSION);
@ -291,35 +286,54 @@ void * networkedServerClientThread(void *arg) {
return NULL;
}
// Start listening for packets.
// Client is connected.
client->state = SERVER_CLIENT_STATE_CONNECTED;
while(client->state == SERVER_CLIENT_STATE_CONNECTED) {
pthread_mutex_lock(&client->networked.lock);
// Wait for a signal if the client is disconnecting
while(client->state == SERVER_CLIENT_STATE_DISCONNECTING) {
pthread_cond_wait(&client->networked.cond, &client->networked.lock);
}
pthread_mutex_unlock(&client->networked.lock);
// ...existing code for reading packets...
if(SERVER.state != SERVER_STATE_RUNNING) {
packetDisconnectCreate(
&packet,
PACKET_DISCONNECT_REASON_SERVER_SHUTDOWN
);
networkedServerClientWritePacket(client, &packet);
if(errorCheck()) errorPrint();
networkedServerClientCloseOnThread(client, "Server is shutting down");
break;
}
// Start the write thread after the handshake
int32_t ret = pthread_create(
&client->networked.writeThread,
NULL,
networkedServerClientWriteThread,
client
);
if(ret != 0) {
networkedServerClientCloseOnThread(client, "Failed to create write thread");
return NULL;
}
pthread_mutex_lock(&client->networked.lock);
// Start listening for packets.
while(client->state == SERVER_CLIENT_STATE_CONNECTED) {
pthread_mutex_lock(&client->networked.readLock);
pthread_mutex_unlock(&client->networked.readLock);
}
pthread_mutex_lock(&client->networked.readLock);
client->state = SERVER_CLIENT_STATE_DISCONNECTED;
pthread_mutex_unlock(&client->networked.lock);
pthread_mutex_unlock(&client->networked.readLock);
return NULL;
}
void * networkedServerClientWriteThread(void *arg) {
assertNotNull(arg, "Client is NULL");
assertNotMainThread("Client thread must not be main thread");
assertTrue(
((serverclient_t *)arg)->server->type == SERVER_TYPE_NETWORKED,
"Server is not networked"
);
assertTrue(
((serverclient_t *)arg)->state == SERVER_CLIENT_STATE_CONNECTED,
"Client is not connected"
);
serverclient_t *client = (serverclient_t *)arg;
while(client->state == SERVER_CLIENT_STATE_CONNECTED) {
pthread_mutex_lock(&client->networked.writeLock);
pthread_mutex_unlock(&client->networked.writeLock);
}
return NULL;
}

View File

@ -17,10 +17,11 @@ typedef struct {
typedef struct {
int32_t socket;
pthread_t thread;
pthread_t readThread;
pthread_t writeThread;
struct timeval timeout;
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_mutex_t readLock;
pthread_mutex_t writeLock;
} networkedserverclient_t;
/**
@ -94,9 +95,17 @@ errorret_t networkedServerClientWritePacket(
);
/**
* Thread function for handling a networked server client.
* Thread function for handling reading from a networked server client.
*
* @param arg Pointer to the server client structure.
* @return NULL.
*/
void * networkedServerClientThread(void *arg);
void * networkedServerClientReadThread(void *arg);
/**
* Thread function for handling writing to a networked server client.
*
* @param arg Pointer to the server client structure.
* @return NULL.
*/
void * networkedServerClientWriteThread(void *arg);