Joe


2020-09-04

Design and Implementation of the JetLinks Rule Engine: A Reactive Rule-Processing Framework Built on Reactor

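0x00 Introduction: The Core Engine of an IoT Platform

In an Internet of Things (IoT) platform, the rule engine is the core component that connects device data to business logic. It has to process massive streams of device-reported data in real time and, according to predefined rules, raise alerts, forward data, persist it, and so on. Traditional architectures built on polling or on blocking message-queue consumers tend to struggle once a platform has to serve millions of device connections.

JetLinks is an open-source, enterprise-grade IoT platform. Its rule engine follows the reactive programming model and is built on Project Reactor. This architecture is a natural fit for high-concurrency, low-latency data streams, and its backpressure mechanism helps absorb traffic spikes.

This article examines the engineering practice behind the JetLinks rule engine from three angles: architecture, core implementation, and performance optimization. We care not only about "how it is implemented" but also about "why it is designed this way".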

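0x01 Architecture: A Pipeline Model for Reactive Data Streams

1.1 Pain Points of Traditional Rule Engines

Traditional IoT rule engines usually adopt the following architecture:

Device data → Message queue (Kafka/RabbitMQ) → Rule processor (polling) → Executor

Problems:

  1. Blocking I/O: each rule-processor thread blocks while waiting on I/O, so resource utilization is low
  2. Memory pressure: the message queue has to buffer large amounts of data, which consumes a lot of memory
  3. No backpressure: when the upstream produces faster than the downstream can consume, the consumer is overwhelmed and may crash
  4. Poor scalability: the threading model caps the achievable concurrency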

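1.2 The Reactive Architecture of JetLinks

JetLinks follows the Reactive Streams specification and models data processing as a pipeline:

Device data source (Flux) → Filter → Map →
Rule matching (FlatMap) → Action execution (FlatMap) → Result output (Subscribe)

Core advantages:

  1. Non-blocking and asynchronous: built on an event loop, so a single thread can serve many concurrent operations
  2. Declarative programming: a functional API keeps code concise and easy to compose
  3. Automatic backpressure: when the downstream cannot keep up, the upstream is signalled to slow down
  4. Resource efficiency: threads are reused and memory usage stays bounded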
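To make the "automatic backpressure" point concrete, here is a minimal sketch of demand-driven emission using only the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract. `ListPublisher` and `BatchingSubscriber` are hypothetical names invented for this illustration; the sketch is single-threaded and is not how Reactor or JetLinks implement their operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Sketch: a pull-driven publisher/subscriber pair (single-threaded, illustrative only). */
final class DemandDrivenPipeline {

    /** Emits list items only while the subscriber has outstanding demand. */
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;
                    demand += n;
                    if (emitting) return; // re-entrant request: the running loop picks it up
                    emitting = true;
                    while (!done && demand > 0 && index < items.size()) {
                        demand--;
                        subscriber.onNext(items.get(index++));
                    }
                    emitting = false;
                    if (!done && index == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests items in fixed-size batches, signalling how much it can handle. */
    static final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        int requestCalls;
        boolean completed;
        private final int batchSize;
        private Flow.Subscription subscription;
        private int inBatch;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestCalls++;
            s.request(batchSize);
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (++inBatch == batchSize) { // batch consumed: ask for the next one
                inBatch = 0;
                requestCalls++;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) { throw new IllegalStateException(t); }

        @Override
        public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(2);
        new ListPublisher<>(List.of(1, 2, 3, 4, 5)).subscribe(subscriber);
        System.out.println(subscriber.received + " via " + subscriber.requestCalls + " request calls");
    }
}
```

The publisher never pushes more than was requested, which is the essence of backpressure: the consumer, not the producer, sets the pace.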

1.3 Overall Architecture

┌───────────────────────────────────────────────────────────────────────┐
│                      Device data ingestion layer                      │
│  MQTT Broker  │  HTTP API  │  CoAP Gateway  │  TCP Server             │
└────────────────────────┬──────────────────────────────────────────────┘
                         │
                         ▼
┌───────────────────────────────────────────────────────────────────────┐
│                        Rule engine core layer                         │
│  ┌───────────────────┐  ┌───────────────────┐  ┌───────────────────┐  │
│  │ Rule definitions  │  │ Data-flow router  │  │ Execution context │  │
│  └───────────────────┘  └───────────────────┘  └───────────────────┘  │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │           Rule processing pipeline (Reactor Pipeline)           │  │
│  │            Filter → Map → FlatMap → GroupBy → Window            │  │
│  └─────────────────────────────────────────────────────────────────┘  │
└────────────────────────┬──────────────────────────────────────────────┘
                         │
                         ▼
┌───────────────────────────────────────────────────────────────────────┐
│                        Action execution layer                         │
│  Notify │ Forward │ Device control │ Store │ Custom script            │
└───────────────────────────────────────────────────────────────────────┘

0x02 Core Data Structures: The Rule Model

2.1 Rule Definition

/**
 * Rule definition.
 */
@Data
@Builder
public class RuleDefinition {
    /** Rule ID */
    private String id;

    /** Rule name */
    private String name;

    /** Rule type: device alert, data forwarding, etc. */
    private RuleType type;

    /** Trigger conditions, written as SpEL expressions */
    private List<Condition> conditions;

    /** Actions to execute */
    private List<Action> actions;

    /** Whether the rule is enabled */
    private boolean enabled;

    /** Scheduling policy: real-time, scheduled, or batch */
    private SchedulePolicy schedulePolicy;

    /** Creation time */
    private LocalDateTime createTime;
}

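/**
 * Condition definition.
 */
@Data
@Builder // the examples in section 3.2 construct conditions via Condition.builder()
public class Condition {
    /** Condition type: device property, device event, timed trigger */
    private ConditionType type;

    /** Filter expression in SpEL syntax,
     * e.g. #device.temperature > 80 && #device.status == 'online'
     */
    private String expression;

    /** Time window, used for aggregate calculations */
    private TimeWindow timeWindow;
}

/**
 * Action definition.
 */
@Data
public class Action {
    /** Action type: notification, device control, data forwarding */
    private ActionType type;

    /** Action parameters (deserialized from JSON) */
    private Map<String, Object> parameters;

    /** Whether to execute asynchronously */
    private boolean async;

    /** Retry policy on failure */
    private RetryPolicy retryPolicy;
}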

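2.2 Execution Context

Rule execution has to carry its complete context along:

/**
 * Rule execution context.
 */
@Data
@Builder(toBuilder = true) // toBuilder() is used when a context is copied per matched rule
public class RuleExecutionContext {
    /** Rule instance */
    private RuleDefinition rule;

    /** Triggering message */
    private DeviceMessage message;

    /** Device information */
    private DeviceMetadata device;

    /** Global variables (used during condition evaluation) */
    private Map<String, Object> variables;

    /** Execution start time */
    private Instant startTime;

    /** Trace ID (used to correlate logs) */
    private String traceId;

    /** Builds the root object used for SpEL evaluation */
    public Map<String, Object> getEvaluationRoot() {
        Map<String, Object> root = new HashMap<>();
        root.put("device", device);
        root.put("message", message);
        root.put("payload", message.getPayload());
        if (variables != null) { // guard: the builder leaves this null unless set
            root.putAll(variables);
        }
        return root;
    }
}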

0x03 Condition Engine: Expression Evaluation with SpEL

3.1 Integrating the Spring Expression Language

JetLinks uses SpEL (Spring Expression Language) as the expression language for rule conditions, balancing flexibility and performance:

/**
 * Condition evaluator.
 */
@Component
@Slf4j
public class ConditionEvaluator {

    private final SpelExpressionParser parser = new SpelExpressionParser();

    /** Expression cache (avoids parsing the same expression repeatedly) */
    private final Cache<String, Expression> expressionCache =
        Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterAccess(Duration.ofHours(1))
            .build();

    /**
     * Evaluates whether the condition holds.
     */
    public Mono<Boolean> evaluate(Condition condition, RuleExecutionContext context) {
        return Mono.fromCallable(() -> {
            Expression expression = expressionCache.get(
                condition.getExpression(),
                expr -> parser.parseExpression(expr)
            );

            Map<String, Object> root = context.getEvaluationRoot();
            StandardEvaluationContext evalContext = new StandardEvaluationContext();
            evalContext.setRootObject(root);
            // "#device", "#message" and "#payload" are SpEL variable references,
            // so the whole evaluation root has to be registered as variables
            // (registering only context.getVariables() would leave them unresolved)
            evalContext.setVariables(root);

            // Register custom functions
            registerCustomFunctions(evalContext);

            Boolean result = expression.getValue(evalContext, Boolean.class);
            return result != null && result;
        }).onErrorResume(ex -> {
            // A failed evaluation is logged and treated as "condition not met"
            log.error("Condition evaluation failed: expression={}, context={}",
                condition.getExpression(), context, ex);
            return Mono.just(false);
        });
    }

    /**
     * Registers custom functions.
     */
    private void registerCustomFunctions(StandardEvaluationContext context) {
        try {
            // Math functions
            context.registerFunction("abs",
                Math.class.getDeclaredMethod("abs", double.class));
            context.registerFunction("max",
                Math.class.getDeclaredMethod("max", double.class, double.class));

            // Custom function
            context.registerFunction("between",
                this.getClass().getDeclaredMethod("between",
                    double.class, double.class, double.class));
        } catch (NoSuchMethodException e) {
            log.error("Failed to register SpEL functions", e);
        }
    }

    /**
     * Custom function: checks whether a value lies within the closed interval [min, max].
     */
    public static boolean between(double value, double min, double max) {
        return value >= min && value <= max;
    }
}
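The evaluator above relies on two ideas that are independent of SpEL: parse each expression once and cache the compiled form, and treat evaluation failures as "condition not met". The sketch below shows both with JDK classes only. The mini-grammar `<field> <op> <number>` and the class name `CachedConditionSketch` are hypothetical stand-ins for illustration; this is not SpEL and not JetLinks code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Sketch: compile each rule expression once and reuse the compiled predicate. */
final class CachedConditionSketch {

    private final Map<String, Predicate<Map<String, Double>>> cache = new ConcurrentHashMap<>();

    /** Counts parser invocations so the effect of the cache is observable. */
    final AtomicInteger parseCount = new AtomicInteger();

    /** Parses "<field> <op> <number>" where op is ">" or "<" (hypothetical mini-grammar). */
    private Predicate<Map<String, Double>> parse(String expression) {
        parseCount.incrementAndGet();
        String[] parts = expression.trim().split("\\s+");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected '<field> <op> <number>': " + expression);
        }
        String field = parts[0];
        double threshold = Double.parseDouble(parts[2]);
        switch (parts[1]) {
            case ">":
                return data -> data.containsKey(field) && data.get(field) > threshold;
            case "<":
                return data -> data.containsKey(field) && data.get(field) < threshold;
            default:
                throw new IllegalArgumentException("Unsupported operator: " + parts[1]);
        }
    }

    /** Evaluates the expression; a malformed expression counts as "condition not met". */
    boolean evaluate(String expression, Map<String, Double> data) {
        try {
            return cache.computeIfAbsent(expression, this::parse).test(data);
        } catch (IllegalArgumentException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        CachedConditionSketch evaluator = new CachedConditionSketch();
        System.out.println(evaluator.evaluate("temperature > 80", Map.of("temperature", 85.0)));
        System.out.println(evaluator.evaluate("temperature > 80", Map.of("temperature", 70.0)));
        System.out.println("parsed " + evaluator.parseCount.get() + " time(s)");
    }
}
```

Because rule expressions change far less often than messages arrive, the parse cost is paid once per rule rather than once per message.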

3.2 Examples of Complex Conditions

// Note: #now(), #avg() and #count() are not registered by the ConditionEvaluator
// shown in 3.1; they would have to be registered the same way as #between

// Temperature alert rule
Condition tempAlert = Condition.builder()
    .type(ConditionType.DEVICE_PROPERTY)
    .expression(
        "#device.productId == 'temp-sensor-v2' && " +
        "#payload.temperature > 80 && " +
        "#payload.temperature < 120 && " +
        "#between(#payload.humidity, 30, 70)"
    )
    .build();

// Device offline detection (no report within 5 minutes)
Condition offlineCheck = Condition.builder()
    .type(ConditionType.TIME_WINDOW)
    .expression(
        "#message.timestamp < (#now() - 300000)"  // 5 minutes = 300000 ms
    )
    .timeWindow(TimeWindow.of(Duration.ofMinutes(5)))
    .build();

// Aggregate condition: average exceeds a threshold
Condition avgAlert = Condition.builder()
    .type(ConditionType.AGGREGATION)
    .expression(
        "#avg(#payload.temperature) > 75 && #count() >= 10"
    )
    .timeWindow(TimeWindow.tumbling(Duration.ofMinutes(10)))
    .build();

0x04 Rule Engine Core: Implementing the Reactive Pipeline

4.1 Rule Processor

/**
 * Rule processor: turns the device message stream into a stream of rule executions.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RuleProcessor {

    private final ConditionEvaluator conditionEvaluator;
    private final ActionExecutorManager actionExecutor; // dispatches by action type (section 5.1)
    private final RuleRepository ruleRepository;
    private final DeviceService deviceService;

    /**
     * Processes the device message stream.
     * @param messageFlux device message stream
     * @return stream of execution results
     */
    public Flux<RuleExecutionResult> process(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .flatMap(message ->
                // 1. Enrich the context: look up device metadata
                enrichContext(message)
                    // 2. Route to the matching rules
                    .flatMapMany(this::routeToRules)
                    // 3. Evaluate conditions
                    .filterWhen(ctx -> evaluateConditions(ctx.getRule(), ctx))
                    // 4. Execute actions
                    .flatMap(this::executeActions, 64)
                    // 5. Error handling and logging, scoped to one message:
                    //    recovering at the end of the outer stream would
                    //    terminate the whole stream on the first failure
                    .doOnError(ex -> log.error("Rule processing failed", ex))
                    .onErrorResume(ex -> Flux.empty()),
                32) // up to 32 messages in flight
            // 6. Performance monitoring
            .doOnNext(result -> recordMetrics(result));
    }

    /**
     * Enriches the execution context.
     */
    private Mono<RuleExecutionContext> enrichContext(DeviceMessage message) {
        return deviceService.getMetadata(message.getDeviceId())
            .map(device -> RuleExecutionContext.builder()
                .message(message)
                .device(device)
                .traceId(generateTraceId())
                .startTime(Instant.now())
                .variables(new HashMap<>())
                .build());
    }

    /**
     * Routes to the matching rules (by product ID in this example).
     */
    private Flux<RuleExecutionContext> routeToRules(RuleExecutionContext context) {
        return ruleRepository
            .findEnabledRules(context.getDevice().getProductId())
            .map(rule -> context.toBuilder().rule(rule).build());
    }

    /**
     * Evaluates all conditions (AND logic).
     */
    private Mono<Boolean> evaluateConditions(RuleDefinition rule,
                                              RuleExecutionContext context) {
        List<Mono<Boolean>> conditionResults = rule.getConditions().stream()
            .map(condition -> conditionEvaluator.evaluate(condition, context))
            .collect(Collectors.toList());

        // True only if every condition holds; concat evaluates sequentially
        // and all() stops at the first false
        return Flux.concat(conditionResults)
            .all(result -> result);
    }

    /**
     * Executes all actions of the matched rule.
     */
    private Flux<RuleExecutionResult> executeActions(RuleExecutionContext context) {
        return Flux.fromIterable(context.getRule().getActions())
            .flatMap(action -> actionExecutor.execute(action, context));
    }
}

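4.2 Backpressure and Flow Control

A major advantage of reactive streams is their built-in backpressure mechanism:

/**
 * Message processing with backpressure control.
 */
public Flux<RuleExecutionResult> processWithBackpressure(
        Flux<DeviceMessage> messageFlux) {

    return messageFlux
        // Backpressure strategy, applied at the source: while the downstream has
        // no outstanding demand, newly arriving messages are dropped. To drop the
        // oldest buffered message instead, use
        // onBackpressureBuffer(n, BufferOverflowStrategy.DROP_OLDEST)
        .onBackpressureDrop(msg ->
            log.warn("Message dropped due to backpressure: {}", msg)
        )

        // Demand control: request from upstream in batches of at most 10,000.
        // limitRate bounds the request size; it is not a per-second rate limit
        .limitRate(10000)

        // Buffering: smooth out bursts (emit every 1,000 messages or every second)
        .bufferTimeout(1000, Duration.ofSeconds(1))
        .flatMapIterable(batch -> batch)

        // Group by device ID
        .groupBy(msg -> msg.getDeviceId(), Integer.MAX_VALUE)

        // Process groups concurrently; concatMap keeps the messages of one
        // device in order, which flatMap would not guarantee
        .flatMap(group -> group
            .concatMap(this::enrichContext)
            .concatMap(this::routeToRules)
            .filterWhen(ctx -> evaluateConditions(ctx.getRule(), ctx))
            .concatMap(this::executeActions)
        , 128); // at most 128 device groups in flight; size this to the number
                // of active devices, because further groups wait for a free slot
}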
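The difference between dropping the newest and dropping the oldest element on overflow is easy to mix up, so here is a small JDK-only sketch of both strategies on a bounded buffer. `BoundedBuffer` and the `Strategy` enum are hypothetical names for this illustration; Reactor's own operators are implemented differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch: two overflow strategies for a bounded buffer between producer and consumer. */
final class OverflowStrategySketch {

    enum Strategy { DROP_LATEST, DROP_OLDEST }

    static final class BoundedBuffer<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;

        /** Everything that was discarded, in the order it was discarded. */
        final List<T> dropped = new ArrayList<>();

        BoundedBuffer(int capacity, Strategy strategy) {
            this.capacity = capacity;
            this.strategy = strategy;
        }

        /** Called by the producer; never blocks. */
        void offer(T item) {
            if (queue.size() < capacity) {
                queue.addLast(item);
            } else if (strategy == Strategy.DROP_LATEST) {
                dropped.add(item);                // keep old data, discard the newcomer
            } else {
                dropped.add(queue.removeFirst()); // discard the oldest, keep fresh data
                queue.addLast(item);
            }
        }

        /** Called by the consumer once it has capacity again. */
        List<T> drain() {
            List<T> out = new ArrayList<>(queue);
            queue.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            BoundedBuffer<Integer> buffer = new BoundedBuffer<>(3, strategy);
            for (int i = 1; i <= 5; i++) buffer.offer(i);
            System.out.println(strategy + ": kept=" + buffer.drain() + " dropped=" + buffer.dropped);
        }
    }
}
```

For telemetry where only the latest reading matters, dropping the oldest is usually the better fit; for event logs where early events carry the cause, dropping the latest may be preferable.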

0x05 Action Executors: An Extensible Plugin Architecture

5.1 The Action Executor Interface

/**
 * Action executor interface.
 */
public interface ActionExecutor {

    /**
     * Returns the action type this executor supports.
     */
    ActionType supportedType();

    /**
     * Executes an action.
     * @param action action definition
     * @param context execution context
     * @return execution result
     */
    Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context);
}

/**
 * Action executor manager.
 */
@Component
@Slf4j
public class ActionExecutorManager {

    private final Map<ActionType, ActionExecutor> executors = new ConcurrentHashMap<>();

    @Autowired
    public ActionExecutorManager(List<ActionExecutor> executorList) {
        // Spring injects every ActionExecutor bean; index them by supported type
        executorList.forEach(executor ->
            executors.put(executor.supportedType(), executor)
        );
    }

    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        ActionExecutor executor = executors.get(action.getType());
        if (executor == null) {
            return Mono.error(new UnsupportedOperationException(
                "Unsupported action type: " + action.getType()
            ));
        }

        return executor.execute(action, context)
            .timeout(Duration.ofSeconds(30)) // timeout per attempt
            .retry(action.getRetryPolicy().getMaxRetries()) // retry on error
            .onErrorResume(ex -> {
                log.error("Action execution failed: action={}, context={}", action, context, ex);
                return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
            });
    }
}
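Stripped of Spring and Reactor, the manager above is a type-keyed registry plus a bounded retry loop. The following synchronous sketch shows that shape with JDK classes only; `ActionDispatchSketch`, its string-based results, and the `Function<String, String>` executor signature are simplifications assumed for this illustration.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch: dispatch by action type, with a bounded number of retries. */
final class ActionDispatchSketch {

    enum ActionType { NOTIFICATION, DATA_FORWARD, DEVICE_CONTROL }

    private final Map<ActionType, Function<String, String>> executors =
        new EnumMap<>(ActionType.class);

    /** Adding a new action type only requires registering another executor. */
    void register(ActionType type, Function<String, String> executor) {
        executors.put(type, executor);
    }

    /** Runs the executor once, then retries up to maxRetries more times on failure. */
    String execute(ActionType type, String payload, int maxRetries) {
        Function<String, String> executor = executors.get(type);
        if (executor == null) {
            return "failure: unsupported action type " + type;
        }
        String lastError = "not attempted";
        for (int attempt = 0; attempt <= Math.max(0, maxRetries); attempt++) {
            try {
                return "success: " + executor.apply(payload);
            } catch (RuntimeException ex) {
                lastError = ex.getMessage(); // remember the error and try again
            }
        }
        return "failure: " + lastError;
    }

    public static void main(String[] args) {
        ActionDispatchSketch dispatcher = new ActionDispatchSketch();
        int[] calls = {0};
        // A flaky executor that fails twice before succeeding
        dispatcher.register(ActionType.NOTIFICATION, payload -> {
            if (++calls[0] < 3) throw new IllegalStateException("endpoint unavailable");
            return "sent " + payload;
        });
        System.out.println(dispatcher.execute(ActionType.NOTIFICATION, "alert", 2));
        System.out.println(dispatcher.execute(ActionType.DEVICE_CONTROL, "reboot", 0));
    }
}
```

The core dispatch code never names a concrete executor, which is what lets new action types be added without touching it.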

5.2 Concrete Action Executors

Notification executor

/**
 * Notification executor (DingTalk, email, SMS, etc.).
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class NotificationActionExecutor implements ActionExecutor {

    private final WebClient webClient;

    @Override
    public ActionType supportedType() {
        return ActionType.NOTIFICATION;
    }

    @Override
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        NotificationConfig config = parseConfig(action.getParameters());

        // Build the notification content
        String content = buildNotificationContent(context, config);

        // Dispatch by notification type (switch expression, Java 14+)
        return switch (config.getNotificationType()) {
            case DINGTALK -> sendDingTalkMessage(config, content);
            case EMAIL -> sendEmail(config, content);
            case SMS -> sendSms(config, content);
            default -> Mono.error(new IllegalArgumentException("Unknown notification type"));
        };
    }

    /**
     * Sends a DingTalk message.
     */
    private Mono<RuleExecutionResult> sendDingTalkMessage(
            NotificationConfig config, String content) {

        Map<String, Object> body = Map.of(
            "msgtype", "text",
            "text", Map.of("content", content)
        );

        return webClient.post()
            .uri(config.getWebhookUrl())
            .bodyValue(body)
            .retrieve()
            .bodyToMono(String.class)
            .map(response -> RuleExecutionResult.success("DingTalk message sent"))
            .doOnError(ex -> log.error("Failed to send DingTalk message: {}", content, ex));
    }

    /**
     * Builds the notification content (supports template variables).
     */
    private String buildNotificationContent(RuleExecutionContext context,
                                             NotificationConfig config) {
        String template = config.getContentTemplate();

        // Render with the Mustache template engine
        MustacheFactory mf = new DefaultMustacheFactory();
        Mustache mustache = mf.compile(new StringReader(template), "notification");

        StringWriter writer = new StringWriter();
        mustache.execute(writer, context.getEvaluationRoot());

        return writer.toString();
    }
}

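Example notification template:

[Device Alert]
Device name: {{device.name}}
Product type: {{device.productName}}
Alert: abnormal temperature, current value {{payload.temperature}}℃
Triggered at: {{message.timestamp}}
Suggested action: check the device's operating status immediately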
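To show what rendering such a template involves, here is a deliberately tiny placeholder renderer that resolves dotted paths like `device.name` against nested maps. It is a sketch under simplifying assumptions (only `{{path}}` placeholders over `Map` values, no sections, no escaping), and `TemplateSketch` is a hypothetical name; the executor above delegates this work to the Mustache library instead.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: render {{dotted.path}} placeholders from nested maps. */
final class TemplateSketch {

    // Matches {{ path }} where path consists of word characters and dots
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\{\\{\\s*([\\w.]+)\\s*\\}\\}");

    static String render(String template, Map<String, Object> root) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            Object value = resolve(root, matcher.group(1));
            // Unknown paths render as an empty string
            String text = value == null ? "" : value.toString();
            matcher.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Walks the path segment by segment; returns null if any segment is missing. */
    private static Object resolve(Map<String, Object> root, String path) {
        Object current = root;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> root = Map.of(
            "device", Map.of("name", "pump-1"),
            "payload", Map.of("temperature", 85));
        System.out.println(render(
            "Device {{device.name}}: temperature {{payload.temperature}}", root));
    }
}
```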

Device control executor

/**
 * Device control executor (sends commands down to devices).
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class DeviceControlActionExecutor implements ActionExecutor {

    private final DeviceCommandService commandService;

    @Override
    public ActionType supportedType() {
        return ActionType.DEVICE_CONTROL;
    }

    @Override
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        ControlConfig config = parseConfig(action.getParameters());

        // Build the device command
        DeviceCommand command = DeviceCommand.builder()
            .deviceId(config.getTargetDeviceId())
            .function(config.getFunctionId())
            .parameters(config.getCommandParams())
            .timeout(Duration.ofSeconds(30))
            .build();

        // Send the command and wait for the response
        return commandService.sendCommand(command)
            .map(response -> {
                if (response.isSuccess()) {
                    return RuleExecutionResult.success(
                        "Device control succeeded: " + response.getMessage()
                    );
                } else {
                    return RuleExecutionResult.failure(
                        "Device control failed: " + response.getErrorMessage()
                    );
                }
            })
            .timeout(Duration.ofSeconds(30))
            .onErrorResume(ex -> {
                log.error("Device control failed: deviceId={}, command={}",
                    command.getDeviceId(), command, ex);
                return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
            });
    }
}

Data forwarding executor

/**
 * Data forwarding executor (forwards to HTTP, MQTT, Kafka, etc.).
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class DataForwardActionExecutor implements ActionExecutor {

    private final WebClient webClient;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ActionType supportedType() {
        return ActionType.DATA_FORWARD;
    }

    @Override
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        ForwardConfig config = parseConfig(action.getParameters());

        // Serialize the data
        String payload = serializePayload(context, config);

        return switch (config.getProtocol()) {
            case HTTP -> forwardToHttp(config, payload);
            case KAFKA -> forwardToKafka(config, payload);
            case MQTT -> forwardToMqtt(config, payload); // implementation not shown here
            default -> Mono.error(new IllegalArgumentException("Unsupported forwarding protocol"));
        };
    }

    /**
     * Forwards to an HTTP endpoint.
     */
    private Mono<RuleExecutionResult> forwardToHttp(ForwardConfig config, String payload) {
        return webClient.post()
            .uri(config.getUrl())
            .headers(headers -> headers.setAll(config.getHeaders()))
            .bodyValue(payload)
            .retrieve()
            .bodyToMono(String.class)
            .map(response -> RuleExecutionResult.success("HTTP forwarding succeeded"))
            .onErrorResume(ex -> {
                log.error("HTTP forwarding failed: url={}, payload={}",
                    config.getUrl(), payload, ex);
                return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
            });
    }

    /**
     * Forwards to Kafka.
     */
    private Mono<RuleExecutionResult> forwardToKafka(ForwardConfig config, String payload) {
        // completable() applies to Spring Kafka 2.x, where send() returns a
        // ListenableFuture; in 3.x send() already returns a CompletableFuture
        return Mono.fromFuture(
            kafkaTemplate.send(config.getTopic(), payload).completable()
        ).map(result -> RuleExecutionResult.success("Kafka forwarding succeeded"))
         .onErrorResume(ex -> {
             log.error("Kafka forwarding failed: topic={}, payload={}",
                 config.getTopic(), payload, ex);
             return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
         });
    }
}

0x06 Time Windows and Aggregation

6.1 Window Implementations

/**
 * Time window processor.
 */
@Component
public class TimeWindowProcessor {

    /**
     * Tumbling window:
     * computes the average temperature once every 10 minutes.
     */
    public Flux<AggregationResult> tumblingWindow(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .window(Duration.ofMinutes(10))
            .flatMap(window -> window
                .collect(Collectors.averagingDouble(
                    msg -> msg.getPayload().getDouble("temperature")
                ))
                .map(avgTemp -> AggregationResult.builder()
                    .metric("temperature_avg")
                    .value(avgTemp)
                    .timestamp(Instant.now())
                    .build())
            );
    }

    /**
     * Sliding window:
     * every minute, reports the maximum temperature of the last 10 minutes.
     */
    public Flux<AggregationResult> slidingWindow(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .window(Duration.ofMinutes(10), Duration.ofMinutes(1))
            .flatMap(window -> window
                .collect(Collectors.maxBy(
                    Comparator.comparingDouble((DeviceMessage msg) ->
                        msg.getPayload().getDouble("temperature"))
                ))
                // An empty window yields Optional.empty(); skip it instead of
                // failing on Optional.get()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(msg -> AggregationResult.builder()
                    .metric("temperature_max")
                    .value(msg.getPayload().getDouble("temperature"))
                    .deviceId(msg.getDeviceId())
                    .timestamp(Instant.now())
                    .build())
            );
    }

    /**
     * Approximate session window per device.
     * windowTimeout closes a window after 100 messages or 5 minutes after the
     * window opened, whichever comes first. It does not track the gap since the
     * last message, so this only approximates "5 minutes of silence ends the session".
     */
    public Flux<SessionResult> sessionWindow(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .groupBy(DeviceMessage::getDeviceId)
            .flatMap(group -> group
                .windowTimeout(100, Duration.ofMinutes(5))
                .flatMap(window -> window
                    .collect(Collectors.toList())
                    .filter(list -> !list.isEmpty())
                    .map(list -> SessionResult.builder()
                        .deviceId(group.key())
                        .messageCount(list.size())
                        .sessionStart(list.get(0).getTimestamp())
                        .sessionEnd(list.get(list.size() - 1).getTimestamp())
                        .build())
                )
            );
    }
}
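The difference between tumbling and sliding windows comes down to how many windows a single message belongs to. The sketch below computes that assignment from a message timestamp. It assumes event-time windows aligned to multiples of the slide, which is a simplification chosen for illustration: Reactor's `window(Duration, Duration)` opens windows by processing time, and `WindowAssignmentSketch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: which window(s) does a timestamp fall into? */
final class WindowAssignmentSketch {

    /** Start of the single tumbling window [start, start + size) containing the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /**
     * Starts of all sliding windows [start, start + size) containing the timestamp,
     * newest first. Window starts are multiples of slideMs.
     */
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long latest = timestampMs - Math.floorMod(timestampMs, slideMs);
        // A window still covers the timestamp as long as start > timestamp - size
        for (long start = latest; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long tenMinutes = 600_000L;
        long oneMinute = 60_000L;
        long ts = 605_000L; // 10 minutes and 5 seconds
        System.out.println("tumbling window start: " + tumblingStart(ts, tenMinutes));
        System.out.println("sliding windows: " + slidingStarts(ts, tenMinutes, oneMinute).size());
    }
}
```

With a 10-minute size and a 1-minute slide, every message is counted in ten overlapping windows, which is why sliding aggregation costs roughly ten times the memory of the tumbling variant.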

6.2 A Complex Aggregation Rule

/**
 * Aggregation rule: detect abnormal temperature fluctuation.
 * Condition: standard deviation of the temperature within 10 minutes > 10℃.
 */
public Flux<AlertEvent> detectTemperatureFluctuation(Flux<DeviceMessage> messageFlux) {
    return messageFlux
        .filter(msg -> msg.getPayload().has("temperature"))
        .window(Duration.ofMinutes(10))
        .flatMap(window -> window
            .map(msg -> msg.getPayload().getDouble("temperature"))
            .collect(Collectors.toList())
            .filter(temps -> temps.size() >= 10) // require at least 10 samples
            .map(temps -> {
                double mean = temps.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0.0);

                // Population standard deviation: sqrt of the mean squared deviation
                double stdDev = Math.sqrt(
                    temps.stream()
                        .mapToDouble(temp -> Math.pow(temp - mean, 2))
                        .average()
                        .orElse(0.0)
                );

                return Map.entry(mean, stdDev);
            })
            .filter(entry -> entry.getValue() > 10.0) // standard deviation threshold
            .map(entry -> AlertEvent.builder()
                .type("TEMPERATURE_FLUCTUATION")
                .message(String.format(
                    "Abnormal temperature fluctuation: mean=%.2f℃, stddev=%.2f℃",
                    entry.getKey(), entry.getValue()
                ))
                .timestamp(Instant.now())
                .build())
        );
}
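The rule above collects every sample of a window into a list before computing the statistics. As an alternative that needs constant memory, mean and standard deviation can be maintained incrementally with Welford's online algorithm. This is a swapped-in technique offered as a sketch, not what the code above does; `RunningStats` is a hypothetical class name.

```java
/** Sketch: Welford's online algorithm for mean and population standard deviation. */
final class RunningStats {

    private long count;
    private double mean;
    private double m2; // running sum of squared deviations from the current mean

    /** Folds one sample into the running statistics in O(1) time and memory. */
    void add(double value) {
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
    }

    long count() {
        return count;
    }

    double mean() {
        return mean;
    }

    /** Population standard deviation, matching the average-of-squared-deviations formula. */
    double stdDev() {
        return count == 0 ? 0.0 : Math.sqrt(m2 / count);
    }

    public static void main(String[] args) {
        RunningStats stats = new RunningStats();
        for (double t : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
            stats.add(t);
        }
        System.out.printf("n=%d mean=%.2f stddev=%.2f%n",
            stats.count(), stats.mean(), stats.stdDev());
    }
}
```

In a Reactor pipeline, one such accumulator per window could be filled with `reduce` instead of `collect(Collectors.toList())`, so a busy window no longer has to hold all of its samples.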

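0x07 Performance Optimization: From Theory to Practice

7.1 Tuning Concurrency

/**
 * Performance tuning configuration.
 */
@Configuration
public class RuleEnginePerformanceConfig {

    @Bean
    public Scheduler ruleProcessorScheduler() {
        // Bounded elastic thread pool (suited to blocking, I/O-bound tasks)
        return Schedulers.boundedElastic();
    }

    @Bean
    public Scheduler actionExecutorScheduler() {
        // Parallel thread pool (one worker per CPU core)
        return Schedulers.parallel();
    }

    /**
     * The optimized processing flow.
     * enrichContext, routeToRules, evaluateConditions and executeActions are
     * the RuleProcessor methods from section 4.1.
     */
    public Flux<RuleExecutionResult> optimizedProcess(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            // 1. I/O work (device metadata lookup) on the elastic scheduler
            .publishOn(ruleProcessorScheduler())
            .flatMap(this::enrichContext, 64) // high concurrency

            // 2. CPU-bound work (condition evaluation) on the parallel scheduler
            .publishOn(actionExecutorScheduler())
            .flatMap(this::routeToRules, 32)
            .filterWhen(ctx -> evaluateConditions(ctx.getRule(), ctx))

            // 3. I/O work (action execution): switch back
            .publishOn(ruleProcessorScheduler())
            .flatMap(this::executeActions, 128);
    }
}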

7.2 Cache Optimization

/**
 * Multi-level caching strategy.
 */
@Component
public class CachedRuleRepository {

    /** L1 cache: local memory (Caffeine); filled manually, so no loader is needed */
    private final Cache<String, RuleDefinition> localCache =
        Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(Duration.ofMinutes(5))
            .build();

    /** L2 cache: Redis */
    @Autowired
    private ReactiveRedisTemplate<String, RuleDefinition> redisTemplate;

    /** L3 storage: MySQL */
    @Autowired
    private RuleMapper ruleMapper;

    /**
     * Looks up a rule through the cache levels.
     */
    public Mono<RuleDefinition> findById(String ruleId) {
        // 1. Check the local cache
        RuleDefinition localCached = localCache.getIfPresent(ruleId);
        if (localCached != null) {
            return Mono.just(localCached);
        }

        // 2. Check the Redis cache
        return redisTemplate.opsForValue()
            .get("rule:" + ruleId)
            .switchIfEmpty(
                // 3. Query the database; the mapper call blocks, so it runs on
                //    boundedElastic rather than on the Redis event-loop thread
                Mono.fromCallable(() -> loadFromDatabase(ruleId))
                    .subscribeOn(Schedulers.boundedElastic())
                    // 3.1 Write to Redis as part of the chain, so that failures
                    //     are not lost in a detached subscribe()
                    .flatMap(rule -> redisTemplate.opsForValue()
                        .set("rule:" + ruleId, rule, Duration.ofMinutes(10))
                        .thenReturn(rule))
            )
            // 4. Populate the local cache
            .doOnNext(rule -> localCache.put(ruleId, rule));
    }

    private RuleDefinition loadFromDatabase(String ruleId) {
        return ruleMapper.selectById(ruleId);
    }
}
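The read-through order (local cache, then shared cache, then database, filling each level on the way back) can be seen in isolation in the following synchronous sketch. Plain maps stand in for Caffeine and Redis and a `Function` stands in for the database query; all of these, and the name `TwoLevelCacheSketch`, are assumptions made for the illustration. Expiry is left out.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch: read-through lookup across a local cache, a shared cache and a loader. */
final class TwoLevelCacheSketch<K, V> {

    private final Map<K, V> local = new ConcurrentHashMap<>(); // stands in for Caffeine
    private final Map<K, V> shared;                            // stands in for Redis
    private final Function<K, V> loader;                       // stands in for the DB query

    /** Number of times the slowest level was hit. */
    int loaderCalls;

    TwoLevelCacheSketch(Map<K, V> shared, Function<K, V> loader) {
        this.shared = shared;
        this.loader = loader;
    }

    Optional<V> get(K key) {
        V value = local.get(key);             // 1. local cache
        if (value != null) {
            return Optional.of(value);
        }
        value = shared.get(key);              // 2. shared cache
        if (value == null) {
            loaderCalls++;
            value = loader.apply(key);        // 3. backing store
            if (value == null) {
                return Optional.empty();      // unknown key: nothing to cache
            }
            shared.put(key, value);           // 3.1 fill the shared cache
        }
        local.put(key, value);                // 4. fill the local cache
        return Optional.of(value);
    }

    /** Simulates local expiry or eviction. */
    void evictLocal(K key) {
        local.remove(key);
    }

    public static void main(String[] args) {
        TwoLevelCacheSketch<String, String> cache =
            new TwoLevelCacheSketch<>(new ConcurrentHashMap<>(), id -> "rule-" + id);
        cache.get("r1");
        cache.get("r1");
        cache.evictLocal("r1");
        cache.get("r1");
        System.out.println("loader calls: " + cache.loaderCalls);
    }
}
```

After a local eviction the value is served from the shared level, so the database is queried only once for all three lookups.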

7.3 Batch Processing

/**
 * Executes actions in batches (reduces network overhead).
 */
public Flux<RuleExecutionResult> batchExecuteActions(
        Flux<RuleExecutionContext> contextFlux) {

    return contextFlux
        // Group by the type of the rule's first action
        .groupBy(ctx -> ctx.getRule().getActions().get(0).getType())

        .flatMap(group -> group
            // Flush a batch every 100 items or every second, whichever comes first
            .bufferTimeout(100, Duration.ofSeconds(1))

            .flatMap(batch -> {
                ActionType actionType = group.key();

                return switch (actionType) {
                    case NOTIFICATION -> batchSendNotifications(batch);
                    case DATA_FORWARD -> batchForwardData(batch);
                    // Other types are executed one by one
                    default -> Flux.fromIterable(batch)
                        .flatMap(this::executeActions);
                };
            })
        );
}

/**
 * Sends notifications in a batch (merged into a single message).
 */
private Flux<RuleExecutionResult> batchSendNotifications(
        List<RuleExecutionContext> batch) {

    String batchContent = batch.stream()
        .map(ctx -> String.format(
            "[%s] %s: %s",
            ctx.getDevice().getName(),
            ctx.getRule().getName(),
            ctx.getMessage().getPayload()
        ))
        .collect(Collectors.joining("\n"));

    // One send for the whole batch; the result is repeated once per context
    return sendNotification(batchContent)
        .flatMapMany(result -> Flux.fromIterable(
            Collections.nCopies(batch.size(), result)
        ));
}

0x08 Monitoring and Observability

8.1 Collecting Metrics

/**
 * Rule engine metrics.
 */
@Component
public class RuleEngineMetrics {

    private final MeterRegistry meterRegistry;

    /** Number of messages processed */
    private final Counter messageCounter;

    /** Rule execution time */
    private final Timer ruleExecutionTimer;

    /** Action success and failure counts */
    private final Counter actionSuccessCounter;
    private final Counter actionFailureCounter;

    public RuleEngineMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        this.messageCounter = Counter.builder("rule.engine.message.processed")
            .description("Total number of messages processed")
            .register(meterRegistry);

        this.ruleExecutionTimer = Timer.builder("rule.engine.execution.time")
            .description("Rule execution time")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);

        this.actionSuccessCounter = Counter.builder("rule.engine.action.success")
            .description("Number of successful action executions")
            .register(meterRegistry);

        this.actionFailureCounter = Counter.builder("rule.engine.action.failure")
            .description("Number of failed action executions")
            .register(meterRegistry);
    }

    /**
     * Records one rule execution.
     * The timer measures the synchronous run() call only; work that continues
     * asynchronously after run() returns is not included.
     */
    public void recordRuleExecution(RuleExecutionContext context,
                                     Runnable execution) {
        messageCounter.increment();

        ruleExecutionTimer.record(() -> {
            try {
                execution.run();
                actionSuccessCounter.increment();
            } catch (Exception ex) {
                actionFailureCounter.increment();
                throw ex;
            }
        });
    }
}
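For readers unfamiliar with what the published percentiles (0.5, 0.95, 0.99) mean, the sketch below computes them exactly with the nearest-rank method over a small sample of latencies. This is only meant to illustrate the definition: a metrics library typically estimates percentiles from histograms rather than sorting raw samples, and `PercentileSketch` is a hypothetical helper.

```java
import java.util.Arrays;

/** Sketch: exact nearest-rank percentiles over recorded latencies. */
final class PercentileSketch {

    /**
     * Returns the smallest sample such that at least pct percent of all
     * samples are less than or equal to it (nearest-rank definition).
     */
    static long percentile(long[] samples, int pct) {
        if (samples.length == 0 || pct < 1 || pct > 100) {
            throw new IllegalArgumentException("need samples and 1 <= pct <= 100");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        // Integer form of ceil(pct / 100 * n), avoiding floating-point rounding
        int rank = (pct * sorted.length + 99) / 100;
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        long[] latenciesMs = new long[100];
        for (int i = 0; i < latenciesMs.length; i++) {
            latenciesMs[i] = i + 1; // 1..100 ms
        }
        System.out.println("p50=" + percentile(latenciesMs, 50)
            + " p95=" + percentile(latenciesMs, 95)
            + " p99=" + percentile(latenciesMs, 99));
    }
}
```

Tail percentiles such as p99 matter for a rule engine because a slow action executor shows up there long before it moves the average.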

8.2 Distributed Tracing

/**
 * Distributed tracing with OpenTelemetry.
 */
@Component
public class TracedRuleProcessor {

    @Autowired
    private Tracer tracer;

    public Flux<RuleExecutionResult> processWithTracing(
            Flux<DeviceMessage> messageFlux) {

        return messageFlux
            .flatMap(message -> {
                // Create the root span for this message
                Span span = tracer.spanBuilder("rule.process")
                    .setAttribute("device.id", message.getDeviceId())
                    .setAttribute("message.id", message.getMessageId())
                    .startSpan();

                // makeCurrent() is bound to the calling thread and its scope would
                // close before the reactive chain runs, so the parent context is
                // passed to the child spans explicitly
                Context parent = Context.current().with(span);

                return enrichContext(message)
                    .flatMapMany(ctx -> {
                        // Child span: condition evaluation
                        Span evalSpan = tracer.spanBuilder("rule.evaluate")
                            .setParent(parent)
                            .startSpan();

                        return routeToRules(ctx)
                            .filterWhen(c -> evaluateConditions(c.getRule(), c))
                            .doFinally(signal -> evalSpan.end())
                            .flatMap(c -> {
                                // Child span: action execution
                                Span actionSpan = tracer.spanBuilder("rule.action")
                                    .setParent(parent)
                                    .setAttribute("rule.id", c.getRule().getId())
                                    .startSpan();

                                return executeActions(c)
                                    .doFinally(signal -> actionSpan.end());
                            });
                    })
                    // Errors surface asynchronously, so they are recorded with an
                    // operator rather than a try/catch around the assembly code
                    .doOnError(ex -> {
                        span.recordException(ex);
                        span.setStatus(StatusCode.ERROR);
                    })
                    .doFinally(signal -> span.end());
            });
    }
}

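0x09 High Availability and Fault Tolerance

9.1 Degradation Strategy

/**
 * Degradation and circuit breaking for the rule engine.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RuleEngineCircuitBreaker {

    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final RuleProcessor ruleProcessor;

    /**
     * Rule processing guarded by a circuit breaker.
     */
    public Flux<RuleExecutionResult> processWithCircuitBreaker(
            Flux<DeviceMessage> messageFlux) {

        CircuitBreaker circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("rule-engine");

        return messageFlux
            .flatMap(message ->
                // process() takes and returns a Flux, so wrap the single message.
                // The breaker only counts errors that propagate out of process()
                ruleProcessor.process(Flux.just(message))
                    .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                    // Resilience4j 1.x and later signal an open breaker with
                    // CallNotPermittedException
                    .onErrorResume(CallNotPermittedException.class, ex -> {
                        log.warn("Circuit breaker open, degrading: {}", message);
                        return Mono.just(degradedProcess(message));
                    })
            );
    }

    /**
     * Degraded processing: only logs the message, executes no actions.
     */
    private RuleExecutionResult degradedProcess(DeviceMessage message) {
        log.info("Degraded mode: message received but not processed - deviceId={}, payload={}",
            message.getDeviceId(), message.getPayload());

        return RuleExecutionResult.builder()
            .success(true)
            .message("Degraded: message logged, no actions executed")
            .build();
    }
}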
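The state machine behind a circuit breaker is small enough to sketch directly. The version below trips after a number of consecutive failures and allows one trial call after a cool-down. This is a simplified model chosen for illustration: production libraries usually decide on a failure rate over a sliding window, and `CircuitBreakerSketch`, its parameters, and the injected clock are assumptions. It is single-threaded.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Sketch: CLOSED -> OPEN -> HALF_OPEN circuit breaker with a fallback. */
final class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock; // injected so the behaviour is testable
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    CircuitBreakerSketch(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < openMillis) {
                return fallback.get();        // fail fast: do not touch the dependency
            }
            state = State.HALF_OPEN;          // cool-down over: allow one trial call
        }
        try {
            T result = action.get();
            state = State.CLOSED;             // success closes the breaker again
            consecutiveFailures = 0;
            return result;
        } catch (RuntimeException ex) {
            consecutiveFailures++;
            if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                state = State.OPEN;           // trip, or re-trip after a failed trial
                openedAt = clock.getAsLong();
            }
            return fallback.get();
        }
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker =
            new CircuitBreakerSketch(2, 1_000, System::currentTimeMillis);
        for (int i = 0; i < 3; i++) {
            String result = breaker.<String>call(
                () -> { throw new IllegalStateException("downstream unavailable"); },
                () -> "degraded");
            System.out.println(result + " / " + breaker.state());
        }
    }
}
```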

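9.2 Distributed Deployment

/**
 * Redis-based distributed lock (avoids concurrent duplicate processing).
 */
@Component
@Slf4j
public class DistributedRuleProcessor {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Rule processing in a distributed environment.
     */
    public Mono<RuleExecutionResult> processWithLock(RuleExecutionContext context) {
        String lockKey = "rule:lock:" + context.getRule().getId() +
                         ":" + context.getMessage().getMessageId();

        RLock lock = redissonClient.getLock(lockKey);

        return Mono.fromCallable(() -> {
            // Try to acquire the lock: wait up to 1 second, lease expires after 10 seconds
            if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
                try {
                    // executeActions returns a Flux, so wait for its last result
                    return executeActions(context).blockLast();
                } finally {
                    // The lease may already have expired if the actions ran long
                    if (lock.isHeldByCurrentThread()) {
                        lock.unlock();
                    }
                }
            } else {
                log.debug("Failed to acquire lock, skipping: lockKey={}", lockKey);
                return RuleExecutionResult.skipped("Another node is processing this message");
            }
        }).subscribeOn(Schedulers.boundedElastic()); // blocking calls stay off the event loop
    }
}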
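One caveat is worth spelling out: a lock prevents two nodes from processing the same message at the same time, but once it is released a second node could still process the message again. A complementary technique, sketched below, is to claim each (rule, message) pair exactly once with a set-if-absent operation. The in-memory map is a stand-in for a shared store, and `IdempotentExecutionSketch` with its key format is a hypothetical illustration rather than part of the design described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Sketch: run an action at most once per (rule, message) using set-if-absent. */
final class IdempotentExecutionSketch {

    // Stand-in for a shared store; across nodes this would be an atomic
    // set-if-absent with an expiry in something like Redis
    private final Map<String, Boolean> claimed = new ConcurrentHashMap<>();

    String executeOnce(String ruleId, String messageId, Supplier<String> action) {
        String key = "rule:done:" + ruleId + ":" + messageId;
        // putIfAbsent is atomic: exactly one caller sees null and wins the claim
        if (claimed.putIfAbsent(key, Boolean.TRUE) != null) {
            return "skipped";
        }
        return action.get();
    }

    public static void main(String[] args) {
        IdempotentExecutionSketch executor = new IdempotentExecutionSketch();
        System.out.println(executor.executeOnce("rule-1", "msg-42", () -> "executed"));
        System.out.println(executor.executeOnce("rule-1", "msg-42", () -> "executed"));
    }
}
```

The trade-off is that a claim made by a node that crashes mid-action is never retried unless the marker expires, so the expiry has to be chosen with the retry policy in mind.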

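0x0A Summary: The Engineering Value of a Reactive Rule Engine

10.1 Core Advantages

  1. High throughput, low latency: reactive streams are a natural fit for the high-concurrency data processing found in IoT
  2. Resource efficiency: non-blocking I/O, thread reuse, bounded memory usage
  3. Elasticity: automatic backpressure, with graceful degradation during traffic peaks
  4. Extensibility: pluggable action executors, so new features require no changes to the core code
  5. Observability: comprehensive metrics and distributed tracing

10.2 Use Cases

  • Device alerts: detect abnormal device states in real time and trigger notifications
  • Data forwarding: forward device data to third-party systems or a data warehouse
  • Device linkage: control actuator devices automatically based on sensor data
  • Data cleansing: filter, transform, and aggregate raw device data
  • Security: detect abnormal traffic or attacks and block devices automatically

10.3 Future Directions

  1. AI-generated rules: derive alert thresholds and rules automatically from historical data
  2. Graph computing engine: support complex queries over device relationship graphs
  3. Edge computing: push the rule engine down to edge gateways to relieve the cloud
  4. Serverless architecture: run rules as FaaS functions that are loaded and executed on demand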

