diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java index 498acf3..aa5a630 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java @@ -142,7 +142,7 @@ public class TikTokWebSocketClient implements LiveSocketClient { case 2 -> webSocketClient.closeConnection(CloseFrame.NORMAL, ""); default -> webSocketClient.close(); } - heartbeatTask.stop(); + heartbeatTask.stop(webSocketClient); } webSocketClient = null; } diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java index 504dd20..8a385ac 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java @@ -24,41 +24,52 @@ package io.github.jwdeveloper.tiktok.websocket; import org.java_websocket.WebSocket; -import java.util.Base64; +import java.util.*; +import java.util.concurrent.*; public class WebSocketHeartbeatTask { - private Thread thread; - private boolean isRunning = false; - private final int MAX_TIMEOUT = 250; - private final int SLEEP_TIME = 500; + // Single shared pool for all heartbeat tasks + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> { + Thread t = new Thread(r, "heartbeat-pool"); + t.setDaemon(true); + return t; + }); + private final Map> tasks = new ConcurrentHashMap<>(); + private final Map commTime = new ConcurrentHashMap<>(); + private final byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'. public void run(WebSocket webSocket, long pingTaskTime) { - stop(); - thread = new Thread(() -> heartbeatTask(webSocket, pingTaskTime), "heartbeat-task"); - isRunning = true; - thread.start(); - } + stop(webSocket); // remove existing task if any - public void stop() { - if (thread != null) - thread.interrupt(); - isRunning = false; - } - - private void heartbeatTask(WebSocket webSocket, long pingTaskTime) { - while (isRunning) { + tasks.put(webSocket, scheduler.scheduleAtFixedRate(() -> { try { if (webSocket.isOpen()) { webSocket.send(heartbeatBytes); - Thread.sleep(pingTaskTime + (int) (Math.random() * MAX_TIMEOUT)); - } else - Thread.sleep(SLEEP_TIME); + commTime.put(webSocket, System.currentTimeMillis()); + } else { + Long time = commTime.get(webSocket); + if (time != null && System.currentTimeMillis() - time >= 60_000) // Stop if disconnected longer than 60s + stop(webSocket); + } } catch (Exception e) { - //TODO we should display some kind of error message - isRunning = false; + e.printStackTrace(); + stop(webSocket); } - } + }, 0, pingTaskTime, TimeUnit.MILLISECONDS)); + } + + public void stop(WebSocket webSocket) { + ScheduledFuture future = tasks.remove(webSocket); + if (future != null) + future.cancel(true); + commTime.remove(webSocket); + } + + public void shutdown() { + tasks.values().forEach(f -> f.cancel(true)); + commTime.clear(); + scheduler.shutdownNow(); } } \ No newline at end of file