package com.zbkj.admin.manager;
|
|
import com.zbkj.common.utils.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
|
|
@Component
|
@Scope("singleton") // 显式声明为单例
|
public class BigScreenWebSocketHandler extends AbstractWebSocketHandler {
|
|
public static final Logger logger = LoggerFactory.getLogger(BigScreenWebSocketHandler.class);
|
|
private final CopyOnWriteArrayList<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
|
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
private static final long HEARTBEAT_INTERVAL = 30000; // 心跳检测间隔30秒
|
private static final long HEARTBEAT_TIMEOUT = 60000; // 超时时间60秒
|
|
@Autowired
|
private RedisUtil redisUtil;
|
|
@PostConstruct
|
public void init() {
|
scheduler.scheduleAtFixedRate(this::checkHeartbeats, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
|
}
|
|
// 心跳检测逻辑
|
private void checkHeartbeats() {
|
long now = System.currentTimeMillis();
|
sessions.removeIf(session -> {
|
try {
|
if (!session.isOpen()) return true;
|
|
Long lastActive = (Long) session.getAttributes().get("lastActive");
|
if (lastActive == null || now - lastActive > HEARTBEAT_TIMEOUT) {
|
session.close(CloseStatus.SESSION_NOT_RELIABLE);
|
return true;
|
}
|
return false;
|
} catch (IOException e) {
|
e.printStackTrace();
|
return true;
|
}
|
});
|
}
|
|
|
@Override
|
public void afterConnectionEstablished(WebSocketSession session) {
|
session.getAttributes().put("lastActive", System.currentTimeMillis());
|
sessions.add(session);
|
System.out.println("BigScreenWebSocketHandler -- 新连接建立: " + session.getId() +
|
" | 当前连接数: " + sessions.size());
|
}
|
|
@Override
|
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
String payload = message.getPayload();
|
// 处理心跳消息
|
if ("ping".equalsIgnoreCase(payload)) {
|
session.sendMessage(new TextMessage("pong"));
|
session.getAttributes().put("lastActive", System.currentTimeMillis());
|
} else if (payload.contains("open_")) {
|
System.out.println("BigScreenWebSocketHandler -- 打开大屏 -------- " + payload);
|
System.out.println("BigScreenWebSocketHandler -- 建立连接之后主动发送数据给客户端");
|
// 获取最新初始数据
|
String initialData = getLatestInitialData();
|
if (initialData != null) {
|
try {
|
// 向发送消息的客户端推送最新初始数据
|
session.sendMessage(new TextMessage(initialData));
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
} else {
|
// 其他业务消息处理保持不变
|
super.handleTextMessage(session, message);
|
}
|
}
|
|
// 获取最新初始数据的方法,可根据实际情况修改
|
private String getLatestInitialData() {
|
// 从 Redis 中获取数据,实际应用中可按需修改
|
Object data = redisUtil.get("screen-data");
|
return data != null ? data.toString() : null;
|
}
|
|
@Override
|
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
|
sessions.remove(session);
|
System.out.println("BigScreenWebSocketHandler -- 连接关闭: " + session.getId() +
|
" | 当前连接数: " + sessions.size());
|
}
|
|
// 广播消息给所有客户端
|
public void broadcast(String message) {
|
System.out.println("BigScreenWebSocketHandler -- [广播] 当前连接数: " + sessions.size() +
|
" | 内容: " + message);
|
sessions.forEach(session -> {
|
try {
|
if (session.isOpen()) {
|
session.sendMessage(new TextMessage(message));
|
}
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
});
|
}
|
|
@Override
|
public void handleTransportError(WebSocketSession session, Throwable exception) {
|
System.err.println("BigScreenWebSocketHandler -- 传输错误: " + session.getId());
|
logger.error("BigScreenWebSocketHandler -- 传输错误: {}, 异常信息: {}", session.getId(), exception.getMessage());
|
exception.printStackTrace();
|
}
|
|
@Override
|
public boolean supportsPartialMessages() {
|
return false;
|
}
|
}
|