zouyu
2025-10-17 c65ab218b14e87489f1594b2d932f7bd54b3ba11
cnas-device/src/main/java/com/ruoyi/device/service/impl/CollectBridgeServiceImpl.java
@@ -1,20 +1,32 @@
package com.ruoyi.device.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.utils.RedisUtil;
import com.ruoyi.device.constant.DCResistanceMqttConstants;
import com.ruoyi.device.constant.ElongationMqttConstants;
import com.ruoyi.device.mapper.CollectBridgeMapper;
import com.ruoyi.device.pojo.CollectBridge;
import com.ruoyi.device.service.CollectBridgeService;
import com.ruoyi.device.vo.DeviceMqttVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * 数字电桥采集
@@ -26,77 +38,184 @@
@Slf4j
public class CollectBridgeServiceImpl extends ServiceImpl<CollectBridgeMapper, CollectBridge> implements CollectBridgeService {
    @Autowired
    private CollectBridgeMapper collectBridgeMapper;
    /**
     * 填充采集数据
     * @param jsonObject
     */
    @Override
    public void addBridgeValue(JSONObject jsonObject) {
        JSONArray dataArray = jsonObject.getJSONArray("data");
        for (int i = 0; i < dataArray.size(); i++) {
            JSONObject listInfo = dataArray.getJSONObject(i);
            // 存储数据
            String dataStream = listInfo.getString("dataStream");
            if (dataStream.equals("DQCS.DQCS.SN")) {
                JSONArray dataPoints = listInfo.getJSONArray("dataPoints");
                JSONObject pointsJSONObject = dataPoints.getJSONObject(0);
                String entrustCode = pointsJSONObject.getString("value");
                // 解析时间戳
                Instant instant = Instant.ofEpochMilli(pointsJSONObject.getLong("time"));
                LocalDateTime collectDate = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
//        JSONArray dataArray = jsonObject.getJSONArray("data");
//        for (int i = 0; i < dataArray.size(); i++) {
//            JSONObject listInfo = dataArray.getJSONObject(i);
//            // 存储数据
//            String dataStream = listInfo.getString("dataStream");
//            if (dataStream.equals("DQCS.DQCS.SN")) {
//                JSONArray dataPoints = listInfo.getJSONArray("dataPoints");
//                JSONObject pointsJSONObject = dataPoints.getJSONObject(0);
//                String entrustCode = pointsJSONObject.getString("value");
//                // 解析时间戳
//                Instant instant = Instant.ofEpochMilli(pointsJSONObject.getLong("time"));
//                LocalDateTime collectDate = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
//
//                // 先存储编号, 后续存储值
//                CollectBridge collectBridge = new CollectBridge();
//                collectBridge.setEntrustCode(entrustCode);
//                collectBridge.setCollectDate(collectDate);
//                baseMapper.insert(collectBridge);
//
//            }
//            // 寄存器地址等于64获取结果值
//            if (dataStream.equals("DQCS.DQCS.64")) {
//                JSONArray dataPoints = listInfo.getJSONArray("dataPoints");
//                JSONObject pointsJSONObject = dataPoints.getJSONObject(0);
//                String value = pointsJSONObject.getString("value");
//
//                if (value.equals("64")) {
//                    for (int j = 0; j < dataArray.size(); j++) {
//                        JSONObject listInfo2 = dataArray.getJSONObject(j);
//                        String dataStream2 = listInfo2.getString("dataStream");
//                        // 寄存器地址等于64获取结果值
//                        if (dataStream2.equals("DQCS.DQCS.DZZ")) {
//                            JSONArray dataPoints2 = listInfo2.getJSONArray("dataPoints");
//                            JSONObject pointsJSONObject2 = dataPoints2.getJSONObject(0);
//                            String collectValue = pointsJSONObject2.getString("value");
//
//                            // 解析时间戳
//                            Instant instant = Instant.ofEpochMilli(pointsJSONObject2.getLong("time"));
//                            LocalDateTime collectDate = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
//
//                            // 查询最新一条数据
//                            CollectBridge collectBridge = baseMapper.selectOne(Wrappers.<CollectBridge>lambdaQuery()
//                                    .orderByDesc(CollectBridge::getCollectDate)
//                                    .last("limit 1"));
//
//                            // 判断两条数据是否相差在10分钟之内和有没有编号
//                            if (isWithinTenMinutes(collectDate, collectBridge.getCollectDate()) &&
//                                    StringUtils.isNotBlank(collectBridge.getEntrustCode())) {
//                                // 修改检验值
//                                collectBridge.setCollectValue(collectValue);
//                                baseMapper.updateById(collectBridge);
//                            } else {
//                                // 只存储值
//                                CollectBridge bridge = new CollectBridge();
//                                bridge.setCollectValue(collectValue);
//                                bridge.setCollectDate(collectDate);
//                                baseMapper.insert(bridge);
//                            }
//                        }
//                    }
//                }
//            }
//        }
    }
                // 先存储编号, 后续存储值
                CollectBridge collectBridge = new CollectBridge();
                collectBridge.setEntrustCode(entrustCode);
                collectBridge.setCollectDate(collectDate);
                baseMapper.insert(collectBridge);
            }
            // 寄存器地址等于64获取结果值
            if (dataStream.equals("DQCS.DQCS.64")) {
                JSONArray dataPoints = listInfo.getJSONArray("dataPoints");
                JSONObject pointsJSONObject = dataPoints.getJSONObject(0);
                String value = pointsJSONObject.getString("value");
                if (value.equals("64")) {
                    for (int j = 0; j < dataArray.size(); j++) {
                        JSONObject listInfo2 = dataArray.getJSONObject(j);
                        String dataStream2 = listInfo2.getString("dataStream");
                        // 寄存器地址等于64获取结果值
                        if (dataStream2.equals("DQCS.DQCS.DZZ")) {
                            JSONArray dataPoints2 = listInfo2.getJSONArray("dataPoints");
                            JSONObject pointsJSONObject2 = dataPoints2.getJSONObject(0);
                            String collectValue = pointsJSONObject2.getString("value");
                            // 解析时间戳
                            Instant instant = Instant.ofEpochMilli(pointsJSONObject2.getLong("time"));
                            LocalDateTime collectDate = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
                            // 查询最新一条数据
                            CollectBridge collectBridge = baseMapper.selectOne(Wrappers.<CollectBridge>lambdaQuery()
                                    .orderByDesc(CollectBridge::getCollectDate)
                                    .last("limit 1"));
                            // 判断两条数据是否相差在10分钟之内和有没有编号
                            if (isWithinTenMinutes(collectDate, collectBridge.getCollectDate()) &&
                                    StringUtils.isNotBlank(collectBridge.getEntrustCode())) {
                                // 修改检验值
                                collectBridge.setCollectValue(collectValue);
                                baseMapper.updateById(collectBridge);
                            } else {
                                // 只存储值
                                CollectBridge bridge = new CollectBridge();
                                bridge.setCollectValue(collectValue);
                                bridge.setCollectDate(collectDate);
                                baseMapper.insert(bridge);
                            }
                        }
                    }
                }
    @Override
    public void dcResistanceDataAnalysis(String jsonStr) {
        Map<String, Object> dataMap = JSONObject.parseObject(jsonStr, new TypeReference<Map<String, Object>>(){}.getType());
        if(CollectionUtil.isNotEmpty(dataMap) && Objects.nonNull(dataMap.get("data"))){
            List<DeviceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DeviceMqttVO>>(){}.getType());
            //过滤出包含实际电阻值的对象
            // 处理 NSDQCS.DQCS.DZZ 数据
            String entrustCode = getValueByDataStream(dataList,DCResistanceMqttConstants.NSDQCS_DQCS_SN);
            BigDecimal value = new BigDecimal(getValueByDataStream(dataList,DCResistanceMqttConstants.NSDQCS_DQCS_DZZ));
            Long count = collectBridgeMapper.selectCount(Wrappers.<CollectBridge>lambdaQuery().eq(CollectBridge::getEntrustCode, entrustCode));
            if(count>0L){
                collectBridgeMapper.update(null,Wrappers.<CollectBridge>lambdaUpdate()
                        .set(CollectBridge::getCollectValue,value)
                        .set(CollectBridge::getLastCollectDate,LocalDateTime.now())
                        .eq(CollectBridge::getEntrustCode,entrustCode));
            }else{
                collectBridgeMapper.insert(new CollectBridge(entrustCode,value,LocalDateTime.now(),LocalDateTime.now()));
            }
        }
    }
    @Override
    public void elongationDataAnalysis(String jsonStr) {
        Map<String, Object> dataMap = JSONObject.parseObject(jsonStr, new TypeReference<Map<String, Object>>(){}.getType());
        if(CollectionUtil.isNotEmpty(dataMap) && Objects.nonNull(dataMap.get("data"))){
            List<DeviceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DeviceMqttVO>>(){}.getType());
            // 伸长率:处理 NSTSSC.TSSC.SCL 数据
            processMultipleValueDataStream(dataList, ElongationMqttConstants.NSTSSC_TSSC_SCL,3,10.0);
        }
    }
    /**
     * 获取mqtt消息体中对应标识的值
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     */
    private String getValueByDataStream(List<DeviceMqttVO> dataList, String dataStream) {
        DeviceMqttVO deviceMqttVO = dataList.stream()
                .filter(f -> StringUtils.equals(f.getDataStream(), dataStream))
                .findFirst()
                .orElse(null);
        if(Objects.nonNull(deviceMqttVO)){
            List<DeviceMqttVO.DataPoint> dataPoints = deviceMqttVO.getDataPoints();
            return CollectionUtil.isNotEmpty(dataPoints)?dataPoints.get(0).getValue():"";
        }
        return "";
    }
    /**
     * 处理需要保存多个值的数据流,并保存到 Redis
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     * @param maxSize 保存的最大个数
     * @param minVal 存储数据的最小值,低于该值则跳过;-1代表不限制
     */
    private void processMultipleValueDataStream(List<DeviceMqttVO> dataList, String dataStream,int maxSize,Double minVal){
        // 筛选出当前数据流对应的数据
//        Double value = Double.parseDouble(getValueByDataStream(dataList, dataStream));
        Double value = BigDecimal.valueOf(Math.random()*100).setScale(7, RoundingMode.HALF_EVEN).doubleValue();
        //如果最小值不为空且不为-1,获取到的值小于最小值则跳过不做处理
        if(Objects.nonNull(minVal)&&minVal!=-1&&value<minVal){
            return;
        }
        //判断key是否存在
        boolean existKey = RedisUtil.hasKey(dataStream);
        if(existKey){
            long zSetSize = RedisUtil.getZSetSize(dataStream);
            //如果数据长度超过最大个数,则删除分数最低的数据
            if(zSetSize>=maxSize){
                RedisUtil.delZSetRange(dataStream,0,0);
            }
        }
        RedisUtil.addZSet(dataStream,System.currentTimeMillis(),value);
    }
    /**
     * 处理需要获取三次不同值的数据流,并保存到 Redis
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     */
    private void processThreeTimesValueDataStream(List<DeviceMqttVO> dataList, String dataStream) {
        // 从 Redis 获取已存储的值
        Object valueFromRedis = RedisUtil.get(dataStream);
        JSONArray valueArray = new JSONArray();
        if (valueFromRedis != null) {
            if (valueFromRedis instanceof String) {
                try {
                    valueArray = JSONArray.parseArray((String) valueFromRedis);
                } catch (Exception e) {
                    // 如果解析失败,说明 Redis 中的值可能不是合法的 JSON 数组,创建空数组
                    valueArray = new JSONArray();
                }
            } else if (valueFromRedis instanceof Double) {
                // 如果是 Double 类型,将其添加到数组中
                valueArray.add(valueFromRedis);
            }
        }
        // 将更新后的值保存到 Redis
        RedisUtil.set(dataStream, valueArray.toJSONString());
    }
    public static boolean isWithinTenMinutes(LocalDateTime dateTime1, LocalDateTime dateTime2) {
        Duration duration = Duration.between(dateTime1, dateTime2);
        long minutesDifference = Math.abs(duration.toMinutes());