liding
3 天以前 7f9e375391e30fd3c367cb5a080a609a6e25e524
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.zbkj.admin.manager;
 
import com.zbkj.common.utils.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
@Component
@Scope("singleton") // 显式声明为单例
public class AdminLineUpWebSocketHandler extends AbstractWebSocketHandler {
 
    public static final Logger logger = LoggerFactory.getLogger(AdminLineUpWebSocketHandler.class);
 
    private final CopyOnWriteArrayList<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private static final long HEARTBEAT_INTERVAL = 30000; // 心跳检测间隔30秒
    private static final long HEARTBEAT_TIMEOUT = 60000;  // 超时时间60秒
 
    @Autowired
    private RedisUtil redisUtil;
 
    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(this::checkHeartbeats, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
    }
 
    // 心跳检测逻辑
    private void checkHeartbeats() {
        long now = System.currentTimeMillis();
        sessions.removeIf(session -> {
            try {
                if (!session.isOpen()) return true;
 
                Long lastActive = (Long) session.getAttributes().get("lastActive");
                if (lastActive == null || now - lastActive > HEARTBEAT_TIMEOUT) {
                    session.close(CloseStatus.SESSION_NOT_RELIABLE);
                    return true;
                }
                return false;
            } catch (IOException e) {
                e.printStackTrace();
                return true;
            }
        });
    }
 
 
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        session.getAttributes().put("lastActive", System.currentTimeMillis());
        sessions.add(session);
        System.out.println("AdminLineUpWebSocketHandler -- 新连接建立: " + session.getId() +
                " | 当前连接数: " + sessions.size());
    }
 
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        // 处理心跳消息
        if ("ping".equalsIgnoreCase(payload)) {
            session.sendMessage(new TextMessage("pong"));
            session.getAttributes().put("lastActive", System.currentTimeMillis());
        } else if (payload.contains("open_")) {
            System.out.println("AdminLineUpWebSocketHandler -- 打开大屏 -------- " + payload);
            System.out.println("AdminLineUpWebSocketHandler -- 建立连接之后主动发送数据给客户端");
            // 向发送消息的客户端推送最新初始数据
            session.sendMessage(new TextMessage("success"));
//            session.sendMessage(new TextMessage("update"));
        } else {
            // 其他业务消息处理保持不变
            super.handleTextMessage(session, message);
        }
    }
 
    // 获取最新初始数据的方法,可根据实际情况修改
    private String getLatestInitialData() {
        // 从 Redis 中获取数据,实际应用中可按需修改
        Object data = redisUtil.get("screen-data");
        return data != null ? data.toString() : null;
    }
 
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessions.remove(session);
        System.out.println("AdminLineUpWebSocketHandler -- 连接关闭: " + session.getId() +
                " | 当前连接数: " + sessions.size());
    }
 
    // 广播消息给所有客户端
    public void broadcast(String message) {
        System.out.println("AdminLineUpWebSocketHandler -- [广播] 当前连接数: " + sessions.size() +
                " | 内容: " + message);
        sessions.forEach(session -> {
            try {
                if (session.isOpen()) {
                    session.sendMessage(new TextMessage(message));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
 
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        System.err.println("AdminLineUpWebSocketHandler -- 传输错误: " + session.getId());
        logger.error("AdminLineUpWebSocketHandler -- 传输错误: {}, 异常信息: {}", session.getId(), exception.getMessage());
        exception.printStackTrace();
    }
 
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}