物联网 SpringBoot整合InfluxDB 2.*

06-01 1480阅读

前言

随着物联网(IoT)和大数据时代的到来,传统关系型数据库在处理时序数据、传感器数据等应用场景时存在一定局限性。InfluxDB 作为专门为时序数据设计的开源数据库,不仅能高效存储和处理数据,还拥有丰富的查询语言(Flux)和完善的生态系统,可以满足物联网应用中海量数据写入、实时查询和分析的需求。本文将带您了解 InfluxDB 的基本架构、优势,并通过 Spring Boot 整合 InfluxDB2.x 的示例来展示如何在实际项目中使用它。

InfluxDB 介绍

InfluxDB 是一款开源的时序数据库,专门设计用于高性能地存储和查询大规模的时序数据,如监控数据、传感器数据、事件日志等。它内置强大的时序数据压缩和聚合功能,支持数据保留策略(Retention Policies)、连续查询(Continuous Queries)以及灵活的查询语言 Flux。对于数据写入频繁且需要实时分析和展示的场景,InfluxDB 是一个理想的选择。

InfluxDB 的主要优势

  1. 高写入性能

    InfluxDB 采用多线程写入和内存缓存机制,能够高效处理大规模数据写入,特别适合传感器数据、监控指标等高速数据流。

  2. 时序数据优化设计

    独特的数据存储结构和压缩算法,使得数据存储空间大幅度减少,同时在执行时间范围查询、聚合分析时性能表现优异。

  3. 灵活的查询语言 Flux

    Flux 语法丰富,支持数据转换、聚合、过滤、窗口化操作,为用户提供了更高的灵活度以及更直观的数据分析体验。

  4. 完善的生态系统

    InfluxDB 与多种开源工具和商业产品良好集成,例如 Grafana 用于可视化展示,可以无缝构建监控系统和物联网平台。

物联网背景下的数据管理需求

在物联网应用中,每秒钟都会产生大量的时序数据,这些数据来自传感器、设备日志、用户交互等多种数据源。这种数据具有如下特点:

  • 实时性强

    数据采集和处理需要低延迟,确保系统能够快速响应和决策。

  • 数据量大且连续增长

    设备和传感器常年在线,数据持续不断地产生,存储和检索性能要求高。

    物联网 SpringBoot整合InfluxDB 2.*
    (图片来源网络,侵删)
  • 高并发写入

    数以万计的设备同时发送数据,对于数据库写入速度和稳定性提出了更高要求。

    物联网 SpringBoot整合InfluxDB 2.*
    (图片来源网络,侵删)

    InfluxDB 的设计理念正好符合这些需求,通过其高效的写入能力和灵活的查询语言,可以轻松支持这些挑战。此外,通过数据保留、聚合、告警及实时分析功能,可以对设备状态、异常情况和趋势变化进行监控,从而为物联网平台构建出可靠、稳定的基础设施。

    InfluxDB2 的优势

    本文示例中使用的 InfluxDB2 版本,在 InfluxDB1 的基础上做了诸多改进和提升,为开发者提供了更多功能和更好的用户体验。以下将详细介绍 InfluxDB2 相比于 InfluxDB1 的优势:

    物联网 SpringBoot整合InfluxDB 2.*
    (图片来源网络,侵删)
    1. 统一的 API 与平台体验

      InfluxDB2 将写入、查询、监控和可视化等功能整合到一个统一的平台中,开发者无需再单独部署 Chronograf 或 Kapacitor 等组件。这种一体化设计大大简化了搭建和维护时序数据平台的复杂度,同时提供了统一而一致的用户操作体验。

    2. 增强的查询语言 Flux

      在 InfluxDB2 中,Flux 得到了更深度的集成和优化。Flux 除了提供数据查询外,还支持数据转换、聚合、跨数据源查询等高级操作,比传统的 InfluxQL 灵活性更高,更适合复杂业务场景下的数据分析需求。通过文章中的 FluxUtil 工具类,开发者可以更方便地拼接复杂查询语句。

    3. 改进的数据安全与授权管理

      InfluxDB2 在安全性和权限管理上也有显著改进,支持基于 Token 的认证机制、细粒度的权限控制以及用户管理。这使得在多租户环境下管理和隔离数据变得更加高效和安全,适合企业级应用的场景。

    4. 内置任务调度与告警功能

      InfluxDB2 内置了任务调度和自动化功能,支持定时查询和数据处理任务。同时,系统可根据查询结果触发告警,满足实时监控及自动响应的需求。这些特性减少了外部工具的依赖,让数据监控和自动化管理变得更简洁。

    5. 更友好的 Web UI 界面

      InfluxDB2 提供了功能强大的 Web UI,使得数据库管理、数据查询、监控仪表盘的搭建以及数据探索变得简单直观。开发者和运维人员无需通过命令行就可完成大部分操作,极大提高了工作效率。

    6. 全面支持云原生架构

      InfluxDB2 针对云原生应用场景进行优化,支持容器化部署、自动扩展与高可用性架构,更适合在微服务和分布式系统中作为时序数据存储和分析平台。对于物联网等数据高频写入、快速响应的场景,InfluxDB2 能够提供更稳定的支持。

     往期Influxdb1.*相关文章,如需请跳转。SpringBoot 整合 InfluxDB1.x 三种方式_springboot influxdb-CSDN博客

    Spring Boot 整合 InfluxDB 的示例解析

    本文接下来的示例代码展示了如何使用 Spring Boot 以及 InfluxDB Java 客户端来完成数据的写入和查询操作。主要内容如下:

    环境与依赖

    • JDK 版本:本文示例采用 JDK 17

    • InfluxDB 客户端:通过 com.influxdb:influxdb-client-java 依赖引入

    • 辅助工具库:使用 commons-io

      相关依赖配置在 Maven 配置文件中简单列出,读者在实际项目中需根据自己的需求调整版本号和配置参数。

          com.influxdb
          influxdb-client-java
          7.2.0
      
      
          commons-io
          commons-io
          2.18.0
      

      配置信息 

      influx:
        url: http://localhost:8086
        org: 
        token: 
        bucket: 
        connectTimeout: 10000
        readTimeout: 120000
        writeTimeout: 120000
      @Data
      @ConfigurationProperties(prefix = "influx")
      public class InfluxProperties {
          private String url;
          private String token;
          private String org;
          private String bucket;
          private int connectTimeout;
          private int readTimeout;
          private int writeTimeout;
      }
      /**
       * InfluxDB 客户端配置
       * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343
       * @Date: 2025-03-20 14:55.
       */
      @Slf4j
      @Configuration
      public class InfluxConfig implements DisposableBean {
          private final InfluxProperties properties;
          private volatile InfluxDBClient clientInstance;
          @Autowired
          public InfluxConfig(InfluxProperties properties) {
              this.properties = properties;
          }
          @Bean
          public InfluxDBClient influxDBClient() {
              validateConfiguration();
              if (clientInstance == null) {
                  clientInstance = createInfluxDBClient();
              }
              return clientInstance;
          }
          private InfluxDBClient createInfluxDBClient() {
              OkHttpClient customClient = new OkHttpClient.Builder()
                      .connectTimeout(properties.getConnectTimeout(), TimeUnit.MILLISECONDS) 
                      .readTimeout(properties.getReadTimeout(), TimeUnit.MILLISECONDS)    
                      .writeTimeout(properties.getWriteTimeout(), TimeUnit.MILLISECONDS)   
                      .build();
              InfluxDBClientOptions options = InfluxDBClientOptions.builder()
                      .url(properties.getUrl())
                      .authenticateToken(properties.getToken().toCharArray())
                      .org(properties.getOrg())
                      .bucket(properties.getBucket())
                      .okHttpClient(customClient.newBuilder()) 
                      .build();
              return InfluxDBClientFactory.create(options);
          }
          private void validateConfiguration() {
              if (!StringUtils.hasText(properties.getUrl())) {
                  throw new IllegalArgumentException("InfluxDB URL 未配置");
              }
              if (!StringUtils.hasText(properties.getToken())) {
                  throw new IllegalArgumentException("InfluxDB Token 未配置");
              }
              if (!StringUtils.hasText(properties.getOrg())) {
                  throw new IllegalArgumentException("InfluxDB Org 未配置");
              }
              if (!StringUtils.hasText(properties.getBucket())) {
                  throw new IllegalArgumentException("InfluxDB Bucket 未配置");
              }
          }
          @Override
          public void destroy() throws Exception {
              if (clientInstance != null) {
                  clientInstance.close();
                  log.info("InfluxDB 客户端已安全关闭");
              }
          }
      }

       Flux常用语法工具类

      /**
       * InfluxDB语句工具类,用于生成Flux查询语句
       * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343
       * @Date: 2025-03-20 14:03.
       */
      public class FluxUtil {
          public static String getTableName(Class clazz) {
              Measurement measurement = clazz.getAnnotation(Measurement.class);
              if (measurement != null) {
                  return measurement.name();
              }
              return null;
          }
          public static void appendCommonFlux(StringBuffer buffer, String bucketName, String tableName,
                                              String start, String stop) {
              appendBucketFlux(buffer, bucketName);
              appendTimeRangeFlux(buffer, start, stop);
              appendTableFlux(buffer, tableName);
          }
          public static void appendBucketFlux(StringBuffer buffer, String bucketName) {
              buffer.append("from(bucket: \"" + bucketName + "\") ");
          }
          public static void appendTableFlux(StringBuffer buffer, String tableName) {
              buffer.append("|> filter(fn: (r) => r._measurement == \"" + tableName + "\") ");
          }
          public static void appendContainsFlux(StringBuffer buffer, String fieldName, List list) {
              if (list != null && !list.isEmpty()) {
                  buffer.append("|> filter(fn: (r) => contains(value: r[\"" + fieldName + "\"], set: [");
                  appendListAsFluxArray(buffer,list);
                  buffer.append("])) ");
              }
          }
          public static void appendTagField(StringBuffer buffer, String field) {
              buffer.append("|> filter(fn: (r) => r._field == \"" + field + "\") ");
          }
          public static void appendTimeRangeFlux(StringBuffer buffer, String start, String stop) {
              if (StringUtils.isBlank(start)) {
                  start = "1970-01-01T00:00:00.000Z";
              }
              if (StringUtils.isBlank(stop)) {
                  buffer.append("|> range(start:" + start + ") ");
              } else {
                  buffer.append("|> range(start:" + start + ", stop:" + stop + ") ");
              }
          }
          public static void appendTimeRangeLastFlux(StringBuffer buffer, int time, String unit) {
              buffer.append("|> range(start: -").append(time).append(unit).append(" )");
          }
          public static void appendDropFlux(StringBuffer buffer, String... args) {
              if (args.length == 0) {
                  buffer.append("|> drop(columns: [\"host\"]) ");
                  return;
              }
              buffer.append("|> drop(columns: [");
              for (int i = 0; i  keep(columns: [");
              for (int i = 0; i  duplicate(column: \"" + oldField + "\", as: \"" + newField + "\") ");
          }
          public static void appendRenameFlux(StringBuffer buffer, String oldField, String newField) {
              buffer.append(" |> rename(columns: {" + oldField + ": \"" + newField + "\"}) ");
          }
          public static void appendFirstFlux(StringBuffer buffer) {
              buffer.append("|> first() ");
          }
          public static void appendLastFlux(StringBuffer buffer) {
              buffer.append("|> last() ");
          }
          public static void appendLimitFlux(StringBuffer buffer, int n, int offset) {
              buffer.append("|> limit(n:" + n + ", offset: " + offset + ") ");
          }
          public static void appendGroupFlux(StringBuffer buffer, String... columns) {
              if (columns.length == 0) {
                  buffer.append("|> group() ");
              } else {
                  buffer.append("|> group(columns:[ ");
                  for (int i = 0; i  distinct() ");
              } else {
                  buffer.append("|> distinct(column:\"" + columns[0] + "\") ");
              }
          }
          public static void appendCountFlux(StringBuffer buffer) {
              buffer.append("|> count() ");
          }
          public static void appendCountFlux(StringBuffer buffer, String fieldName) {
              buffer.append("|> count(column: \"").append(fieldName).append("\") ");
          }
          public static void appendTopFlux(StringBuffer buffer, int n) {
              buffer.append("|> top(n:" + n + ") ");
          }
          public static void appendBottomFlux(StringBuffer buffer, int n) {
              buffer.append("|> bottom(n:" + n + ") ");
          }
          public static void appendSortFlux(StringBuffer buffer, boolean descFlag, String... columns) {
              if (columns.length == 0) {
                  buffer.append("|> sort(columns: [\"_value\"], desc: " + descFlag + ")");
              } else {
                  buffer.append("|> sort(columns:[ ");
                  for (int i = 0; i  timeShift(duration: 8h) ");
          }
          public static void appendFilterFlux(StringBuffer buffer, List list, String operator, String join, String fieldName) {
              if (list == null || list.size() == 0) {
                  return;
              }
              for (int i = 0, size = list.size(); i  filter(fn: (r) =>");
                  } else {
                      buffer.append(join);
                  }
                  buffer.append(" r." + fieldName + " " + operator + " \"" + list.get(i) + "\" ");
              }
              buffer.append(")  ");
          }
          public static void appendFilterFlux(StringBuffer buffer, Map map, String operator, String join) {
              Set entrySet = map.entrySet();
              Iterator iterator = entrySet.iterator();
              boolean flag = true;
              while (iterator.hasNext()) {
                  Entry next = iterator.next();
                  String key = next.getKey();
                  Object value = next.getValue();
                  if (flag) {
                      buffer.append("|> filter(fn: (r) =>");
                      flag = false;
                  } else {
                      buffer.append(join);
                  }
                  buffer.append(" r." + key + " " + operator + " \"" + value + "\" ");
              }
              if (!flag) {
                  buffer.append(")  ");
              }
          }
          public static void appendMulFilterFlux(StringBuffer buffer, List list, String innerJoin, String operator, String outerJoin) {
              if (list == null || list.size() == 0) {
                  return;
              }
              buffer.append("|> filter(fn: (r) => ");
              boolean outerFlag = true;
              for (int i = 0; i  aggregateWindow(every: " + step + ", fn: " + aggType + ", createEmpty:"+createEmpty+") ");
          }
          public static void appendWindowFlux(StringBuffer buffer, String step) {
              buffer.append("|> window(every: " + step + ") ");
          }
          public static void appendAggregateFlux(StringBuffer buffer, String aggType) {
              buffer.append("|> " + aggType + "() ");
          }
          public static void appendYieldFlux(StringBuffer buffer, String name) {
              buffer.append("|> yield(name: \"" + name + "\") ");
          }
          public static void appendTruncateTimeColumn(StringBuffer buffer, String step) {
              buffer.append("|> truncateTimeColumn(unit: " + step + ") ");
          }
          public static void appendImportFlux(StringBuffer buffer, String name) {
              buffer.append("import \"" + name + "\" ");
          }
          public static void appendExistsFlux(StringBuffer buffer) {
              buffer.append("|> filter(fn: (r) => exists r._value ) ");
          }
          public static void appendZeroFlux(StringBuffer buffer) {
              buffer.append("|> filter(fn: (r) => r._value > 0) ");
          }
          public static void appendPivotFlux(StringBuffer buffer) {
              buffer.append("|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")");
          }
          public static void appendPivotFlux(StringBuffer buffer, List rowKeys, List columnKeys, String valueColumn) {
              buffer.append("|> pivot(rowKey: [");
              appendListAsFluxArray(buffer, rowKeys);
              buffer.append("], columnKey: [");
              appendListAsFluxArray(buffer, columnKeys);
              buffer.append("], valueColumn: \"" + valueColumn + "\") ");
          }
          private static void appendListAsFluxArray(StringBuffer buffer, List list) {
              for (int i = 0; i  map(fn: (r) => { r[\"").append(fieldName).append("\"] = ")
                      .append(functionName).append("(r[\"").append(fieldName).append("\"]); return r; }) ");
          }
      }
      

       使用示例

      @Data
      @Measurement(name = "student")
      public class Student {
          @Column(tag = true)
          String id;
          @Column()
          String name;
          @Column()
          int age;
          @Column(timestamp = true)
          Instant time;
      }
      /**
       * 测试
       * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343
       * @Date: 2024-03-12 14:31.
       */
      @Service
      public class StudentService {
          @Autowired
          InfluxProperties influxProperties;
          @Autowired
          InfluxDBClient influxDBClient;
          public void addStudent(){
              try {
                  Student stu=new Student();
                  stu.setName("李四");
                  stu.setAge(22);
                  stu.setId("11002");
                  stu.setTime(Instant.now());
                  WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
                  writeApi.writeMeasurement(influxProperties.getBucket(), influxProperties.getOrg(), WritePrecision.NS, stu);
                  System.out.println("新增成功");
              } catch (InfluxException e) {
                  System.err.println("error:" + e.getMessage());
              }
          }
          public List queryStudents() {
              try {
                  StringBuffer query=new StringBuffer();
                  FluxUtil.appendBucketFlux(query,influxProperties.getBucket());
                  FluxUtil.appendTimeRangeLastFlux(query,10,"h");
                  FluxUtil.appendTableFlux(query, FluxUtil.getTableName(Student.class));
                  StringBuffer querycount=new StringBuffer();
                  querycount.append(query.toString());
                  FluxUtil.appendPivotFlux(query);
                  FluxUtil.appendGroupFlux(query);
                  FluxUtil.appendLimitFlux(query,2,0);
                  FluxUtil.appendTagField(querycount, "age");
                  System.out.println(query.toString());
                  List tables = influxDBClient.getQueryApi().query(query.toString(), influxProperties.getOrg(),Student.class);
                  for (Student table : tables) {
                          System.out.println(table.toString());
                  }
              } catch (InfluxException e) {
                  System.err.println("error:" + e.getMessage());
              }
              return null;
          }
      }

      总结

      通过上述对 InfluxDB2 的详细介绍,可以看出其在操作便捷性、查询能力和安全管理等方面远优于 InfluxDB1。结合文章中的 Spring Boot 整合示例,开发者不仅可以快速上手,并且能够借助 InfluxDB2 的先进特性构建更加智能、稳定和高效的物联网数据处理系统。

      欢迎大家继续在评论区分享使用 InfluxDB2 的经验、讨论遇到的问题或提出新的功能需求。您的反馈将帮助我们进一步完善后续文章内容,共同推进时序数据管理技术的应用和发展。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码