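JetLinks Rule Engine Design and Implementation: A Reactive Rule-Processing Framework Built on Reactor
0x00 Introduction: The Core Engine of an IoT Platform
In an Internet of Things (IoT) platform, the rule engine is the component that connects device data to business logic. It has to process the data streams reported by large device fleets in real time and, according to predefined rules, raise alerts, forward data, persist it, and so on. Traditional architectures built on polling or message queues often struggle once the platform has to serve millions of device connections.
JetLinks is an open-source, enterprise-grade IoT platform whose rule engine follows the reactive programming model and is implemented on Project Reactor. This architecture is a natural fit for high-concurrency, low-latency data streams, and it provides a backpressure mechanism for absorbing traffic spikes.
This article examines the engineering practice behind the JetLinks rule engine along three dimensions: architecture, core implementation, and performance optimization. The focus is not only on "how it is implemented" but also on "why it is designed this way".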
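0x01 Architecture: A Pipeline Model for Reactive Data Streams
1.1 Pain points of traditional rule engines
A traditional IoT rule engine typically looks like this:
Device data → Message queue (Kafka/RabbitMQ) → Rule processor (polling) → Executor
This design has several problems:
- Blocking I/O: each rule-processor thread is blocked while it waits on I/O, so resource utilization is low
- Memory pressure: the message queue has to buffer large volumes of data, which consumes a lot of memory
- No backpressure: when the upstream produces faster than the downstream can consume, the consumer is overwhelmed and may crash
- Poor scalability: the threading model caps the achievable concurrency
1.2 The reactive architecture of JetLinks
JetLinks adopts the Reactive Streams specification and models data processing as a pipeline:
Device data source (Flux) → Filter → Map →
Rule matching (FlatMap) → Action execution (FlatMap) → Result output (Subscribe)
Core advantages:
- Non-blocking and asynchronous: built on an event loop, so a single thread can serve many concurrent operations
- Declarative: a functional API keeps the code concise and easy to compose
- Automatic backpressure: when the downstream cannot keep up, it signals the upstream to slow down
- Resource efficient: threads are reused and memory usage stays bounded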
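The "automatic backpressure" point can be made concrete with the Reactive Streams interfaces that ship with the JDK (`java.util.concurrent.Flow`). The following is a minimal sketch, not JetLinks or Reactor code: `RangePublisher` and `BatchingSubscriber` are hypothetical names, and the publisher is deliberately synchronous and single-threaded. It shows the core contract: the publisher emits only as many elements as the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Synchronous publisher that emits the integers [0, count) strictly on demand. */
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done || n <= 0) return;
                demand += n;
                if (emitting) return;          // re-entrant call: the running loop picks it up
                emitting = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    subscriber.onNext(next++); // never emits beyond the requested demand
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}

/** Subscriber that pulls elements in fixed-size batches and records every request it makes. */
final class BatchingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final List<Long> requests = new ArrayList<>();
    boolean completed = false;
    private final int batchSize;
    private Flow.Subscription subscription;
    private int pendingInBatch;

    BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

    private void requestBatch() {
        pendingInBatch = batchSize;
        requests.add((long) batchSize);
        subscription.request(batchSize);       // signal demand to the upstream
    }

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestBatch(); }

    @Override public void onNext(Integer item) {
        received.add(item);
        if (--pendingInBatch == 0) requestBatch(); // ask for more only when the batch is consumed
    }

    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }

    @Override public void onComplete() { completed = true; }
}
```

Subscribing a `BatchingSubscriber` with batch size 2 to `new RangePublisher(5)` delivers all five elements, but only in response to three separate `request(2)` calls. A slow consumer would simply delay its next `request`, and the publisher would stop emitting in the meantime.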
1.3 Overall architecture
┌───────────────────────────────────────────────────────────────────┐
│                    Device data ingestion layer                    │
│        MQTT Broker │ HTTP API │ CoAP Gateway │ TCP Server         │
└─────────────────────────────────┬─────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│                      Rule engine core layer                       │
│ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │
│ │ Rule definitions  │ │ Data-flow router  │ │ Execution context │ │
│ └───────────────────┘ └───────────────────┘ └───────────────────┘ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │          Rule-processing pipeline (Reactor Pipeline)          │ │
│ │           Filter → Map → FlatMap → GroupBy → Window           │ │
│ └───────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────┬─────────────────────────────────┘
                                  │
                                  ▼
┌───────────────────────────────────────────────────────────────────┐
│                      Action execution layer                       │
│    Notify │ Forward │ Device control │ Storage │ Custom script    │
└───────────────────────────────────────────────────────────────────┘
0x02 Core Data Structures: Rule Model Design
2.1 Rule definition
/**
 * Rule definition.
 */
@Data
@Builder
public class RuleDefinition {
    /** Rule ID */
    private String id;
    /** Rule name */
    private String name;
    /** Rule type: device alert, data forwarding, etc. */
    private RuleType type;
    /** Trigger conditions, written as SpEL expressions */
    private List<Condition> conditions;
    /** Actions to execute */
    private List<Action> actions;
    /** Whether the rule is enabled */
    private boolean enabled;
    /** Scheduling policy: real-time, scheduled, or batch */
    private SchedulePolicy schedulePolicy;
    /** Creation time */
    private LocalDateTime createTime;
}
/**
 * Condition definition.
 */
@Data
@Builder // required by the Condition.builder() calls in section 3.2
public class Condition {
    /** Condition type: device property, device event, or timed trigger */
    private ConditionType type;
    /** Filter expression in SpEL syntax,
     * e.g. #device.temperature > 80 && #device.status == 'online'
     */
    private String expression;
    /** Time window, used for aggregation */
    private TimeWindow timeWindow;
}
/**
 * Action definition.
 */
@Data
public class Action {
    /** Action type: notification, device control, or data forwarding */
    private ActionType type;
    /** Action parameters (deserialized from JSON) */
    private Map<String, Object> parameters;
    /** Whether the action runs asynchronously */
    private boolean async;
    /** Retry policy on failure */
    private RetryPolicy retryPolicy;
}
2.2 Execution context
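Rule execution needs to carry its full context:
/**
 * Rule execution context.
 */
@Data
@Builder(toBuilder = true) // toBuilder() is used when a rule is attached during routing
public class RuleExecutionContext {
    /** Rule instance */
    private RuleDefinition rule;
    /** Triggering message */
    private DeviceMessage message;
    /** Device information */
    private DeviceMetadata device;
    /** Global variables (used in condition evaluation) */
    private Map<String, Object> variables;
    /** Execution start time */
    private Instant startTime;
    /** Trace ID (used to correlate logs) */
    private String traceId;
    /** Builds the object graph exposed to SpEL: device, message, payload, plus variables */
    public Map<String, Object> getEvaluationRoot() {
        Map<String, Object> root = new HashMap<>();
        root.put("device", device);
        root.put("message", message);
        root.put("payload", message != null ? message.getPayload() : null);
        if (variables != null) {
            root.putAll(variables);
        }
        return root;
    }
}
0x03 Condition Engine: Expression Evaluation with SpEL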
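3.1 Spring Expression Language integration
JetLinks uses SpEL (Spring Expression Language) as the expression language for rule conditions, balancing flexibility and performance. The evaluator caches parsed expressions so that each expression string is parsed only once.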
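The parse-once, evaluate-many idea behind the expression cache can be shown without Spring. The sketch below is a toy and is not SpEL: `CompiledConditionCache` is a hypothetical class that understands only expressions of the form `<field> <op> <number>`. What it illustrates is that the parsing cost is paid once per distinct expression string, after which evaluation is a map lookup plus a predicate call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Toy condition compiler with a parse-once cache; it is NOT SpEL. */
final class CompiledConditionCache {
    private final Map<String, Predicate<Map<String, Double>>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger parseCount = new AtomicInteger();

    /** Evaluates an expression such as "temperature > 80" against a flat payload. */
    boolean evaluate(String expression, Map<String, Double> payload) {
        // computeIfAbsent parses only on the first sighting of each expression string
        return cache.computeIfAbsent(expression, this::parse).test(payload);
    }

    /** Number of times an expression was actually parsed (cache misses). */
    int parseCount() { return parseCount.get(); }

    private Predicate<Map<String, Double>> parse(String expression) {
        parseCount.incrementAndGet();
        String[] parts = expression.trim().split("\\s+");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected '<field> <op> <number>': " + expression);
        }
        String field = parts[0];
        double threshold = Double.parseDouble(parts[2]);
        // A missing field makes the condition false rather than throwing
        switch (parts[1]) {
            case ">":  return p -> p.containsKey(field) && p.get(field) > threshold;
            case ">=": return p -> p.containsKey(field) && p.get(field) >= threshold;
            case "<":  return p -> p.containsKey(field) && p.get(field) < threshold;
            case "<=": return p -> p.containsKey(field) && p.get(field) <= threshold;
            default:   throw new IllegalArgumentException("Unsupported operator: " + parts[1]);
        }
    }
}
```

Evaluating `"temperature > 80"` against thousands of messages increments `parseCount()` only once. The same reasoning motivates caching SpEL `Expression` objects keyed by their source string.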
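/**
 * Condition evaluator.
 */
@Component
@Slf4j
public class ConditionEvaluator {
    private final SpelExpressionParser parser = new SpelExpressionParser();
    /** Expression cache (avoids re-parsing the same expression) */
    private final Cache<String, Expression> expressionCache =
        Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterAccess(Duration.ofHours(1))
            .build();
    /**
     * Evaluate whether a condition is satisfied.
     */
    public Mono<Boolean> evaluate(Condition condition, RuleExecutionContext context) {
        return Mono.fromCallable(() -> {
            Expression expression = expressionCache.get(
                condition.getExpression(),
                expr -> parser.parseExpression(expr)
            );
            // Note: StandardEvaluationContext exposes the full SpEL feature set;
            // expressions should come from trusted rule authors only.
            StandardEvaluationContext evalContext = new StandardEvaluationContext();
            Map<String, Object> root = context.getEvaluationRoot();
            evalContext.setRootObject(root);
            // #device, #message and #payload are SpEL variables, so the whole
            // evaluation root (not only the user variables) must be registered
            evalContext.setVariables(root);
            // Lets expressions such as #payload.temperature read Map entries as
            // properties (only relevant if the payload is a Map)
            evalContext.addPropertyAccessor(new MapAccessor());
            // Register custom functions
            registerCustomFunctions(evalContext);
            Boolean result = expression.getValue(evalContext, Boolean.class);
            return Boolean.TRUE.equals(result);
        }).onErrorResume(ex -> {
            // A condition that fails to evaluate is treated as not satisfied
            log.error("Condition evaluation failed: expression={}, context={}",
                condition.getExpression(), context, ex);
            return Mono.just(false);
        });
    }
    /**
     * Register custom functions.
     */
    private void registerCustomFunctions(StandardEvaluationContext context) {
        try {
            // Math functions
            context.registerFunction("abs",
                Math.class.getDeclaredMethod("abs", double.class));
            context.registerFunction("max",
                Math.class.getDeclaredMethod("max", double.class, double.class));
            // Custom function (SpEL functions must be static methods)
            context.registerFunction("between",
                ConditionEvaluator.class.getDeclaredMethod("between",
                    double.class, double.class, double.class));
        } catch (NoSuchMethodException e) {
            log.error("Failed to register SpEL functions", e);
        }
    }
    /**
     * Custom function: checks whether a value lies in the closed interval [min, max].
     */
    public static boolean between(double value, double min, double max) {
        return value >= min && value <= max;
    }
}
3.2 Complex condition examples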
// Temperature alert rule
Condition tempAlert = Condition.builder()
    .type(ConditionType.DEVICE_PROPERTY)
    .expression(
        "#device.productId == 'temp-sensor-v2' && " +
        "#payload.temperature > 80 && " +
        "#payload.temperature < 120 && " +
        "#between(#payload.humidity, 30, 70)"
    )
    .build();
// Offline detection (no report within 5 minutes).
// #now() is not among the functions registered in section 3.1 and would have to be added.
Condition offlineCheck = Condition.builder()
    .type(ConditionType.TIME_WINDOW)
    .expression(
        "#message.timestamp < (#now() - 300000)" // 5 minutes = 300000 ms
    )
    .timeWindow(TimeWindow.of(Duration.ofMinutes(5)))
    .build();
// Aggregation condition: average above a threshold.
// #avg() and #count() are aggregation functions that likewise need to be registered.
Condition avgAlert = Condition.builder()
    .type(ConditionType.AGGREGATION)
    .expression(
        "#avg(#payload.temperature) > 75 && #count() >= 10"
    )
    .timeWindow(TimeWindow.tumbling(Duration.ofMinutes(10)))
    .build();
0x04 Rule Engine Core: The Reactive Pipeline
4.1 Rule processor
/**
 * Rule processor: turns the device message stream into a stream of rule executions.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RuleProcessor {
    private final ConditionEvaluator conditionEvaluator;
    // The manager from section 5.1 dispatches to the executor registered for each action type
    private final ActionExecutorManager actionExecutor;
    private final RuleRepository ruleRepository;
    private final DeviceService deviceService;
    /**
     * Process the device message stream.
     * @param messageFlux device message stream
     * @return stream of execution results
     */
    public Flux<RuleExecutionResult> process(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            // Each message runs in its own inner pipeline (up to 32 in flight), so a
            // failure is logged and skipped instead of terminating the whole stream
            .flatMap(message -> processOne(message)
                // 5. Error handling and logging
                .doOnError(ex -> log.error("Rule processing failed: {}", message, ex))
                .onErrorResume(ex -> Flux.empty()), 32)
            // 6. Metrics
            .doOnNext(result -> recordMetrics(result));
    }
    private Flux<RuleExecutionResult> processOne(DeviceMessage message) {
        // 1. Enrich the context: look up device metadata
        return enrichContext(message)
            // 2. Route to the matching rules
            .flatMapMany(this::routeToRules)
            // 3. Evaluate conditions
            .filterWhen(ctx -> evaluateConditions(ctx.getRule(), ctx))
            // 4. Execute actions (up to 64 in flight)
            .flatMap(this::executeActions, 64);
    }
    /**
     * Enrich the execution context.
     */
    private Mono<RuleExecutionContext> enrichContext(DeviceMessage message) {
        return deviceService.getMetadata(message.getDeviceId())
            .map(device -> RuleExecutionContext.builder()
                .message(message)
                .device(device)
                .traceId(generateTraceId())
                .startTime(Instant.now())
                .variables(new HashMap<>())
                .build());
    }
    /**
     * Route to the matching rules (by device ID, product ID, etc.).
     */
    private Flux<RuleExecutionContext> routeToRules(RuleExecutionContext context) {
        return ruleRepository
            .findEnabledRules(context.getDevice().getProductId())
            .map(rule -> context.toBuilder().rule(rule).build());
    }
    /**
     * Evaluate all conditions (AND semantics).
     */
    private Mono<Boolean> evaluateConditions(RuleDefinition rule,
                                             RuleExecutionContext context) {
        List<Mono<Boolean>> conditionResults = rule.getConditions().stream()
            .map(condition -> conditionEvaluator.evaluate(condition, context))
            .collect(Collectors.toList());
        // True only if every condition holds; concat evaluates sequentially and
        // all() stops at the first false result
        return Flux.concat(conditionResults)
            .all(result -> result);
    }
    /**
     * Execute all actions of the matched rule.
     */
    private Flux<RuleExecutionResult> executeActions(RuleExecutionContext context) {
        return Flux.fromIterable(context.getRule().getActions())
            .flatMap(action -> actionExecutor.execute(action, context));
    }
}
4.2 Backpressure and flow control
One major advantage of reactive streams is the built-in backpressure mechanism.
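When the downstream cannot keep up and the source cannot be slowed down (devices keep reporting regardless), something has to give, and the overflow strategy decides what. The sketch below is plain Java rather than Reactor code, and `BoundedBuffer` is a hypothetical class used only to contrast two common strategies: rejecting the newly arriving element versus evicting the oldest buffered one. In Reactor terms these correspond roughly to `onBackpressureDrop` and to `onBackpressureBuffer` with `BufferOverflowStrategy.DROP_OLDEST`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Bounded buffer illustrating two overflow strategies; a teaching sketch, not Reactor code. */
final class BoundedBuffer<T> {
    enum Overflow { DROP_NEWEST, DROP_OLDEST }

    private final Deque<T> queue = new ArrayDeque<>();
    private final List<T> dropped = new ArrayList<>();
    private final int capacity;
    private final Overflow overflow;

    BoundedBuffer(int capacity, Overflow overflow) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.overflow = overflow;
    }

    /** Called by the producer; never blocks, drops one element when the buffer is full. */
    void offer(T item) {
        if (queue.size() < capacity) {
            queue.addLast(item);
        } else if (overflow == Overflow.DROP_NEWEST) {
            dropped.add(item);                 // reject the arriving element
        } else {
            dropped.add(queue.removeFirst());  // evict the oldest buffered element
            queue.addLast(item);
        }
    }

    /** Called by the consumer when it is ready for one more element; null when empty. */
    T poll() { return queue.pollFirst(); }

    List<T> buffered() { return new ArrayList<>(queue); }

    List<T> dropped() { return new ArrayList<>(dropped); }
}
```

With capacity 3 and the elements 1 through 5 offered while nothing is consumed, `DROP_NEWEST` keeps 1, 2, 3 and discards 4 and 5, whereas `DROP_OLDEST` keeps 3, 4, 5 and discards 1 and 2. For telemetry where only the latest reading matters, evicting the oldest is usually the better fit; for event streams where each message is significant, neither strategy is lossless and a durable queue in front of the engine is worth considering.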
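/**
 * Message processing with explicit flow control.
 */
public Flux<RuleExecutionResult> processWithBackpressure(
        Flux<DeviceMessage> messageFlux) {
    return messageFlux
        // Overflow strategy: when the downstream has no outstanding demand, the newly
        // arriving message is dropped. (To evict the oldest buffered message instead,
        // use onBackpressureBuffer(n, BufferOverflowStrategy.DROP_OLDEST).)
        .onBackpressureDrop(msg ->
            log.warn("Message dropped due to backpressure: {}", msg)
        )
        // Request upstream data in batches of at most 10,000 elements.
        // limitRate bounds the demand per request; it is not a per-second rate limit.
        .limitRate(10000)
        // Micro-batching to smooth bursts: up to 1,000 messages or 1 second per batch
        .bufferTimeout(1000, Duration.ofSeconds(1))
        .flatMapIterable(batch -> batch)
        // Group by device ID so that each device's messages are handled in order
        .groupBy(msg -> msg.getDeviceId(), Integer.MAX_VALUE)
        // Process up to 128 device groups concurrently. Groups beyond that wait until
        // an active group completes, so with many device IDs consider hashing the ID
        // into a fixed number of buckets instead.
        .flatMap(group -> group
            // concatMap (unlike flatMap) preserves per-device ordering
            .concatMap(this::enrichContext)
            .concatMap(this::routeToRules)
            .filterWhen(ctx -> evaluateConditions(ctx.getRule(), ctx))
            .concatMap(this::executeActions)
        , 128);
}
0x05 Action Executors: An Extensible Plugin Architecture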
5.1 Action executor interface
/**
 * Action executor interface.
 */
public interface ActionExecutor {
    /**
     * The action type this executor supports.
     */
    ActionType supportedType();
    /**
     * Execute an action.
     * @param action action definition
     * @param context execution context
     * @return execution result
     */
    Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context);
}
/**
 * Action executor manager: a registry keyed by action type.
 */
@Component
@Slf4j
public class ActionExecutorManager {
    private final Map<ActionType, ActionExecutor> executors = new ConcurrentHashMap<>();
    @Autowired
    public ActionExecutorManager(List<ActionExecutor> executorList) {
        // Spring injects every ActionExecutor bean, so new executors register themselves
        executorList.forEach(executor ->
            executors.put(executor.supportedType(), executor)
        );
    }
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        ActionExecutor executor = executors.get(action.getType());
        if (executor == null) {
            return Mono.error(new UnsupportedOperationException(
                "Unsupported action type: " + action.getType()
            ));
        }
        return executor.execute(action, context)
            .timeout(Duration.ofSeconds(30)) // per-attempt timeout
            .retry(action.getRetryPolicy().getMaxRetries()) // retry on error (including timeout)
            .onErrorResume(ex -> {
                log.error("Action execution failed: action={}, context={}", action, context, ex);
                return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
            });
    }
}
5.2 Concrete action executors
Notification executor
/**
 * Notification executor (DingTalk, email, SMS, etc.).
 */
@Component
@Slf4j
public class NotificationActionExecutor implements ActionExecutor {
    private final WebClient webClient;
    @Override
    public ActionType supportedType() {
        return ActionType.NOTIFICATION;
    }
    @Override
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        NotificationConfig config = parseConfig(action.getParameters());
        // Build the notification content
        String content = buildNotificationContent(context, config);
        // Dispatch by notification type
        return switch (config.getNotificationType()) {
            case DINGTALK -> sendDingTalkMessage(config, content);
            case EMAIL -> sendEmail(config, content);
            case SMS -> sendSms(config, content);
            default -> Mono.error(new IllegalArgumentException("Unknown notification type"));
        };
    }
    /**
     * Send a DingTalk message.
     */
    private Mono<RuleExecutionResult> sendDingTalkMessage(
            NotificationConfig config, String content) {
        Map<String, Object> body = Map.of(
            "msgtype", "text",
            "text", Map.of("content", content)
        );
        return webClient.post()
            .uri(config.getWebhookUrl())
            .bodyValue(body)
            .retrieve()
            .bodyToMono(String.class)
            .map(response -> RuleExecutionResult.success("DingTalk message sent"))
            .doOnError(ex -> log.error("Failed to send DingTalk message: {}", content, ex));
    }
    /**
     * Build the notification content (supports template variables).
     */
    private String buildNotificationContent(RuleExecutionContext context,
                                            NotificationConfig config) {
        String template = config.getContentTemplate();
        // Render with the Mustache template engine
        MustacheFactory mf = new DefaultMustacheFactory();
        Mustache mustache = mf.compile(new StringReader(template), "notification");
        StringWriter writer = new StringWriter();
        mustache.execute(writer, context.getEvaluationRoot());
        return writer.toString();
    }
}
Notification template example:
[Device Alert]
Device name: {{device.name}}
Product type: {{device.productName}}
Alert: abnormal temperature, current value {{payload.temperature}}℃
Triggered at: {{message.timestamp}}
Recommended action: check the device's operating status immediately
Device control executor
/**
 * Device control executor (sends commands down to devices).
 */
@Component
@Slf4j
public class DeviceControlActionExecutor implements ActionExecutor {
    private final DeviceCommandService commandService;
    @Override
    public ActionType supportedType() {
        return ActionType.DEVICE_CONTROL;
    }
    @Override
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        ControlConfig config = parseConfig(action.getParameters());
        // Build the device command
        DeviceCommand command = DeviceCommand.builder()
            .deviceId(config.getTargetDeviceId())
            .function(config.getFunctionId())
            .parameters(config.getCommandParams())
            .timeout(Duration.ofSeconds(30))
            .build();
        // Send the command and wait for the response
        return commandService.sendCommand(command)
            .map(response -> {
                if (response.isSuccess()) {
                    return RuleExecutionResult.success(
                        "Device control succeeded: " + response.getMessage()
                    );
                } else {
                    return RuleExecutionResult.failure(
                        "Device control failed: " + response.getErrorMessage()
                    );
                }
            })
            .timeout(Duration.ofSeconds(30))
            .onErrorResume(ex -> {
                log.error("Device control failed: deviceId={}, command={}",
                    command.getDeviceId(), command, ex);
                return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
            });
    }
}
Data forwarding executor
/**
 * Data forwarding executor (forwards to HTTP, MQTT, Kafka, etc.).
 */
@Component
@Slf4j
public class DataForwardActionExecutor implements ActionExecutor {
    private final WebClient webClient;
    private final KafkaTemplate<String, String> kafkaTemplate;
    @Override
    public ActionType supportedType() {
        return ActionType.DATA_FORWARD;
    }
    @Override
    public Mono<RuleExecutionResult> execute(Action action, RuleExecutionContext context) {
        ForwardConfig config = parseConfig(action.getParameters());
        // Serialize the data
        String payload = serializePayload(context, config);
        return switch (config.getProtocol()) {
            case HTTP -> forwardToHttp(config, payload);
            case KAFKA -> forwardToKafka(config, payload);
            case MQTT -> forwardToMqtt(config, payload);
            default -> Mono.error(new IllegalArgumentException("Unsupported forwarding protocol"));
        };
    }
    /**
     * Forward to an HTTP endpoint.
     */
    private Mono<RuleExecutionResult> forwardToHttp(ForwardConfig config, String payload) {
        return webClient.post()
            .uri(config.getUrl())
            .headers(headers -> headers.setAll(config.getHeaders()))
            .bodyValue(payload)
            .retrieve()
            .bodyToMono(String.class)
            .map(response -> RuleExecutionResult.success("HTTP forwarding succeeded"))
            .onErrorResume(ex -> {
                log.error("HTTP forwarding failed: url={}, payload={}",
                    config.getUrl(), payload, ex);
                return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
            });
    }
    /**
     * Forward to Kafka.
     */
    private Mono<RuleExecutionResult> forwardToKafka(ForwardConfig config, String payload) {
        // Spring Kafka 2.x: send() returns a ListenableFuture, hence completable().
        // In Spring Kafka 3.x send() already returns a CompletableFuture.
        return Mono.fromFuture(
            kafkaTemplate.send(config.getTopic(), payload).completable()
        ).map(result -> RuleExecutionResult.success("Kafka forwarding succeeded"))
        .onErrorResume(ex -> {
            log.error("Kafka forwarding failed: topic={}, payload={}",
                config.getTopic(), payload, ex);
            return Mono.just(RuleExecutionResult.failure(ex.getMessage()));
        });
    }
}
0x06 Time Windows and Aggregation
6.1 Time window implementations
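It helps to see what "assigning a message to a window" means arithmetically before looking at operator-based implementations. The following standalone sketch uses a hypothetical `WindowAssigner` class and assumes epoch-aligned windows keyed by the message's own timestamp. That is an assumption made for illustration: Reactor's time-based `window` operators instead open windows relative to subscription time on a scheduler clock.

```java
import java.util.ArrayList;
import java.util.List;

/** Window-assignment arithmetic for tumbling and sliding windows (epoch milliseconds). */
final class WindowAssigner {
    private WindowAssigner() {}

    /** Start of the single tumbling window of length sizeMs that contains timestampMs. */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /**
     * Starts of all sliding windows [start, start + sizeMs) that contain timestampMs,
     * where a new window opens every slideMs. Returned in ascending order.
     */
    static List<Long> slidingWindowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window that has already opened at or before the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(0, start);
        }
        return starts;
    }
}
```

A message belongs to exactly one tumbling window, but to `sizeMs / slideMs` sliding windows when the size is a multiple of the slide. With a 10-minute window sliding every minute, each message is therefore counted in 10 overlapping windows, which is why sliding windows cost roughly ten times the state of a tumbling window of the same length.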
/**
 * Time window processor.
 */
@Component
public class TimeWindowProcessor {
    /**
     * Tumbling window:
     * computes the average temperature once every 10 minutes.
     */
    public Flux<AggregationResult> tumblingWindow(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .window(Duration.ofMinutes(10))
            .flatMap(window -> window
                .collect(Collectors.averagingDouble(
                    msg -> msg.getPayload().getDouble("temperature")
                ))
                .map(avgTemp -> AggregationResult.builder()
                    .metric("temperature_avg")
                    .value(avgTemp)
                    .timestamp(Instant.now())
                    .build())
            );
    }
    /**
     * Sliding window:
     * every minute, reports the maximum temperature of the last 10 minutes.
     */
    public Flux<AggregationResult> slidingWindow(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .window(Duration.ofMinutes(10), Duration.ofMinutes(1))
            .flatMap(window -> window
                .collect(Collectors.maxBy(
                    Comparator.comparing(msg ->
                        msg.getPayload().getDouble("temperature"))
                ))
                // An empty window yields Optional.empty(); skip it instead of calling get()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(msg -> AggregationResult.builder()
                    .metric("temperature_max")
                    .value(msg.getPayload().getDouble("temperature"))
                    .deviceId(msg.getDeviceId())
                    .timestamp(Instant.now())
                    .build())
            );
    }
    /**
     * Session window (approximation):
     * groups each device's messages into online sessions. windowTimeout closes a
     * window after 100 messages or 5 minutes after the window opened, whichever comes
     * first, so it only approximates "5 minutes without data ends the session".
     */
    public Flux<SessionResult> sessionWindow(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .groupBy(DeviceMessage::getDeviceId)
            .flatMap(group -> group
                .windowTimeout(100, Duration.ofMinutes(5))
                .flatMap(window -> window
                    .collect(Collectors.toList())
                    .filter(list -> !list.isEmpty())
                    .map(list -> SessionResult.builder()
                        .deviceId(group.key())
                        .messageCount(list.size())
                        .sessionStart(list.get(0).getTimestamp())
                        .sessionEnd(list.get(list.size() - 1).getTimestamp())
                        .build())
                )
            );
    }
}
6.2 Complex aggregation rules
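Fluctuation rules of this kind typically rest on the population standard deviation, that is, the square root of the mean squared deviation from the mean. As a quick sanity check of that arithmetic, here is a small standalone sketch; the `Stats` class and the sample readings are made up for illustration.

```java
import java.util.List;

/** Population mean and standard deviation: sqrt(mean((x - mean)^2)). */
final class Stats {
    private Stats() {}

    static double mean(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    static double populationStdDev(List<Double> values) {
        double mean = mean(values);
        double variance = values.stream()
                .mapToDouble(v -> (v - mean) * (v - mean))
                .average()          // divides by N (population), not N - 1 (sample)
                .orElse(0.0);
        return Math.sqrt(variance);
    }

    public static void main(String[] args) {
        // Hypothetical readings: sum = 40, mean = 5, squared deviations sum to 32
        List<Double> temps = List.of(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0);
        System.out.printf("mean=%.2f, stdDev=%.2f%n", mean(temps), populationStdDev(temps));
    }
}
```

For these eight readings the mean is 5 and the variance is 32 / 8 = 4, so the standard deviation is 2. Dividing by N rather than N - 1 slightly underestimates the spread for small windows, which is one reason to require a minimum number of samples before a rule may fire.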
/**
 * Aggregation rule: detect abnormal temperature fluctuation.
 * Condition: standard deviation of temperature over 10 minutes > 10℃
 */
public Flux<AlertEvent> detectTemperatureFluctuation(Flux<DeviceMessage> messageFlux) {
    return messageFlux
        .filter(msg -> msg.getPayload().has("temperature"))
        .window(Duration.ofMinutes(10))
        .flatMap(window -> window
            .map(msg -> msg.getPayload().getDouble("temperature"))
            .collect(Collectors.toList())
            .filter(temps -> temps.size() >= 10) // require at least 10 samples
            .map(temps -> {
                double mean = temps.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0.0);
                // Population standard deviation
                double stdDev = Math.sqrt(
                    temps.stream()
                        .mapToDouble(temp -> Math.pow(temp - mean, 2))
                        .average()
                        .orElse(0.0)
                );
                return Map.entry(mean, stdDev);
            })
            .filter(entry -> entry.getValue() > 10.0) // standard deviation threshold
            .map(entry -> AlertEvent.builder()
                .type("TEMPERATURE_FLUCTUATION")
                .message(String.format(
                    "Abnormal temperature fluctuation: mean=%.2f℃, stdDev=%.2f℃",
                    entry.getKey(), entry.getValue()
                ))
                .timestamp(Instant.now())
                .build())
        );
}
0x07 Performance Optimization: From Theory to Practice
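7.1 Concurrency tuning
/**
 * Performance tuning configuration.
 */
@Configuration
public class RuleEnginePerformanceConfig {
    @Bean
    public Scheduler ruleProcessorScheduler() {
        // Bounded elastic pool: intended for I/O-bound work that may block
        return Schedulers.boundedElastic();
    }
    @Bean
    public Scheduler actionExecutorScheduler() {
        // Parallel pool: one worker per CPU core, for CPU-bound work
        return Schedulers.parallel();
    }
    /**
     * Optimized processing flow.
     * Shown here for brevity; enrichContext, routeToRules, evaluateConditions and
     * executeActions are the RuleProcessor helpers from section 4.1.
     * publishOn switches the thread for everything downstream of it.
     */
    public Flux<RuleExecutionResult> optimizedProcess(Flux<DeviceMessage> messageFlux) {
        return messageFlux
            // 1. I/O (device metadata lookup) on the elastic scheduler
            .publishOn(ruleProcessorScheduler())
            .flatMap(this::enrichContext, 64) // high concurrency
            // 2. CPU-bound work (condition evaluation) on the parallel scheduler
            .publishOn(actionExecutorScheduler())
            .flatMap(this::routeToRules, 32)
            .filterWhen(ctx -> evaluateConditions(ctx.getRule(), ctx))
            // 3. I/O (action execution): switch back
            .publishOn(ruleProcessorScheduler())
            .flatMap(this::executeActions, 128);
    }
}
7.2 Cache optimization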
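A multi-level cache follows a read-through pattern: check the fastest tier first, fall back tier by tier, and back-fill the faster tiers on the way out. The sketch below strips that pattern down to plain maps. `TieredCache` is a hypothetical class in which both tiers and the backing store are in-memory stand-ins, with no expiry and no network involved.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Read-through lookup across two cache tiers and a backing store (in-memory stand-ins). */
final class TieredCache<K, V> {
    private final Map<K, V> l1 = new ConcurrentHashMap<>();   // stands in for the local cache
    private final Map<K, V> l2 = new ConcurrentHashMap<>();   // stands in for the shared cache
    private final Function<K, V> store;                       // stands in for the database
    int storeLoads = 0;                                       // demo counter, not thread-safe

    TieredCache(Function<K, V> store) { this.store = store; }

    Optional<V> get(K key) {
        V value = l1.get(key);
        if (value != null) return Optional.of(value);         // L1 hit

        value = l2.get(key);
        if (value == null) {                                  // L2 miss: go to the store
            storeLoads++;
            value = store.apply(key);
            if (value == null) return Optional.empty();       // unknown key: cache nothing
            l2.put(key, value);                               // back-fill the shared tier
        }
        l1.put(key, value);                                   // back-fill the local tier
        return Optional.of(value);
    }

    /** Drop the key from both tiers, e.g. after a rule has been edited. */
    void invalidate(K key) { l1.remove(key); l2.remove(key); }

    /** Drop the key from the local tier only, simulating local expiry. */
    void evictLocal(K key) { l1.remove(key); }
}
```

The counter makes the benefit visible: after a local eviction the value is served from the second tier and the store is not touched again. The sketch leaves out the hard parts of a real deployment, notably invalidating the local caches of other nodes when a rule changes and avoiding a stampede of concurrent loads for the same key.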
/**
 * Multi-level caching strategy.
 */
@Component
public class CachedRuleRepository {
    /** L1 cache: local memory (Caffeine) */
    private final Cache<String, RuleDefinition> localCache =
        Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(Duration.ofMinutes(5))
            .build();
    /** L2 cache: Redis */
    @Autowired
    private ReactiveRedisTemplate<String, RuleDefinition> redisTemplate;
    /** L3 storage: MySQL */
    @Autowired
    private RuleMapper ruleMapper;
    /**
     * Look up a rule through the cache tiers.
     */
    public Mono<RuleDefinition> findById(String ruleId) {
        // 1. Check the local cache
        RuleDefinition localCached = localCache.getIfPresent(ruleId);
        if (localCached != null) {
            return Mono.just(localCached);
        }
        // 2. Check the Redis cache
        return redisTemplate.opsForValue()
            .get("rule:" + ruleId)
            .switchIfEmpty(
                // 3. Fall back to the database. The mapper call blocks, so it runs
                //    on the bounded elastic scheduler
                Mono.fromCallable(() -> loadFromDatabase(ruleId))
                    .subscribeOn(Schedulers.boundedElastic())
                    // 3.1 Write back to Redis as part of the chain, rather than
                    //     through a detached subscribe() whose errors would be lost
                    .flatMap(rule -> redisTemplate.opsForValue()
                        .set("rule:" + ruleId, rule, Duration.ofMinutes(10))
                        .thenReturn(rule))
            )
            // 4. Populate the local cache
            .doOnNext(rule -> localCache.put(ruleId, rule));
    }
    private RuleDefinition loadFromDatabase(String ruleId) {
        return ruleMapper.selectById(ruleId);
    }
}
7.3 Batch processing optimization
/**
 * Execute actions in batches (reduces network overhead).
 */
public Flux<RuleExecutionResult> batchExecuteActions(
        Flux<RuleExecutionContext> contextFlux) {
    return contextFlux
        // Group by action type (uses the rule's first action, so every rule
        // is assumed to have at least one)
        .groupBy(ctx -> ctx.getRule().getActions().get(0).getType())
        .flatMap(group -> group
            // Flush every 100 items or every second, whichever comes first
            .bufferTimeout(100, Duration.ofSeconds(1))
            .flatMap(batch -> {
                ActionType actionType = group.key();
                return switch (actionType) {
                    case NOTIFICATION -> batchSendNotifications(batch);
                    case DATA_FORWARD -> batchForwardData(batch);
                    default -> Flux.fromIterable(batch)
                        .flatMap(this::executeActions);
                };
            })
        );
}
/**
 * Send notifications in a batch (merged into a single message).
 */
private Flux<RuleExecutionResult> batchSendNotifications(
        List<RuleExecutionContext> batch) {
    String batchContent = batch.stream()
        .map(ctx -> String.format(
            "[%s] %s: %s",
            ctx.getDevice().getName(),
            ctx.getRule().getName(),
            ctx.getMessage().getPayload()
        ))
        .collect(Collectors.joining("\n"));
    // One send for the whole batch; the result is reported once per batched context
    return sendNotification(batchContent)
        .flatMapMany(result -> Flux.fromIterable(
            Collections.nCopies(batch.size(), result)
        ));
}
0x08 Monitoring and Observability
8.1 Metrics collection
/**
 * Rule engine metrics.
 */
@Component
public class RuleEngineMetrics {
    private final MeterRegistry meterRegistry;
    /** Messages processed (the rate is derived from this counter) */
    private final Counter messageCounter;
    /** Rule execution latency */
    private final Timer ruleExecutionTimer;
    /** Action successes and failures (the success rate is derived from both) */
    private final Counter actionSuccessCounter;
    private final Counter actionFailureCounter;
    public RuleEngineMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.messageCounter = Counter.builder("rule.engine.message.processed")
            .description("Total number of messages processed")
            .register(meterRegistry);
        this.ruleExecutionTimer = Timer.builder("rule.engine.execution.time")
            .description("Rule execution latency")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
        this.actionSuccessCounter = Counter.builder("rule.engine.action.success")
            .description("Number of successful action executions")
            .register(meterRegistry);
        this.actionFailureCounter = Counter.builder("rule.engine.action.failure")
            .description("Number of failed action executions")
            .register(meterRegistry);
    }
    /**
     * Record one rule execution. The Runnable is timed synchronously, so this
     * measures only work that completes inside run().
     */
    public void recordRuleExecution(RuleExecutionContext context,
                                    Runnable execution) {
        messageCounter.increment();
        ruleExecutionTimer.record(() -> {
            try {
                execution.run();
                actionSuccessCounter.increment();
            } catch (Exception ex) {
                actionFailureCounter.increment();
                throw ex;
            }
        });
    }
}
8.2 Distributed tracing
/**
 * Distributed tracing with OpenTelemetry.
 */
@Component
public class TracedRuleProcessor {
    @Autowired
    private Tracer tracer;
    public Flux<RuleExecutionResult> processWithTracing(
            Flux<DeviceMessage> messageFlux) {
        return messageFlux
            .flatMap(message -> {
                // Create the root span for this message
                Span span = tracer.spanBuilder("rule.process")
                    .setAttribute("device.id", message.getDeviceId())
                    .setAttribute("message.id", message.getMessageId())
                    .startSpan();
                // span.makeCurrent() is thread-local and its scope would be closed before
                // the reactive chain runs, so the parent context (an
                // io.opentelemetry.context.Context) is passed to child spans explicitly
                Context parent = Context.current().with(span);
                return enrichContext(message)
                    .flatMapMany(ctx -> {
                        // Child span: condition evaluation
                        Span evalSpan = tracer.spanBuilder("rule.evaluate")
                            .setParent(parent)
                            .startSpan();
                        return routeToRules(ctx)
                            .filterWhen(c -> evaluateConditions(c.getRule(), c))
                            .doFinally(signal -> evalSpan.end())
                            .flatMap(c -> {
                                // Child span: action execution
                                Span actionSpan = tracer.spanBuilder("rule.action")
                                    .setParent(parent)
                                    .setAttribute("rule.id", c.getRule().getId())
                                    .startSpan();
                                return executeActions(c)
                                    .doFinally(signal -> actionSpan.end());
                            });
                    })
                    // Errors surface as reactive signals, not as thrown exceptions
                    .doOnError(ex -> {
                        span.recordException(ex);
                        span.setStatus(StatusCode.ERROR);
                    })
                    .doFinally(signal -> span.end());
            });
    }
}
0x09 High Availability and Fault Tolerance
9.1 Degradation strategy
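Degradation here is driven by a circuit breaker, so it is worth spelling out the state machine such a breaker implements. The following is a deliberately minimal, count-based sketch. `SimpleCircuitBreaker` is a hypothetical class and not Resilience4j's implementation, which uses sliding windows and failure-rate thresholds; the clock is injected so that the behavior can be tested without sleeping.

```java
import java.util.function.LongSupplier;

/** Minimal count-based circuit breaker; a teaching sketch, not Resilience4j. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openDurationMs;
    private final LongSupplier clock;          // injected so tests can control time
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0;

    SimpleCircuitBreaker(int failureThreshold, long openDurationMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openDurationMs = openDurationMs;
        this.clock = clock;
    }

    /** True if a call may proceed; moves OPEN to HALF_OPEN once the wait has elapsed. */
    synchronized boolean tryAcquire() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openDurationMs) {
            state = State.HALF_OPEN;           // allow trial calls
        }
        return state != State.OPEN;
    }

    /** A successful call closes the breaker and resets the failure count. */
    synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    /** A failed trial call, or too many consecutive failures, opens the breaker. */
    synchronized void onFailure() {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    synchronized State state() { return state; }
}
```

A caller checks `tryAcquire()` before each protected call, reports the outcome through `onSuccess()` or `onFailure()`, and takes the degraded path whenever `tryAcquire()` returns false. One simplification to be aware of: this sketch lets any number of trial calls through while half-open, whereas production breakers usually cap them.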
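/**
 * Rule engine degradation and circuit breaking.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RuleEngineCircuitBreaker {
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final RuleProcessor ruleProcessor;
    /**
     * Rule processing guarded by a circuit breaker.
     */
    public Flux<RuleExecutionResult> processWithCircuitBreaker(
            Flux<DeviceMessage> messageFlux) {
        CircuitBreaker circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("rule-engine");
        return messageFlux
            .flatMap(message ->
                // process() takes and returns a Flux, so wrap the single message.
                // The breaker only counts failures that reach it as error signals.
                ruleProcessor.process(Flux.just(message))
                    .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                    // Emitted while the breaker is open (this exception was named
                    // CircuitBreakerOpenException in early Resilience4j releases)
                    .onErrorResume(CallNotPermittedException.class, ex -> {
                        log.warn("Circuit breaker open, degrading: {}", message);
                        return Flux.just(degradedProcess(message));
                    })
            );
    }
    /**
     * Degraded processing: log only, execute no actions.
     */
    private RuleExecutionResult degradedProcess(DeviceMessage message) {
        log.info("Degraded mode: message received but not processed - deviceId={}, payload={}",
            message.getDeviceId(), message.getPayload());
        return RuleExecutionResult.builder()
            .success(true)
            .message("Degraded: message logged, no actions executed")
            .build();
    }
}
9.2 Distributed deployment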
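/**
 * Redis-based distributed lock (prevents concurrent processing of the same message).
 * The lock alone does not deduplicate: once it is released, another node could
 * process the same message again unless a "processed" marker is also stored.
 */
@Component
@Slf4j
public class DistributedRuleProcessor {
    @Autowired
    private RedissonClient redissonClient;
    /**
     * Rule processing in a distributed environment.
     */
    public Mono<RuleExecutionResult> processWithLock(RuleExecutionContext context) {
        String lockKey = "rule:lock:" + context.getRule().getId() +
            ":" + context.getMessage().getMessageId();
        RLock lock = redissonClient.getLock(lockKey);
        return Mono.fromCallable(() -> {
            // Wait up to 1 second for the lock; it is auto-released after 10 seconds
            if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
                try {
                    // RLock is bound to the acquiring thread, so the actions are run
                    // to completion on this thread; returns the last action's result
                    return executeActions(context).blockLast();
                } finally {
                    lock.unlock();
                }
            } else {
                log.debug("Failed to acquire lock, skipping: lockKey={}", lockKey);
                return RuleExecutionResult.skipped("Being processed by another node");
            }
        // Blocking is only acceptable here because the work runs on boundedElastic
        }).subscribeOn(Schedulers.boundedElastic());
    }
}
0x0A Summary: The Engineering Value of a Reactive Rule Engine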
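10.1 Core advantages
- High throughput, low latency: reactive streams are a natural fit for the highly concurrent data processing of IoT workloads
- Resource efficient: non-blocking I/O and thread reuse keep memory usage bounded
- Elastic: the backpressure mechanism lets the engine slow down or shed load automatically during traffic peaks
- Easy to extend: action executors are plugins, so new capabilities do not require changes to the core
- Observable: comprehensive metrics and distributed tracing
10.2 Applicable scenarios
- Device alerting: detect abnormal device states in real time and trigger notifications
- Data forwarding: forward device data to third-party systems or a data warehouse
- Device linkage: control actuators automatically based on sensor data
- Data cleansing: filter, transform, and aggregate raw device data
- Security: detect abnormal traffic or attacks and block the offending devices automatically
10.3 Future directions
- AI-generated rules: derive alert thresholds and rules automatically from historical data
- Graph engine: support queries over complex device relationship graphs
- Edge computing: push the rule engine down to edge gateways to reduce load on the cloud
- Serverless: run rules as FaaS functions that are loaded and executed on demand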