ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统
ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐
📚 目录
- ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐
- 一、背景🚀
- 二、系统整体架构 🏗️
- 三、实战展示 🛠️:交易行为告警系统
- 3.1 ABP 采集交易事件 📝
- CAP + Outbox 配置示例 💼
- 3.2 Flink CEP 模式与 Exactly-Once ⚡
- 3.3 Redis Stream + SignalR 实时推送 🔔
- 四、生产级部署和监控 📈
- 五、自动化测试 🧪
一、背景🚀
在金融 💰、电商 🛒、IoT 🌐 等高频交互系统中,越来越多的场景需要“实时发现问题并响应”。
二、系统整体架构 🏗️
💡 图示展示了各组件之间的数据流向,实现消息解耦和高可用。
三、实战展示 🛠️:交易行为告警系统
3.1 ABP 采集交易事件 📝
using System; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Volo.Abp.EventBus; using Volo.Abp.EventBus.Distributed; public class TransactionCreatedDomainEvent : DomainEvent { public Guid UserId { get; set; } public decimal Amount { get; set; } public string Location { get; set; } } public class TransactionCreatedHandler : IDistributedEventHandler { private readonly IDistributedEventBus _eventBus; private readonly ILogger _logger; public TransactionCreatedHandler(IDistributedEventBus eventBus, ILogger logger) { _eventBus = eventBus; _logger = logger; } public async Task HandleEventAsync(TransactionCreatedDomainEvent eventData) { var eto = new TransactionCreatedEto { UserId = eventData.UserId, Amount = eventData.Amount, Location = eventData.Location, OccurredAt = Clock.Now }; try { await _eventBus.PublishAsync(eto); } catch (Exception ex) { _logger.LogError(ex, "发布交易事件失败:{UserId}", eventData.UserId); throw; } } }
CAP + Outbox 配置示例 💼
// appsettings.json "Cap": { "UseEntityFramework": true, "UseDashboard": true, "Producer": { "Kafka": { "Servers": "localhost:9092" } }, "Outbox": { "TableName": "CapOutboxMessages" } }
3.2 Flink CEP 模式与 Exactly-Once ⚡
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.eventtime._ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.cep.scala.pattern.Pattern import org.apache.flink.cep.scala.CEP import java.time.Duration val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(10000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.setStateBackend(new RocksDBStateBackend("file:///flink-checkpoints")) env.getCheckpointConfig.setExternalizedCheckpointCleanup( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) val watermarkStrategy = WatermarkStrategy .forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5)) .withTimestampAssigner((event, _) => event.timestamp.toEpochMilli) val stream = env .addSource(new FlinkKafkaConsumer[Transaction]("transactions", deserializer, props)) .assignTimestampsAndWatermarks(watermarkStrategy) val pattern = Pattern.begin[Transaction]("first") .where(_.amount > 10000) .next("second") .where(new IterativeCondition[Transaction] { override def filter(event: Transaction, ctx: IterativeCondition.Context[Transaction]) = { val first = ctx.getEventsForPattern("first").iterator().next() event.location != first.location } }) .within(Time.minutes(5)) pattern.handleTimeout(new PatternTimeoutFunction[Transaction, Unit] { override def timeout(map: java.util.Map[String, java.util.List[Transaction]], timestamp: Long, out: Collector[Unit]): Unit = { // 超时清理逻辑 } }, Time.minutes(5))
💡 建议全链路使用 Schema Registry 管理消息格式,防止兼容性问题。
3.3 Redis Stream + SignalR 实时推送 🔔
using System; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using StackExchange.Redis; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.SignalR; public class RiskAlertWorker : BackgroundService { private readonly IConnectionMultiplexer _redis; private readonly IHubContext _hubContext; private readonly ILogger _logger; public RiskAlertWorker(IConnectionMultiplexer redis, IHubContext hubContext, ILogger logger) { _redis = redis; _hubContext = hubContext; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var db = _redis.GetDatabase(); try { await db.StreamCreateConsumerGroupAsync("risk-alerts", "alert-group", "$", true); } catch { /* 忽略 BUSYGROUP */ } int backoff = 1000; while (!stoppingToken.IsCancellationRequested) { try { var entries = await db.StreamReadGroupAsync( "risk-alerts", "alert-group", "consumer-1", count: 10, flags: CommandFlags.Block(5000)); foreach (var entry in entries) { var alert = JsonSerializer.Deserialize(entry["data"]!); await _hubContext.Clients.Group(alert.UserId.ToString()) .SendAsync("ReceiveAlert", alert, stoppingToken); await db.StreamAcknowledgeAsync("risk-alerts", "alert-group", entry.Id); } backoff = 1000; } catch (Exception ex) { _logger.LogError(ex, "处理 Redis 告警失败"); await Task.Delay(backoff, stoppingToken); backoff = Math.Min(backoff * 2, 16000); } } } } [Authorize] public class RiskAlertHub : Hub { }
四、生产级部署和监控 📈
组件 推荐配置 ABP 后端 Pod 存活/就绪探针 ✅ + HTTPS 🔒 + Serilog→Elasticsearch Sink 📝 + CAP Outbox Kafka enable.idempotence=true 🔁, acks=all ✅, TLS/SASL 🔐 Flink RocksDBStateBackend ⚙️ + EXACTLY_ONCE ⚡ + State TTL 🕒 + HA 🌟 Redis Redis Cluster 🔄 + AOF 📝 + ACL 🔑 + 阻塞消费 ⏳ PostgreSQL 主从流复制 🛠️ + WAL 日志 📜 + TimescaleDB 插件 📊 SignalR Azure SignalR ☁️ / Redis Backplane 🔄 + JWT 鉴权 🔏 # Flink YAML 示例 state.backend: rocksdb checkpointing: interval: 10s mode: EXACTLY_ONCE externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Flink Prometheus Reporter metrics.reporters: prom metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter metrics.reporter.prom.port: 9250
📊 在 Grafana 中可视化:Kafka TPS、Flink 延迟分位、Redis 消费速率、ABP 请求成功率/错误率。
五、自动化测试 🧪
// Testcontainers 启动依赖 var kafka = new KafkaContainer().StartAsync().GetAwaiter().GetResult(); var redis = new RedisContainer().StartAsync().GetAwaiter().GetResult(); var postgres = new PostgreSqlContainer().StartAsync().GetAwaiter().GetResult(); // 注入到 ABP 测试模块 context.Services.Configure(opts => { opts.ProducerConnectionString = kafka.GetBootstrapAddress(); opts.OutboxTableName = "CapOutboxMessages"; }); // Flink MiniCluster var flinkCluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder().Build()); flinkCluster.Start();
(图片来源网络,侵删)(图片来源网络,侵删)(图片来源网络,侵删)
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。