Replaced tight loops in server/client with mutexes

This commit is contained in:
2025-04-21 19:26:36 -05:00
parent 4aaabb3882
commit 8108a2bb0f
7 changed files with 113 additions and 56 deletions

View File

@ -63,6 +63,10 @@ errorret_t networkedClientConnect(
} }
} }
// Initialize mutex and condition variable
pthread_mutex_init(&client->networked.lock, NULL);
pthread_cond_init(&client->networked.cond, NULL);
// Send the version // Send the version
{ {
const char_t *message = "DUSK|"DUSK_VERSION; const char_t *message = "DUSK|"DUSK_VERSION;
@ -119,29 +123,37 @@ void networkedClientDisconnect(client_t *client) {
assertTrue(client->type == CLIENT_TYPE_NETWORKED, "Client is not networked"); assertTrue(client->type == CLIENT_TYPE_NETWORKED, "Client is not networked");
assertTrue(client->state == CLIENT_STATE_CONNECTED, "Client not connected"); assertTrue(client->state == CLIENT_STATE_CONNECTED, "Client not connected");
pthread_mutex_lock(&client->networked.lock);
client->state = CLIENT_STATE_DISCONNECTING; client->state = CLIENT_STATE_DISCONNECTING;
int32_t maxAttempts = 0; struct timespec timeout;
while(client->state == CLIENT_STATE_DISCONNECTING) { clock_gettime(CLOCK_REALTIME, &timeout);
usleep(100 * 1000);// Sleep for 100ms timeout.tv_sec += 1; // Wait for up to 1 second
maxAttempts++;
if(maxAttempts > 5) { while (client->state == CLIENT_STATE_DISCONNECTING) {
if (pthread_cond_timedwait(&client->networked.cond, &client->networked.lock, &timeout) == ETIMEDOUT) {
consolePrint("Client disconnect timed out, force closing"); consolePrint("Client disconnect timed out, force closing");
break; break;
} }
} }
pthread_mutex_unlock(&client->networked.lock);
client->state = CLIENT_STATE_DISCONNECTED; client->state = CLIENT_STATE_DISCONNECTED;
if(client->networked.thread) {
if (client->networked.thread) {
pthread_join(client->networked.thread, NULL); pthread_join(client->networked.thread, NULL);
client->networked.thread = 0; client->networked.thread = 0;
} }
if(client->networked.socket) { if (client->networked.socket) {
shutdown(client->networked.socket, SHUT_RDWR); shutdown(client->networked.socket, SHUT_RDWR);
close(client->networked.socket); close(client->networked.socket);
client->networked.socket = 0; client->networked.socket = 0;
} }
// Destroy mutex and condition variable
pthread_mutex_destroy(&client->networked.lock);
pthread_cond_destroy(&client->networked.cond);
} }
errorret_t networkedClientWrite( errorret_t networkedClientWrite(
@ -250,17 +262,24 @@ void * networkedClientThread(void *arg) {
"Client thread argument is not connecting" "Client thread argument is not connecting"
); );
pthread_mutex_lock(&client->networked.lock);
client->state = CLIENT_STATE_CONNECTED; client->state = CLIENT_STATE_CONNECTED;
pthread_cond_signal(&client->networked.cond); // Notify the main thread
pthread_mutex_unlock(&client->networked.lock);
// Main loop
while(client->state == CLIENT_STATE_CONNECTED) { while(client->state == CLIENT_STATE_CONNECTED) {
usleep(1000*1000);// Sleep for 1 second // errorret_t err = networkedClientReadPacket(client, &packet);
// if(err) {
// packetPingCreate(&packet); // consolePrint("Error reading packet: %s", err.message);
// packetQueuePushOutbound( // break;
// &client->packetQueue, // }
// &packet
// );
} }
client->state = CLIENT_STATE_DISCONNECTED; client->state = CLIENT_STATE_DISCONNECTED;
pthread_mutex_lock(&client->networked.lock);
client->state = CLIENT_STATE_DISCONNECTED;
pthread_cond_signal(&client->networked.cond); // Notify the main thread
pthread_mutex_unlock(&client->networked.lock);
} }

View File

@ -21,6 +21,8 @@ typedef struct {
int32_t socket; int32_t socket;
struct sockaddr_in address; struct sockaddr_in address;
pthread_t thread; pthread_t thread;
pthread_mutex_t lock;
pthread_cond_t cond;
} networkedclient_t; } networkedclient_t;
/** /**

View File

@ -50,6 +50,10 @@ errorret_t networkedServerStart(
return error("Failed to listen on socket"); return error("Failed to listen on socket");
} }
// Initialize mutex and condition variable
pthread_mutex_init(&server->networked.lock, NULL);
pthread_cond_init(&server->networked.cond, NULL);
// Start the server thread. // Start the server thread.
server->state = SERVER_STATE_STARTING; server->state = SERVER_STATE_STARTING;
res = pthread_create(&server->thread, NULL, networkedServerThread, server); res = pthread_create(&server->thread, NULL, networkedServerThread, server);
@ -60,9 +64,11 @@ errorret_t networkedServerStart(
} }
// Wait for the thread to start. // Wait for the thread to start.
pthread_mutex_lock(&server->networked.lock);
while(server->state == SERVER_STATE_STARTING) { while(server->state == SERVER_STATE_STARTING) {
usleep(1000); pthread_cond_wait(&server->networked.cond, &server->networked.lock);
} }
pthread_mutex_unlock(&server->networked.lock);
// Server started, hand back. // Server started, hand back.
consolePrint("Server started."); consolePrint("Server started.");
@ -75,8 +81,11 @@ void networkedServerStop(server_t *server) {
assertTrue(server->state != SERVER_STATE_STARTING, "Server is starting"); assertTrue(server->state != SERVER_STATE_STARTING, "Server is starting");
assertIsMainThread("Server stop must be on main thread"); assertIsMainThread("Server stop must be on main thread");
// Tell thread we want it to stop // Notify thread to stop
pthread_mutex_lock(&server->networked.lock);
server->state = SERVER_STATE_STOPPING; server->state = SERVER_STATE_STOPPING;
pthread_cond_signal(&server->networked.cond);
pthread_mutex_unlock(&server->networked.lock);
// Disconnect clients // Disconnect clients
uint8_t i = 0; uint8_t i = 0;
@ -86,17 +95,22 @@ void networkedServerStop(server_t *server) {
serverClientClose(client); serverClientClose(client);
} while(i < SERVER_MAX_CLIENTS); } while(i < SERVER_MAX_CLIENTS);
// Now we wait a short time for the thread to stop. // Wait for the thread to stop
int32_t maxWaits = 0; int32_t maxWaits = 0;
while(SERVER.state == SERVER_STATE_STOPPING) { pthread_mutex_lock(&server->networked.lock);
usleep(1000); while(server->state == SERVER_STATE_STOPPING) {
maxWaits++; if(maxWaits++ >= 1000) {
if(maxWaits < 1000) continue;
consolePrint("Server thread did not stop in time, forcing exit."); consolePrint("Server thread did not stop in time, forcing exit.");
SERVER.state = SERVER_STATE_STOPPED; server->state = SERVER_STATE_STOPPED;
break; break;
} }
pthread_cond_wait(&server->networked.cond, &server->networked.lock);
}
pthread_mutex_unlock(&server->networked.lock);
// Destroy mutex and condition variable
pthread_mutex_destroy(&server->networked.lock);
pthread_cond_destroy(&server->networked.cond);
// Close the socket // Close the socket
if(server->networked.socket >= 0) { if(server->networked.socket >= 0) {
@ -116,15 +130,18 @@ void * networkedServerThread(void *arg) {
assertNotMainThread("Server thread must not be main thread"); assertNotMainThread("Server thread must not be main thread");
server_t *server = (server_t *)arg; server_t *server = (server_t *)arg;
// Notify main thread that the server is running
pthread_mutex_lock(&server->networked.lock);
server->state = SERVER_STATE_RUNNING;
pthread_cond_signal(&server->networked.cond);
pthread_mutex_unlock(&server->networked.lock);
struct timeval timeout; struct timeval timeout;
fd_set readfds; fd_set readfds;
server->state = SERVER_STATE_RUNNING;
// Main thread loop. // Main thread loop.
while(server->state == SERVER_STATE_RUNNING) { while(server->state == SERVER_STATE_RUNNING) {
// Wait a tiny bit to avoid large CPU usage.
usleep(1000);
// Prepare the select call, used for waiting for incoming connections. // Prepare the select call, used for waiting for incoming connections.
FD_ZERO(&readfds); FD_ZERO(&readfds);
FD_SET(server->networked.socket, &readfds); FD_SET(server->networked.socket, &readfds);
@ -140,14 +157,8 @@ void * networkedServerThread(void *arg) {
&timeout &timeout
); );
// Timeout // Timeout or no activity
if(activity == 0) continue; if(activity <= 0) continue;
// Check for errors
if(activity < 0) {
consolePrint("Error in select");
continue;
}
// Check if there is a new connection // Check if there is a new connection
if(!FD_ISSET(server->networked.socket, &readfds)) continue; if(!FD_ISSET(server->networked.socket, &readfds)) continue;
@ -211,7 +222,11 @@ void * networkedServerThread(void *arg) {
"Server thread exiting while server is running?" "Server thread exiting while server is running?"
); );
// Notify main thread we are stopped. // Notify main thread that the server has stopped
pthread_mutex_lock(&server->networked.lock);
server->state = SERVER_STATE_STOPPED; server->state = SERVER_STATE_STOPPED;
pthread_cond_signal(&server->networked.cond);
pthread_mutex_unlock(&server->networked.lock);
return NULL; return NULL;
} }

View File

@ -17,6 +17,8 @@ typedef struct {
typedef struct { typedef struct {
int socket; int socket;
struct sockaddr_in address; struct sockaddr_in address;
pthread_mutex_t lock;
pthread_cond_t cond;
} networkedserver_t; } networkedserver_t;
typedef struct server_s server_t; typedef struct server_s server_t;

View File

@ -29,6 +29,10 @@ errorret_t networkedServerClientAccept(
client->networked.timeout.tv_sec = 8; client->networked.timeout.tv_sec = 8;
client->networked.timeout.tv_usec = 0; client->networked.timeout.tv_usec = 0;
// Initialize mutex and condition variable
pthread_mutex_init(&client->networked.lock, NULL);
pthread_cond_init(&client->networked.cond, NULL);
// Create a thread for the client // Create a thread for the client
int32_t ret = pthread_create( int32_t ret = pthread_create(
&client->networked.thread, &client->networked.thread,
@ -65,19 +69,21 @@ void networkedServerClientClose(serverclient_t *client) {
); );
// Mark client as disconnecting // Mark client as disconnecting
pthread_mutex_lock(&client->networked.lock);
client->state = SERVER_CLIENT_STATE_DISCONNECTING; client->state = SERVER_CLIENT_STATE_DISCONNECTING;
int32_t maxWaits = 0; // Signal the condition variable to wake up the thread
while(client->state == SERVER_CLIENT_STATE_DISCONNECTING) { pthread_cond_signal(&client->networked.cond);
// Wait for the thread to finish pthread_mutex_unlock(&client->networked.lock);
usleep(1000);
maxWaits++;
if(maxWaits > 10) break;
}
// Wait for the thread to finish
pthread_join(client->networked.thread, NULL); pthread_join(client->networked.thread, NULL);
client->networked.thread = 0; client->networked.thread = 0;
// Destroy mutex and condition variable
pthread_mutex_destroy(&client->networked.lock);
pthread_cond_destroy(&client->networked.cond);
// Close the socket // Close the socket
if(client->networked.socket != -1) { if(client->networked.socket != -1) {
close(client->networked.socket); close(client->networked.socket);
@ -100,8 +106,14 @@ void networkedServerClientCloseOnThread(
); );
assertNotMainThread("Client close must not be main thread"); assertNotMainThread("Client close must not be main thread");
// Terminate the socket pthread_mutex_lock(&client->networked.lock);
client->state = SERVER_CLIENT_STATE_DISCONNECTING; client->state = SERVER_CLIENT_STATE_DISCONNECTING;
// Signal the condition variable to wake up the thread
pthread_cond_signal(&client->networked.cond);
pthread_mutex_unlock(&client->networked.lock);
// Terminate the socket
close(client->networked.socket); close(client->networked.socket);
client->networked.socket = -1; client->networked.socket = -1;
client->networked.thread = 0; client->networked.thread = 0;
@ -282,14 +294,16 @@ void * networkedServerClientThread(void *arg) {
// Start listening for packets. // Start listening for packets.
client->state = SERVER_CLIENT_STATE_CONNECTED; client->state = SERVER_CLIENT_STATE_CONNECTED;
while(client->state == SERVER_CLIENT_STATE_CONNECTED) { while(client->state == SERVER_CLIENT_STATE_CONNECTED) {
err = networkedServerClientReadPacket(client, &packet); pthread_mutex_lock(&client->networked.lock);
if(err != ERROR_OK) {
errorPrint(); // Wait for a signal if the client is disconnecting
networkedServerClientCloseOnThread(client, "Failed to read packet"); while(client->state == SERVER_CLIENT_STATE_DISCONNECTING) {
break; pthread_cond_wait(&client->networked.cond, &client->networked.lock);
} }
printf("Received packet type %d\n", packet.type); pthread_mutex_unlock(&client->networked.lock);
// ...existing code for reading packets...
if(SERVER.state != SERVER_STATE_RUNNING) { if(SERVER.state != SERVER_STATE_RUNNING) {
packetDisconnectCreate( packetDisconnectCreate(
@ -303,6 +317,9 @@ void * networkedServerClientThread(void *arg) {
} }
} }
pthread_mutex_lock(&client->networked.lock);
client->state = SERVER_CLIENT_STATE_DISCONNECTED; client->state = SERVER_CLIENT_STATE_DISCONNECTED;
pthread_mutex_unlock(&client->networked.lock);
return NULL; return NULL;
} }

View File

@ -19,6 +19,8 @@ typedef struct {
int32_t socket; int32_t socket;
pthread_t thread; pthread_t thread;
struct timeval timeout; struct timeval timeout;
pthread_mutex_t lock;
pthread_cond_t cond;
} networkedserverclient_t; } networkedserverclient_t;
/** /**

View File

@ -26,7 +26,7 @@ void cmdStart(const consolecmdexec_t *exec) {
return; return;
} }
} else { } else {
start.type = SERVER_TYPE_SINGLE_PLAYER; start.type = SERVER_TYPE_NETWORKED;
} }
if(exec->argc > 1) { if(exec->argc > 1) {