zouyu
2 天以前 7c5e1ef7db84731610d5616b2ee7e6e63abd886d
cnas-device/src/main/java/com/ruoyi/device/service/impl/CollectBridgeServiceImpl.java
@@ -13,7 +13,7 @@
import com.ruoyi.device.mapper.CollectBridgeMapper;
import com.ruoyi.device.pojo.CollectBridge;
import com.ruoyi.device.service.CollectBridgeService;
import com.ruoyi.device.vo.DCResistanceMqttVO;
import com.ruoyi.device.vo.DeviceMqttVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -111,29 +111,61 @@
    public void addBridgeValueByNS(String jsonStr) {
        Map<String, Object> dataMap = JSONObject.parseObject(jsonStr, new TypeReference<Map<String, Object>>(){}.getType());
        if(CollectionUtil.isNotEmpty(dataMap) && Objects.nonNull(dataMap.get("data"))){
            List<DCResistanceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DCResistanceMqttVO>>(){}.getType());
            List<DeviceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DeviceMqttVO>>(){}.getType());
            //过滤出包含实际电阻值的对象
            // 处理 NSDQCS.DQCS.DZZ 数据
            processSingleValueDataStream(dataList, DCResistanceMqttConstants.NSDQCS_DQCS_DZZ);
            // 处理 NSTSSC.TSSC.WY 数据
            processThreeTimesValueDataStream(dataList, ElongationMqttConstants.NSTSSC_TSSC_WY);
            Double value = getValueByDataStream(dataList,DCResistanceMqttConstants.NSDQCS_DQCS_DZZ);
            RedisUtil.set(DCResistanceMqttConstants.NSDQCS_DQCS_DZZ, value);
        }
    }
    @Override
    public void addBridgeValueByNSD(String jsonStr) {
        Map<String, Object> dataMap = JSONObject.parseObject(jsonStr, new TypeReference<Map<String, Object>>(){}.getType());
        if(CollectionUtil.isNotEmpty(dataMap) && Objects.nonNull(dataMap.get("data"))){
            List<DeviceMqttVO> dataList = JSONArray.parseObject(dataMap.get("data").toString(), new TypeReference<List<DeviceMqttVO>>(){}.getType());
            //过滤出包含实际电阻值的对象
            // 处理 NSTSSC.TSSC.WY 数据
            processMultipleValueDataStream(dataList, ElongationMqttConstants.NSTSSC_TSSC_WY,3);
        }
    }
    /**
     * 处理单值数据流的数据并保存到 Redis
     * 获取mqtt消息体中对应标识的值
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     */
    private void processSingleValueDataStream(List<DCResistanceMqttVO> dataList, String dataStream) {
        DCResistanceMqttVO dcResistanceMqttVO = dataList.stream()
    private Double getValueByDataStream(List<DeviceMqttVO> dataList, String dataStream) {
        DeviceMqttVO deviceMqttVO = dataList.stream()
                .filter(f -> StringUtils.equals(f.getDataStream(), dataStream))
                .findFirst()
                .orElse(null);
        if(Objects.nonNull(dcResistanceMqttVO)){
            List<DCResistanceMqttVO.DataPoint> dataPoints = dcResistanceMqttVO.getDataPoints();
        if(Objects.nonNull(deviceMqttVO)){
            List<DeviceMqttVO.DataPoint> dataPoints = deviceMqttVO.getDataPoints();
            BigDecimal value = CollectionUtil.isNotEmpty(dataPoints)?dataPoints.get(0).getValue():BigDecimal.ZERO;
            // 保存单个值到 redis
            RedisUtil.set(dataStream, value.doubleValue());
            return value.doubleValue();
        }
        return 0.0;
    }
    /**
     * 处理需要保存多个值的数据流,并保存到 Redis
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     * @param maxSize 保存的最大个数
     */
    private void processMultipleValueDataStream(List<DeviceMqttVO> dataList, String dataStream,int maxSize){
        // 筛选出当前数据流对应的数据
        Double value = getValueByDataStream(dataList, dataStream);
        //判断key是否存在
        boolean existKey = RedisUtil.hasKey(dataStream);
        if(existKey){
            long listSize = RedisUtil.lGetListSize(dataStream);
            if(listSize<maxSize){
                RedisUtil.lSet(dataStream,value);
            }
        }else{
            RedisUtil.lSet(dataStream,value);
        }
    }
@@ -142,7 +174,7 @@
     * @param dataList 数据列表
     * @param dataStream 数据流标识
     */
    private void processThreeTimesValueDataStream(List<DCResistanceMqttVO> dataList, String dataStream) {
    private void processThreeTimesValueDataStream(List<DeviceMqttVO> dataList, String dataStream) {
        // 从 Redis 获取已存储的值
        Object valueFromRedis = RedisUtil.get(dataStream);
        JSONArray valueArray = new JSONArray();
@@ -161,24 +193,7 @@
            }
        }
        // 筛选出当前数据流对应的数据
        DCResistanceMqttVO dcResistanceMqttVO = dataList.stream()
                .filter(f -> StringUtils.equals(f.getDataStream(), dataStream))
                .findFirst()
                .orElse(null);
        if (Objects.nonNull(dcResistanceMqttVO)) {
            List<DCResistanceMqttVO.DataPoint> dataPoints = dcResistanceMqttVO.getDataPoints();
            if (CollectionUtil.isNotEmpty(dataPoints)) {
                BigDecimal value = dataPoints.get(0).getValue();
                valueArray.add(value.doubleValue());
                // 只保留最新的三个值
                if (valueArray.size() > 3) {
                    valueArray.remove(0);
                }
            }
        }
        // 将更新后的值保存到 Redis
        RedisUtil.set(dataStream, valueArray.toJSONString());