ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统

06-01 1093阅读

ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐


📚 目录

  • ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐
    • 一、背景🚀
    • 二、系统整体架构 🏗️
    • 三、实战展示 🛠️:交易行为告警系统
      • 3.1 ABP 采集交易事件 📝
        • CAP + Outbox 配置示例 💼
        • 3.2 Flink CEP 模式与 Exactly-Once ⚡
        • 3.3 Redis Stream + SignalR 实时推送 🔔
        • 四、生产级部署和监控 📈
        • 五、自动化测试 🧪

          一、背景🚀

          在金融 💰、电商 🛒、IoT 🌐 等高频交互系统中,越来越多的场景需要“实时发现问题并响应”。


          二、系统整体架构 🏗️

          💡 图示展示了各组件之间的数据流向,实现消息解耦和高可用。


          三、实战展示 🛠️:交易行为告警系统

          3.1 ABP 采集交易事件 📝

          using System;
          using System.Threading.Tasks;
          using Microsoft.Extensions.Logging;
          using Volo.Abp.EventBus;
          using Volo.Abp.EventBus.Distributed;
          public class TransactionCreatedDomainEvent : DomainEvent
          {
              public Guid UserId { get; set; }
              public decimal Amount { get; set; }
              public string Location { get; set; }
          }
          public class TransactionCreatedHandler : IDistributedEventHandler
          {
              private readonly IDistributedEventBus _eventBus;
              private readonly ILogger _logger;
              public TransactionCreatedHandler(IDistributedEventBus eventBus,
                                               ILogger logger)
              {
                  _eventBus = eventBus;
                  _logger = logger;
              }
              public async Task HandleEventAsync(TransactionCreatedDomainEvent eventData)
              {
                  var eto = new TransactionCreatedEto
                  {
                      UserId = eventData.UserId,
                      Amount = eventData.Amount,
                      Location = eventData.Location,
                      OccurredAt = Clock.Now
                  };
                  try
                  {
                      await _eventBus.PublishAsync(eto);
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "发布交易事件失败:{UserId}", eventData.UserId);
                      throw;
                  }
              }
          }
          
          CAP + Outbox 配置示例 💼
          // appsettings.json
          "Cap": {
            "UseEntityFramework": true,
            "UseDashboard": true,
            "Producer": {
              "Kafka": { "Servers": "localhost:9092" }
            },
            "Outbox": { "TableName": "CapOutboxMessages" }
          }
          

          3.2 Flink CEP 模式与 Exactly-Once ⚡

          import org.apache.flink.streaming.api.scala._
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
          import org.apache.flink.streaming.api.CheckpointingMode
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
          import org.apache.flink.api.common.eventtime._
          import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
          import org.apache.flink.cep.scala.pattern.Pattern
          import org.apache.flink.cep.scala.CEP
          import java.time.Duration
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          env.enableCheckpointing(10000)
          env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
          env.setStateBackend(new RocksDBStateBackend("file:///flink-checkpoints"))
          env.getCheckpointConfig.setExternalizedCheckpointCleanup(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
          val watermarkStrategy = WatermarkStrategy
            .forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5))
            .withTimestampAssigner((event, _) => event.timestamp.toEpochMilli)
          val stream = env
            .addSource(new FlinkKafkaConsumer[Transaction]("transactions", deserializer, props))
            .assignTimestampsAndWatermarks(watermarkStrategy)
          val pattern = Pattern.begin[Transaction]("first")
            .where(_.amount > 10000)
            .next("second")
            .where(new IterativeCondition[Transaction] {
              override def filter(event: Transaction, ctx: IterativeCondition.Context[Transaction]) = {
                val first = ctx.getEventsForPattern("first").iterator().next()
                event.location != first.location
              }
            })
            .within(Time.minutes(5))
          pattern.handleTimeout(new PatternTimeoutFunction[Transaction, Unit] {
            override def timeout(map: java.util.Map[String, java.util.List[Transaction]], timestamp: Long, out: Collector[Unit]): Unit = {
              // 超时清理逻辑
            }
          }, Time.minutes(5))
          

          💡 建议全链路使用 Schema Registry 管理消息格式,防止兼容性问题。


          3.3 Redis Stream + SignalR 实时推送 🔔

          using System;
          using System.Text.Json;
          using System.Threading;
          using System.Threading.Tasks;
          using Microsoft.Extensions.Hosting;
          using Microsoft.Extensions.Logging;
          using StackExchange.Redis;
          using Microsoft.AspNetCore.Authorization;
          using Microsoft.AspNetCore.SignalR;
          public class RiskAlertWorker : BackgroundService
          {
              private readonly IConnectionMultiplexer _redis;
              private readonly IHubContext _hubContext;
              private readonly ILogger _logger;
              public RiskAlertWorker(IConnectionMultiplexer redis,
                                     IHubContext hubContext,
                                     ILogger logger)
              {
                  _redis = redis;
                  _hubContext = hubContext;
                  _logger = logger;
              }
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  var db = _redis.GetDatabase();
                  try { await db.StreamCreateConsumerGroupAsync("risk-alerts", "alert-group", "$", true); }
                  catch { /* 忽略 BUSYGROUP */ }
                  int backoff = 1000;
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      try
                      {
                          var entries = await db.StreamReadGroupAsync(
                              "risk-alerts", "alert-group", "consumer-1",
                              count: 10, flags: CommandFlags.Block(5000));
                          foreach (var entry in entries)
                          {
                              var alert = JsonSerializer.Deserialize(entry["data"]!);
                              await _hubContext.Clients.Group(alert.UserId.ToString())
                                         .SendAsync("ReceiveAlert", alert, stoppingToken);
                              await db.StreamAcknowledgeAsync("risk-alerts", "alert-group", entry.Id);
                          }
                          backoff = 1000;
                      }
                      catch (Exception ex)
                      {
                          _logger.LogError(ex, "处理 Redis 告警失败");
                          await Task.Delay(backoff, stoppingToken);
                          backoff = Math.Min(backoff * 2, 16000);
                      }
                  }
              }
          }
          [Authorize]
          public class RiskAlertHub : Hub { }
          

          四、生产级部署和监控 📈

          组件推荐配置
          ABP 后端Pod 存活/就绪探针 ✅ + HTTPS 🔒 + Serilog→Elasticsearch Sink 📝 + CAP Outbox
          Kafkaenable.idempotence=true 🔁, acks=all ✅, TLS/SASL 🔐
          FlinkRocksDBStateBackend ⚙️ + EXACTLY_ONCE ⚡ + State TTL 🕒 + HA 🌟
          RedisRedis Cluster 🔄 + AOF 📝 + ACL 🔑 + 阻塞消费 ⏳
          PostgreSQL主从流复制 🛠️ + WAL 日志 📜 + TimescaleDB 插件 📊
          SignalRAzure SignalR ☁️ / Redis Backplane 🔄 + JWT 鉴权 🔏
          # Flink YAML 示例
          state.backend: rocksdb
          checkpointing:
            interval: 10s
            mode: EXACTLY_ONCE
            externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
          
          # Flink Prometheus Reporter
          metrics.reporters: prom
          metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
          metrics.reporter.prom.port: 9250
          

          📊 在 Grafana 中可视化:Kafka TPS、Flink 延迟分位、Redis 消费速率、ABP 请求成功率/错误率。


          五、自动化测试 🧪

          // Testcontainers 启动依赖
          var kafka = new KafkaContainer().StartAsync().GetAwaiter().GetResult();
          var redis = new RedisContainer().StartAsync().GetAwaiter().GetResult();
          var postgres = new PostgreSqlContainer().StartAsync().GetAwaiter().GetResult();
          // 注入到 ABP 测试模块
          context.Services.Configure(opts => {
              opts.ProducerConnectionString = kafka.GetBootstrapAddress();
              opts.OutboxTableName = "CapOutboxMessages";
          });
          // Flink MiniCluster
          var flinkCluster = new MiniClusterWithClientResource(
              new MiniClusterResourceConfiguration.Builder().Build());
          flinkCluster.Start();
          

          ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统
          (图片来源网络,侵删)
          ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统
          (图片来源网络,侵删)
          ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统
          (图片来源网络,侵删)
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码