zouyu
2025-10-23 0e5bddf6084d3dfb7bcad7217d4320898416eba3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package com.ruoyi.device.service.impl;
 
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.utils.RedisUtil;
import com.ruoyi.device.constant.DCResistanceMqttConstants;
import com.ruoyi.device.constant.ElongationMqttConstants;
import com.ruoyi.device.mapper.CollectBridgeMapper;
import com.ruoyi.device.pojo.CollectBridge;
import com.ruoyi.device.service.CollectBridgeService;
import com.ruoyi.device.vo.DeviceMqttVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Objects;
 
/**
 * Service implementation for digital bridge data collection.
 *
 * @author zhuo
 * @since 2025-02-19
 */
@Service
@Slf4j
public class CollectBridgeServiceImpl extends ServiceImpl<CollectBridgeMapper, CollectBridge> implements CollectBridgeService {
 
    @Autowired
    private CollectBridgeMapper collectBridgeMapper;
 
    /**
     * Populates collected data from a bridge message.
     *
     * <p>Currently a no-op: the whole implementation below is commented out, so calling this
     * method has no effect and {@code jsonObject} is never read.
     *
     * <p>NOTE(review): the disabled code is a legacy implementation kept for reference only;
     * consider deleting it and relying on version-control history instead.
     *
     * @param jsonObject message payload; unused while the implementation is disabled
     */
    @Override
    public void addBridgeValue(JSONObject jsonObject) {
//        JSONArray dataArray = jsonObject.getJSONArray("data");
//        for (int i = 0; i < dataArray.size(); i++) {
//            JSONObject listInfo = dataArray.getJSONObject(i);
//            // Store the data
//            String dataStream = listInfo.getString("dataStream");
//            if (dataStream.equals("DQCS.DQCS.SN")) {
//                JSONArray dataPoints = listInfo.getJSONArray("dataPoints");
//                JSONObject pointsJSONObject = dataPoints.getJSONObject(0);
//                String entrustCode = pointsJSONObject.getString("value");
//                // Parse the timestamp
//                Instant instant = Instant.ofEpochMilli(pointsJSONObject.getLong("time"));
//                LocalDateTime collectDate = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
//
//                // Store the code first; the value is stored later
//                CollectBridge collectBridge = new CollectBridge();
//                collectBridge.setEntrustCode(entrustCode);
//                collectBridge.setCollectDate(collectDate);
//                baseMapper.insert(collectBridge);
//
//            }
//            // When the register address equals 64, fetch the result value
//            if (dataStream.equals("DQCS.DQCS.64")) {
//                JSONArray dataPoints = listInfo.getJSONArray("dataPoints");
//                JSONObject pointsJSONObject = dataPoints.getJSONObject(0);
//                String value = pointsJSONObject.getString("value");
//
//                if (value.equals("64")) {
//                    for (int j = 0; j < dataArray.size(); j++) {
//                        JSONObject listInfo2 = dataArray.getJSONObject(j);
//                        String dataStream2 = listInfo2.getString("dataStream");
//                        // When the register address equals 64, fetch the result value
//                        if (dataStream2.equals("DQCS.DQCS.DZZ")) {
//                            JSONArray dataPoints2 = listInfo2.getJSONArray("dataPoints");
//                            JSONObject pointsJSONObject2 = dataPoints2.getJSONObject(0);
//                            String collectValue = pointsJSONObject2.getString("value");
//
//                            // Parse the timestamp
//                            Instant instant = Instant.ofEpochMilli(pointsJSONObject2.getLong("time"));
//                            LocalDateTime collectDate = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
//
//                            // Query the most recent record
//                            CollectBridge collectBridge = baseMapper.selectOne(Wrappers.<CollectBridge>lambdaQuery()
//                                    .orderByDesc(CollectBridge::getCollectDate)
//                                    .last("limit 1"));
//
//                            // Check whether the two records are within 10 minutes of each other and whether a code is present
//                            if (isWithinTenMinutes(collectDate, collectBridge.getCollectDate()) &&
//                                    StringUtils.isNotBlank(collectBridge.getEntrustCode())) {
//                                // Update the measured value
//                                collectBridge.setCollectValue(collectValue);
//                                baseMapper.updateById(collectBridge);
//                            } else {
//                                // Store only the value
//                                CollectBridge bridge = new CollectBridge();
//                                bridge.setCollectValue(collectValue);
//                                bridge.setCollectDate(collectDate);
//                                baseMapper.insert(bridge);
//                            }
//                        }
//                    }
//                }
//            }
//        }
    }
 
    @Override
    public void dcResistanceDataAnalysis(String jsonStr) {
        Map<String, Object> dataMap = JSONObject.parseObject(jsonStr, new TypeReference<Map<String, Object>>(){}.getType());
        if(CollectionUtil.isNotEmpty(dataMap) && Objects.nonNull(dataMap.get("data"))){
            List<DeviceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DeviceMqttVO>>(){}.getType());
            //过滤出包含实际电阻值的对象
            // 处理 NSDQCS.DQCS.DZZ 数据
            String entrustCode = getValueByDataStream(dataList,DCResistanceMqttConstants.NSDQCS_DQCS_SN);
            BigDecimal value = new BigDecimal(getValueByDataStream(dataList,DCResistanceMqttConstants.NSDQCS_DQCS_DZZ));
            Long count = collectBridgeMapper.selectCount(Wrappers.<CollectBridge>lambdaQuery().eq(CollectBridge::getEntrustCode, entrustCode));
            if(count>0L){
                collectBridgeMapper.update(null,Wrappers.<CollectBridge>lambdaUpdate()
                        .set(CollectBridge::getCollectValue,value)
                        .set(CollectBridge::getLastCollectDate,LocalDateTime.now())
                        .eq(CollectBridge::getEntrustCode,entrustCode));
            }else{
                collectBridgeMapper.insert(new CollectBridge(entrustCode,value,LocalDateTime.now(),LocalDateTime.now()));
            }
        }
    }
 
    @Override
    public void elongationDataAnalysis(String jsonStr) {
        Map<String, Object> dataMap = JSONObject.parseObject(jsonStr, new TypeReference<Map<String, Object>>(){}.getType());
        if(CollectionUtil.isNotEmpty(dataMap) && Objects.nonNull(dataMap.get("data"))){
            List<DeviceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DeviceMqttVO>>(){}.getType());
            // 伸长率:处理 NSTSSC.TSSC.SCL 数据
            processMultipleValueDataStream(dataList, ElongationMqttConstants.NSTSSC_TSSC_SCL,3,10.0);
        }
    }
 
    /**
     * 获取mqtt消息体中对应标识的值
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     */
    private String getValueByDataStream(List<DeviceMqttVO> dataList, String dataStream) {
        DeviceMqttVO deviceMqttVO = dataList.stream()
                .filter(f -> StringUtils.equals(f.getDataStream(), dataStream))
                .findFirst()
                .orElse(null);
        if(Objects.nonNull(deviceMqttVO)){
            List<DeviceMqttVO.DataPoint> dataPoints = deviceMqttVO.getDataPoints();
            return CollectionUtil.isNotEmpty(dataPoints)?dataPoints.get(0).getValue():"";
        }
        return "";
    }
 
    /**
     * 处理需要保存多个值的数据流,并保存到 Redis
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     * @param maxSize 保存的最大个数
     * @param minVal 存储数据的最小值,低于该值则跳过;-1代表不限制
     */
    private void processMultipleValueDataStream(List<DeviceMqttVO> dataList, String dataStream,int maxSize,Double minVal){
        // 筛选出当前数据流对应的数据
//        Double value = Double.parseDouble(getValueByDataStream(dataList, dataStream));
        Double value = BigDecimal.valueOf(Math.random()*100).setScale(7, RoundingMode.HALF_EVEN).doubleValue();
        //如果最小值不为空且不为-1,获取到的值小于最小值则跳过不做处理
        if(Objects.nonNull(minVal)&&minVal!=-1&&value<minVal){
            return;
        }
        //判断key是否存在
        boolean existKey = RedisUtil.hasKey(dataStream);
        if(existKey){
            long zSetSize = RedisUtil.getZSetSize(dataStream);
            //如果数据长度超过最大个数,则删除分数最低的数据
            if(zSetSize>=maxSize){
                RedisUtil.delZSetRange(dataStream,0,0);
            }
        }
        RedisUtil.addZSet(dataStream,System.currentTimeMillis(),value);
    }
 
    /**
     * 处理需要获取三次不同值的数据流,并保存到 Redis
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     */
    private void processThreeTimesValueDataStream(List<DeviceMqttVO> dataList, String dataStream) {
        // 从 Redis 获取已存储的值
        Object valueFromRedis = RedisUtil.get(dataStream);
        JSONArray valueArray = new JSONArray();
 
        if (valueFromRedis != null) {
            if (valueFromRedis instanceof String) {
                try {
                    valueArray = JSONArray.parseArray((String) valueFromRedis);
                } catch (Exception e) {
                    // 如果解析失败,说明 Redis 中的值可能不是合法的 JSON 数组,创建空数组
                    valueArray = new JSONArray();
                }
            } else if (valueFromRedis instanceof Double) {
                // 如果是 Double 类型,将其添加到数组中
                valueArray.add(valueFromRedis);
            }
        }
 
 
 
        // 将更新后的值保存到 Redis
        RedisUtil.set(dataStream, valueArray.toJSONString());
    }
    public static boolean isWithinTenMinutes(LocalDateTime dateTime1, LocalDateTime dateTime2) {
        Duration duration = Duration.between(dateTime1, dateTime2);
        long minutesDifference = Math.abs(duration.toMinutes());
        return minutesDifference <= 10;
    }
 
}