物联网 SpringBoot整合InfluxDB 2.*
前言
随着物联网(IoT)和大数据时代的到来,传统关系型数据库在处理时序数据、传感器数据等应用场景时存在一定局限性。InfluxDB 作为专门为时序数据设计的开源数据库,不仅能高效存储和处理数据,还拥有丰富的查询语言(Flux)和完善的生态系统,可以满足物联网应用中海量数据写入、实时查询和分析的需求。本文将带您了解 InfluxDB 的基本架构、优势,并通过 Spring Boot 整合 InfluxDB2.x 的示例来展示如何在实际项目中使用它。
InfluxDB 介绍
InfluxDB 是一款开源的时序数据库,专门设计用于高性能地存储和查询大规模的时序数据,如监控数据、传感器数据、事件日志等。它内置强大的时序数据压缩和聚合功能,支持数据保留策略(Retention Policies)、连续查询(Continuous Queries)以及灵活的查询语言 Flux。对于数据写入频繁且需要实时分析和展示的场景,InfluxDB 是一个理想的选择。
InfluxDB 的主要优势
-
高写入性能
InfluxDB 采用多线程写入和内存缓存机制,能够高效处理大规模数据写入,特别适合传感器数据、监控指标等高速数据流。
-
时序数据优化设计
独特的数据存储结构和压缩算法,使得数据存储空间大幅度减少,同时在执行时间范围查询、聚合分析时性能表现优异。
-
灵活的查询语言 Flux
Flux 语法丰富,支持数据转换、聚合、过滤、窗口化操作,为用户提供了更高的灵活度以及更直观的数据分析体验。
-
完善的生态系统
InfluxDB 与多种开源工具和商业产品良好集成,例如 Grafana 用于可视化展示,可以无缝构建监控系统和物联网平台。
物联网背景下的数据管理需求
在物联网应用中,每秒钟都会产生大量的时序数据,这些数据来自传感器、设备日志、用户交互等多种数据源。这种数据具有如下特点:
-
实时性强
数据采集和处理需要低延迟,确保系统能够快速响应和决策。
-
数据量大且连续增长
设备和传感器常年在线,数据持续不断地产生,存储和检索性能要求高。
(图片来源网络,侵删) -
高并发写入
数以万计的设备同时发送数据,对于数据库写入速度和稳定性提出了更高要求。
(图片来源网络,侵删)InfluxDB 的设计理念正好符合这些需求,通过其高效的写入能力和灵活的查询语言,可以轻松支持这些挑战。此外,通过数据保留、聚合、告警及实时分析功能,可以对设备状态、异常情况和趋势变化进行监控,从而为物联网平台构建出可靠、稳定的基础设施。
InfluxDB2 的优势
本文示例中使用的 InfluxDB2 版本,在 InfluxDB1 的基础上做了诸多改进和提升,为开发者提供了更多功能和更好的用户体验。以下将详细介绍 InfluxDB2 相比于 InfluxDB1 的优势:
(图片来源网络,侵删)-
统一的 API 与平台体验
InfluxDB2 将写入、查询、监控和可视化等功能整合到一个统一的平台中,开发者无需再单独部署 Chronograf 或 Kapacitor 等组件。这种一体化设计大大简化了搭建和维护时序数据平台的复杂度,同时提供了统一而一致的用户操作体验。
-
增强的查询语言 Flux
在 InfluxDB2 中,Flux 得到了更深度的集成和优化。Flux 除了提供数据查询外,还支持数据转换、聚合、跨数据源查询等高级操作,比传统的 InfluxQL 灵活性更高,更适合复杂业务场景下的数据分析需求。通过文章中的 FluxUtil 工具类,开发者可以更方便地拼接复杂查询语句。
-
改进的数据安全与授权管理
InfluxDB2 在安全性和权限管理上也有显著改进,支持基于 Token 的认证机制、细粒度的权限控制以及用户管理。这使得在多租户环境下管理和隔离数据变得更加高效和安全,适合企业级应用的场景。
-
内置任务调度与告警功能
InfluxDB2 内置了任务调度和自动化功能,支持定时查询和数据处理任务。同时,系统可根据查询结果触发告警,满足实时监控及自动响应的需求。这些特性减少了外部工具的依赖,让数据监控和自动化管理变得更简洁。
-
更友好的 Web UI 界面
InfluxDB2 提供了功能强大的 Web UI,使得数据库管理、数据查询、监控仪表盘的搭建以及数据探索变得简单直观。开发者和运维人员无需通过命令行就可完成大部分操作,极大提高了工作效率。
-
全面支持云原生架构
InfluxDB2 针对云原生应用场景进行优化,支持容器化部署、自动扩展与高可用性架构,更适合在微服务和分布式系统中作为时序数据存储和分析平台。对于物联网等数据高频写入、快速响应的场景,InfluxDB2 能够提供更稳定的支持。
往期Influxdb1.*相关文章,如需请跳转。SpringBoot 整合 InfluxDB1.x 三种方式_springboot influxdb-CSDN博客
Spring Boot 整合 InfluxDB 的示例解析
本文接下来的示例代码展示了如何使用 Spring Boot 以及 InfluxDB Java 客户端来完成数据的写入和查询操作。主要内容如下:
环境与依赖
-
JDK 版本:本文示例采用 JDK 17
-
InfluxDB 客户端:通过 com.influxdb:influxdb-client-java 依赖引入
-
辅助工具库:使用 commons-io
相关依赖配置在 Maven 配置文件中简单列出,读者在实际项目中需根据自己的需求调整版本号和配置参数。
com.influxdb influxdb-client-java 7.2.0 commons-io commons-io 2.18.0
配置信息
influx: url: http://localhost:8086 org: token: bucket: connectTimeout: 10000 readTimeout: 120000 writeTimeout: 120000
@Data @ConfigurationProperties(prefix = "influx") public class InfluxProperties { private String url; private String token; private String org; private String bucket; private int connectTimeout; private int readTimeout; private int writeTimeout; }
/** * InfluxDB 客户端配置 * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343 * @Date: 2025-03-20 14:55. */ @Slf4j @Configuration public class InfluxConfig implements DisposableBean { private final InfluxProperties properties; private volatile InfluxDBClient clientInstance; @Autowired public InfluxConfig(InfluxProperties properties) { this.properties = properties; } @Bean public InfluxDBClient influxDBClient() { validateConfiguration(); if (clientInstance == null) { clientInstance = createInfluxDBClient(); } return clientInstance; } private InfluxDBClient createInfluxDBClient() { OkHttpClient customClient = new OkHttpClient.Builder() .connectTimeout(properties.getConnectTimeout(), TimeUnit.MILLISECONDS) .readTimeout(properties.getReadTimeout(), TimeUnit.MILLISECONDS) .writeTimeout(properties.getWriteTimeout(), TimeUnit.MILLISECONDS) .build(); InfluxDBClientOptions options = InfluxDBClientOptions.builder() .url(properties.getUrl()) .authenticateToken(properties.getToken().toCharArray()) .org(properties.getOrg()) .bucket(properties.getBucket()) .okHttpClient(customClient.newBuilder()) .build(); return InfluxDBClientFactory.create(options); } private void validateConfiguration() { if (!StringUtils.hasText(properties.getUrl())) { throw new IllegalArgumentException("InfluxDB URL 未配置"); } if (!StringUtils.hasText(properties.getToken())) { throw new IllegalArgumentException("InfluxDB Token 未配置"); } if (!StringUtils.hasText(properties.getOrg())) { throw new IllegalArgumentException("InfluxDB Org 未配置"); } if (!StringUtils.hasText(properties.getBucket())) { throw new IllegalArgumentException("InfluxDB Bucket 未配置"); } } @Override public void destroy() throws Exception { if (clientInstance != null) { clientInstance.close(); log.info("InfluxDB 客户端已安全关闭"); } } }
Flux常用语法工具类
/** * InfluxDB语句工具类,用于生成Flux查询语句 * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343 * @Date: 2025-03-20 14:03. */ public class FluxUtil { public static String getTableName(Class clazz) { Measurement measurement = clazz.getAnnotation(Measurement.class); if (measurement != null) { return measurement.name(); } return null; } public static void appendCommonFlux(StringBuffer buffer, String bucketName, String tableName, String start, String stop) { appendBucketFlux(buffer, bucketName); appendTimeRangeFlux(buffer, start, stop); appendTableFlux(buffer, tableName); } public static void appendBucketFlux(StringBuffer buffer, String bucketName) { buffer.append("from(bucket: \"" + bucketName + "\") "); } public static void appendTableFlux(StringBuffer buffer, String tableName) { buffer.append("|> filter(fn: (r) => r._measurement == \"" + tableName + "\") "); } public static void appendContainsFlux(StringBuffer buffer, String fieldName, List list) { if (list != null && !list.isEmpty()) { buffer.append("|> filter(fn: (r) => contains(value: r[\"" + fieldName + "\"], set: ["); appendListAsFluxArray(buffer,list); buffer.append("])) "); } } public static void appendTagField(StringBuffer buffer, String field) { buffer.append("|> filter(fn: (r) => r._field == \"" + field + "\") "); } public static void appendTimeRangeFlux(StringBuffer buffer, String start, String stop) { if (StringUtils.isBlank(start)) { start = "1970-01-01T00:00:00.000Z"; } if (StringUtils.isBlank(stop)) { buffer.append("|> range(start:" + start + ") "); } else { buffer.append("|> range(start:" + start + ", stop:" + stop + ") "); } } public static void appendTimeRangeLastFlux(StringBuffer buffer, int time, String unit) { buffer.append("|> range(start: -").append(time).append(unit).append(" )"); } public static void appendDropFlux(StringBuffer buffer, String... args) { if (args.length == 0) { buffer.append("|> drop(columns: [\"host\"]) "); return; } buffer.append("|> drop(columns: ["); for (int i = 0; i keep(columns: ["); for (int i = 0; i duplicate(column: \"" + oldField + "\", as: \"" + newField + "\") "); } public static void appendRenameFlux(StringBuffer buffer, String oldField, String newField) { buffer.append(" |> rename(columns: {" + oldField + ": \"" + newField + "\"}) "); } public static void appendFirstFlux(StringBuffer buffer) { buffer.append("|> first() "); } public static void appendLastFlux(StringBuffer buffer) { buffer.append("|> last() "); } public static void appendLimitFlux(StringBuffer buffer, int n, int offset) { buffer.append("|> limit(n:" + n + ", offset: " + offset + ") "); } public static void appendGroupFlux(StringBuffer buffer, String... columns) { if (columns.length == 0) { buffer.append("|> group() "); } else { buffer.append("|> group(columns:[ "); for (int i = 0; i distinct() "); } else { buffer.append("|> distinct(column:\"" + columns[0] + "\") "); } } public static void appendCountFlux(StringBuffer buffer) { buffer.append("|> count() "); } public static void appendCountFlux(StringBuffer buffer, String fieldName) { buffer.append("|> count(column: \"").append(fieldName).append("\") "); } public static void appendTopFlux(StringBuffer buffer, int n) { buffer.append("|> top(n:" + n + ") "); } public static void appendBottomFlux(StringBuffer buffer, int n) { buffer.append("|> bottom(n:" + n + ") "); } public static void appendSortFlux(StringBuffer buffer, boolean descFlag, String... columns) { if (columns.length == 0) { buffer.append("|> sort(columns: [\"_value\"], desc: " + descFlag + ")"); } else { buffer.append("|> sort(columns:[ "); for (int i = 0; i timeShift(duration: 8h) "); } public static void appendFilterFlux(StringBuffer buffer, List list, String operator, String join, String fieldName) { if (list == null || list.size() == 0) { return; } for (int i = 0, size = list.size(); i filter(fn: (r) =>"); } else { buffer.append(join); } buffer.append(" r." + fieldName + " " + operator + " \"" + list.get(i) + "\" "); } buffer.append(") "); } public static void appendFilterFlux(StringBuffer buffer, Map map, String operator, String join) { Set entrySet = map.entrySet(); Iterator iterator = entrySet.iterator(); boolean flag = true; while (iterator.hasNext()) { Entry next = iterator.next(); String key = next.getKey(); Object value = next.getValue(); if (flag) { buffer.append("|> filter(fn: (r) =>"); flag = false; } else { buffer.append(join); } buffer.append(" r." + key + " " + operator + " \"" + value + "\" "); } if (!flag) { buffer.append(") "); } } public static void appendMulFilterFlux(StringBuffer buffer, List list, String innerJoin, String operator, String outerJoin) { if (list == null || list.size() == 0) { return; } buffer.append("|> filter(fn: (r) => "); boolean outerFlag = true; for (int i = 0; i aggregateWindow(every: " + step + ", fn: " + aggType + ", createEmpty:"+createEmpty+") "); } public static void appendWindowFlux(StringBuffer buffer, String step) { buffer.append("|> window(every: " + step + ") "); } public static void appendAggregateFlux(StringBuffer buffer, String aggType) { buffer.append("|> " + aggType + "() "); } public static void appendYieldFlux(StringBuffer buffer, String name) { buffer.append("|> yield(name: \"" + name + "\") "); } public static void appendTruncateTimeColumn(StringBuffer buffer, String step) { buffer.append("|> truncateTimeColumn(unit: " + step + ") "); } public static void appendImportFlux(StringBuffer buffer, String name) { buffer.append("import \"" + name + "\" "); } public static void appendExistsFlux(StringBuffer buffer) { buffer.append("|> filter(fn: (r) => exists r._value ) "); } public static void appendZeroFlux(StringBuffer buffer) { buffer.append("|> filter(fn: (r) => r._value > 0) "); } public static void appendPivotFlux(StringBuffer buffer) { buffer.append("|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"); } public static void appendPivotFlux(StringBuffer buffer, List rowKeys, List columnKeys, String valueColumn) { buffer.append("|> pivot(rowKey: ["); appendListAsFluxArray(buffer, rowKeys); buffer.append("], columnKey: ["); appendListAsFluxArray(buffer, columnKeys); buffer.append("], valueColumn: \"" + valueColumn + "\") "); } private static void appendListAsFluxArray(StringBuffer buffer, List list) { for (int i = 0; i map(fn: (r) => { r[\"").append(fieldName).append("\"] = ") .append(functionName).append("(r[\"").append(fieldName).append("\"]); return r; }) "); } }
使用示例
@Data @Measurement(name = "student") public class Student { @Column(tag = true) String id; @Column() String name; @Column() int age; @Column(timestamp = true) Instant time; }
/** * 测试 * @author https://blog.csdn.net/TCLms?spm=1011.2266.3001.5343 * @Date: 2024-03-12 14:31. */ @Service public class StudentService { @Autowired InfluxProperties influxProperties; @Autowired InfluxDBClient influxDBClient; public void addStudent(){ try { Student stu=new Student(); stu.setName("李四"); stu.setAge(22); stu.setId("11002"); stu.setTime(Instant.now()); WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); writeApi.writeMeasurement(influxProperties.getBucket(), influxProperties.getOrg(), WritePrecision.NS, stu); System.out.println("新增成功"); } catch (InfluxException e) { System.err.println("error:" + e.getMessage()); } } public List queryStudents() { try { StringBuffer query=new StringBuffer(); FluxUtil.appendBucketFlux(query,influxProperties.getBucket()); FluxUtil.appendTimeRangeLastFlux(query,10,"h"); FluxUtil.appendTableFlux(query, FluxUtil.getTableName(Student.class)); StringBuffer querycount=new StringBuffer(); querycount.append(query.toString()); FluxUtil.appendPivotFlux(query); FluxUtil.appendGroupFlux(query); FluxUtil.appendLimitFlux(query,2,0); FluxUtil.appendTagField(querycount, "age"); System.out.println(query.toString()); List tables = influxDBClient.getQueryApi().query(query.toString(), influxProperties.getOrg(),Student.class); for (Student table : tables) { System.out.println(table.toString()); } } catch (InfluxException e) { System.err.println("error:" + e.getMessage()); } return null; } }
总结
通过上述对 InfluxDB2 的详细介绍,可以看出其在操作便捷性、查询能力和安全管理等方面远优于 InfluxDB1。结合文章中的 Spring Boot 整合示例,开发者不仅可以快速上手,并且能够借助 InfluxDB2 的先进特性构建更加智能、稳定和高效的物联网数据处理系统。
欢迎大家继续在评论区分享使用 InfluxDB2 的经验、讨论遇到的问题或提出新的功能需求。您的反馈将帮助我们进一步完善后续文章内容,共同推进时序数据管理技术的应用和发展。
-