Switch to an efficient pool of daemon threads instead of thread per websocket and sleeping!

This commit is contained in:
kohlerpop1
2025-08-14 00:10:23 -04:00
parent 1b27d6074c
commit 183aadb8e8
2 changed files with 36 additions and 25 deletions

View File

@@ -142,7 +142,7 @@ public class TikTokWebSocketClient implements LiveSocketClient {
case 2 -> webSocketClient.closeConnection(CloseFrame.NORMAL, ""); case 2 -> webSocketClient.closeConnection(CloseFrame.NORMAL, "");
default -> webSocketClient.close(); default -> webSocketClient.close();
} }
heartbeatTask.stop(); heartbeatTask.stop(webSocketClient);
} }
webSocketClient = null; webSocketClient = null;
} }

View File

@@ -24,41 +24,52 @@ package io.github.jwdeveloper.tiktok.websocket;
import org.java_websocket.WebSocket; import org.java_websocket.WebSocket;
import java.util.Base64; import java.util.*;
import java.util.concurrent.*;
public class WebSocketHeartbeatTask public class WebSocketHeartbeatTask
{ {
private Thread thread; // Single shared pool for all heartbeat tasks
private boolean isRunning = false; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> {
private final int MAX_TIMEOUT = 250; Thread t = new Thread(r, "heartbeat-pool");
private final int SLEEP_TIME = 500; t.setDaemon(true);
return t;
});
private final Map<WebSocket, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
private final Map<WebSocket, Long> commTime = new ConcurrentHashMap<>();
private final byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'. private final byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'.
public void run(WebSocket webSocket, long pingTaskTime) { public void run(WebSocket webSocket, long pingTaskTime) {
stop(); stop(webSocket); // remove existing task if any
thread = new Thread(() -> heartbeatTask(webSocket, pingTaskTime), "heartbeat-task");
isRunning = true;
thread.start();
}
public void stop() { tasks.put(webSocket, scheduler.scheduleAtFixedRate(() -> {
if (thread != null)
thread.interrupt();
isRunning = false;
}
private void heartbeatTask(WebSocket webSocket, long pingTaskTime) {
while (isRunning) {
try { try {
if (webSocket.isOpen()) { if (webSocket.isOpen()) {
webSocket.send(heartbeatBytes); webSocket.send(heartbeatBytes);
Thread.sleep(pingTaskTime + (int) (Math.random() * MAX_TIMEOUT)); commTime.put(webSocket, System.currentTimeMillis());
} else } else {
Thread.sleep(SLEEP_TIME); Long time = commTime.get(webSocket);
if (time != null && System.currentTimeMillis() - time >= 60_000) // Stop if disconnected longer than 60s
stop(webSocket);
}
} catch (Exception e) { } catch (Exception e) {
//TODO we should display some kind of error message e.printStackTrace();
isRunning = false; stop(webSocket);
} }
} }, 0, pingTaskTime, TimeUnit.MILLISECONDS));
}
public void stop(WebSocket webSocket) {
ScheduledFuture<?> future = tasks.remove(webSocket);
if (future != null)
future.cancel(true);
commTime.remove(webSocket);
}
public void shutdown() {
tasks.values().forEach(f -> f.cancel(true));
commTime.clear();
scheduler.shutdownNow();
} }
} }