Change static schedulers to AsyncHandler to hold for heartbeat and reconnect logic. (#148)

This commit is contained in:
David Kohler
2025-09-24 19:26:44 -04:00
committed by GitHub
parent 85cba9fff2
commit 57f33b2efa
5 changed files with 45 additions and 39 deletions

View File

@@ -23,6 +23,7 @@
package io.github.jwdeveloper.tiktok;
import com.google.protobuf.ByteString;
import io.github.jwdeveloper.tiktok.common.AsyncHandler;
import io.github.jwdeveloper.tiktok.data.events.*;
import io.github.jwdeveloper.tiktok.data.events.common.TikTokEvent;
import io.github.jwdeveloper.tiktok.data.events.control.*;
@@ -39,7 +40,7 @@ import io.github.jwdeveloper.tiktok.websocket.*;
import lombok.Getter;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.logging.Logger;
@@ -89,12 +90,11 @@ public class TikTokLiveClient implements LiveClient
tikTokEventHandler.publish(this, new TikTokDisconnectedEvent("Exception: " + e.getMessage()));
if (e instanceof TikTokLiveOfflineHostException && clientSettings.isRetryOnConnectionFailure()) {
try {
Thread.sleep(clientSettings.getRetryConnectionTimeout().toMillis());
} catch (Exception ignored) {}
AsyncHandler.getReconnectScheduler().schedule(() -> {
logger.info("Reconnecting");
tikTokEventHandler.publish(this, new TikTokReconnectingEvent());
this.connect();
}, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
throw e;
} catch (Exception e) {

View File

@@ -133,7 +133,7 @@ public class TikTokLiveClientBuilder implements LiveClientBuilder {
//networking
dependance.registerSingleton(HttpClientFactory.class);
dependance.registerSingleton(WebSocketHeartbeatTask.class); // True global singleton - Static objects are located to serve as global
dependance.registerSingleton(WebSocketHeartbeatTask.class);
if (clientSettings.isOffline()) {
dependance.registerSingleton(LiveSocketClient.class, TikTokWebSocketOfflineClient.class);
dependance.registerSingleton(LiveHttpClient.class, TikTokLiveHttpOfflineClient.class);

View File

@@ -0,0 +1,22 @@
package io.github.jwdeveloper.tiktok.common;
import lombok.Getter;
import java.util.concurrent.*;
public class AsyncHandler
{
@Getter
private static final ScheduledExecutorService heartBeatScheduler = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r, "heartbeat-pool");
t.setDaemon(true);
return t;
});
@Getter
private static final ScheduledExecutorService reconnectScheduler = Executors.newScheduledThreadPool(0, r -> {
Thread t = new Thread(r, "reconnect-pool");
t.setDaemon(true);
return t;
});
}

View File

@@ -142,7 +142,7 @@ public class TikTokWebSocketClient implements LiveSocketClient {
case DISCONNECT -> webSocketClient.closeConnection(CloseFrame.NORMAL, "");
default -> webSocketClient.close();
}
heartbeatTask.stop(webSocketClient);
heartbeatTask.stop();
}
webSocketClient = null;
}

View File

@@ -22,6 +22,7 @@
*/
package io.github.jwdeveloper.tiktok.websocket;
import io.github.jwdeveloper.tiktok.common.AsyncHandler;
import org.java_websocket.WebSocket;
import java.util.*;
@@ -29,47 +30,30 @@ import java.util.concurrent.*;
public class WebSocketHeartbeatTask
{
// Single shared pool for all heartbeat tasks
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r, "heartbeat-pool");
t.setDaemon(true);
return t;
});
private static final Map<WebSocket, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
private static final Map<WebSocket, Long> commTime = new ConcurrentHashMap<>();
private ScheduledFuture<?> task;
private Long commTime;
private final byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'.
private final static byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'.
public void run(WebSocket webSocket, long pingTaskTime) {
stop(webSocket); // remove existing task if any
stop(); // remove existing task if any
tasks.put(webSocket, scheduler.scheduleAtFixedRate(() -> {
task = AsyncHandler.getHeartBeatScheduler().scheduleAtFixedRate(() -> {
try {
if (webSocket.isOpen()) {
webSocket.send(heartbeatBytes);
commTime.put(webSocket, System.currentTimeMillis());
} else {
Long time = commTime.get(webSocket);
if (time != null && System.currentTimeMillis() - time >= 60_000) // Stop if disconnected longer than 60s
stop(webSocket);
}
commTime = System.currentTimeMillis();
} else if (commTime != null && System.currentTimeMillis() - commTime >= 60_000) // Stop if disconnected longer than 60s
stop();
} catch (Exception e) {
e.printStackTrace();
stop(webSocket);
stop();
}
}, 0, pingTaskTime, TimeUnit.MILLISECONDS));
}, 0, pingTaskTime, TimeUnit.MILLISECONDS);
}
public void stop(WebSocket webSocket) {
ScheduledFuture<?> future = tasks.remove(webSocket);
if (future != null)
future.cancel(true);
commTime.remove(webSocket);
}
public void shutdown() {
tasks.values().forEach(f -> f.cancel(true));
commTime.clear();
scheduler.shutdownNow();
public void stop() {
if (task != null)
task.cancel(true);
}
}