Spring Boot集成IoTDB实现时序数据管理
在物联网(IoT)和工业互联网领域,时序数据的高效管理和分析是至关重要的。Apache IoTDB 是一个高性能的时序数据库,专为处理大规模时序数据而设计。本文将介绍如何在 Spring Boot 应用程序中采用IotDB-Session的方式集成 IoTDB,实现数据的插入、查询和聚合等功能。
1. 项目背景
随着物联网设备的普及,数据量呈爆发式增长。这些数据通常是时间序列数据,具有高吞吐量、高并发和快速写入的特点。传统的关系型数据库在处理这类数据时往往力不从心,而时序数据库(如 IoTDB)则能够更好地满足这些需求。
IoTDB 是一个开源的时序数据库,支持高效的数据插入和查询,特别适合处理设备传感器数据、工业数据等。通过 Spring Boot 集成 IoTDB,我们可以快速构建一个高性能的时序数据管理系统。
2. 环境准备
在开始之前,确保你已经安装了以下环境:
-
Java Development Kit (JDK):推荐使用 JDK 1.8 或更高版本。
-
Maven:用于项目构建和依赖管理。
-
IoTDB:下载并安装 IoTDB,并确保其服务已启动。
-
Spring Boot:用于构建应用程序。
3. 项目依赖配置
在 pom.xml 中,添加 IoTDB 的 Java 客户端依赖以及其他必要的 Spring Boot 依赖:
org.apache.iotdb iotdb-session 1.3.0
4. 配置文件
在 application-dev.yml 中,配置 IoTDB 的连接信息:
spring: iotdb: username: root password: root hostUrl: - 127.0.0.1:6667 maxSize: 100
5. IoTDB 会话管理
创建 IotDbSessionPoolManager 类,用于管理 IoTDB 的会话连接池:
@Component @Slf4j @ConfigurationProperties(prefix = "spring.iotdb") @Setter public class IotDbSessionPoolManager { private String username; private String password; private List hostUrl; private int maxSize; private static SessionPool sessionPool; public SessionPool getSessionPool() { if (sessionPool == null) { sessionPool = new SessionPool(hostUrl, username, password, maxSize); } return sessionPool; } @PostConstruct public void init() { // 创建 SessionPool log.info("====SessionPool init===="); sessionPool = new SessionPool(hostUrl, username, password, maxSize); } @PreDestroy public void destroy() { // 关闭 SessionPool log.info("====SessionPool destroy===="); close(); } /** * 关闭连接池 */ public void close() { if (sessionPool != null) { sessionPool.close(); sessionPool = null; } } }
6. 服务接口与实现
定义 IotDBService 接口,提供各种数据操作方法,例如插入数据、查询实时数据、查询历史数据、聚合查询等:
public interface IotDBService { /** * 根据设备获取指标实时数据 * * @param db iotdb库名 * @param device iotdb设备路径 * @param points iotdb测点编码 * @return */ List realtime(String db, String device, List points); /** * 根据设备获取指标点码平均值 * * @param device iotdb设备路径 用点隔开 * @param points iotdb设备路径 用点隔开 * @param startTime 开始时间 秒级时间戳 * @param endTime 结束时间 秒级时间戳 * @return */ List avgValue(String device, List points, Long startTime, Long endTime); /** * 根据设备获取指标点码最大值和最小值 * * @param device iotdb设备路径 用点隔开 * @param points iotdb设备路径 用点隔开 * @param startTime 开始时间 秒级时间戳 * @param endTime 结束时间 秒级时间戳 * @return */ List statisticsValue(String device, List points, Long startTime, Long endTime); /** * 根据设备求和指标点码 * * @param device * @param points * @return */ List sumValue(String device, List points, Long startTime, Long endTime); /** * 获取点码历史数据 * * @param device * @param: points * @param: page * @param: pageSize * @param: startTime * @param: endTime */ HistoryValuesDto historyData(String device, List points, Integer page, Integer pageSize, Long startTime, Long endTime); /** * 根据时间窗口按照时间间隔获取点码历史数据 * * @param device * @param: points * @param: page * @param: pageSize * @param: startTime * @param: endTime * @param: windowSize 间隔窗口大小 */ HistoryValuesDto windowAggHistoryData(String device, List points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval); /** * 插入数据到IoTDB * * @param device deviceId: String - 设备的唯一标识符。 * @param points pointArr: String[] - 要插入的数据点名称列表。 * @param types types: TSDataType[] - 各数据点对应的数据类型列表,支持的数据类型(BOOLEAN,INT32,INT64,FLOAT,DOUBLE,TEXT)。 * @param dataJson dataJson: String - 包含要插入的数据的JSON字符串。 * {"data":[{"timestamp":1617187200,"temperature":"111","speed":"1"},{"timestamp":1617187200,"temperature":"111","speed":"1"}]} * timestamp: Long (可选) - 数据点的时间戳,单位为毫秒。如果不传,则默认为当前时间。 */ int insertData(String device, List points, List types, String dataJson); /** * 多设备获取指标实时数据 * * @param deviceIds iotdb设备路径 * @param points iotdb测点编码 * @return */ List batchRealtime(List deviceIds, List points); /** * 多设备获取指标点码历史数据 * * @param deviceIds iotdb设备路径 * @param points iotdb测点编码 * @return */ BatchHistoryValuesDto batchHistoryData(List deviceIds, List points, Integer page, Integer pageSize, Long startTime, Long endTime); /** * 根据时间窗口按照时间间隔获取点码历史数据 * * @param deviceIds * @param: points * @param: page * @param: pageSize * @param: startTime * @param: endTime * @param: windowSize 间隔窗口大小 */ BatchHistoryValuesDto batchWindowAggHistoryData(List deviceIds, List points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval); }
实现 IotDBServiceImpl 类,具体实现上述接口中的方法:
@Slf4j @Service public class IotDBServiceImpl implements IotDBService { @Autowired IotDbSessionPoolManager ioTDBSessionPoolManager; @Override public List realtime(String db, String device, List points) { //如果请求为空 直接返回null if (StringUtils.isEmpty(db) || StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) { return null; } SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; try { session = ioTDBSessionPoolManager.getSessionPool(); dataSetWrapper = session.executeLastDataQueryForOneDevice(db, device, points, false); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); List realtimeValueDtoList = new ArrayList(); while (resultSet.hasNext()) { RealtimeValueDto realtimeValueDto = RealtimeValueDto.builder().build(); RowRecord record = resultSet.next(); realtimeValueDto.setTs(record.getTimestamp()); List fields = record.getFields(); realtimeValueDto.setTimeSeries(fields.get(0).getStringValue()); realtimeValueDto.setVal(fields.get(1).getStringValue()); realtimeValueDtoList.add(realtimeValueDto); } return realtimeValueDtoList; } catch (Throwable ex) { log.error("realtime error", ex); throw new RuntimeException(ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } } @Override public List avgValue(String device, List points, Long startTime, Long endTime) { //如果请求为空 直接返回null if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) { return null; } // 构建查询语句 String sql = buildAvgValueQuery(device, points, startTime, endTime); log.info("avgValue sql::{}", sql); SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; // 执行查询 try { session = ioTDBSessionPoolManager.getSessionPool(); dataSetWrapper = session.executeQueryStatement(sql); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); List avgValueList = new ArrayList(); while (resultSet.hasNext()) { RowRecord record = resultSet.next(); List fields = record.getFields(); for (int i = 0; i { HistoryDataDto newHistoryData = new HistoryDataDto(); newHistoryData.setPoint(k); newHistoryData.setValues(new ArrayList()); return newHistoryData; }); // 创建HistoryValue对象并添加到HistoryData的values列表中 HistoryValueDto historyValueDto = new HistoryValueDto(); historyValueDto.setTimestamp(timestamp); historyValueDto.setVal(val); historyData.getValues().add(historyValueDto); } } // 将Map中的所有HistoryData对象添加到列表中 historyDataList.addAll(historyDataMap.values()); historyValuesDto.setHistoryData(historyDataList); historyValuesDto.setTotal((int) totalRecords); historyValuesDto.setPageSize(pageSize); historyValuesDto.setPage(totalPage); historyValuesDto.setCurrent(page); return historyValuesDto; } catch (Throwable ex) { log.error("historyData error", ex); throw new RuntimeException(ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } } @Override public HistoryValuesDto windowAggHistoryData(String device, List points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval) { // 如果请求为空,直接返回null if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points) || page == null || pageSize == null || windowSize == null || StringUtils.isEmpty(interval)) { return null; } // 执行计数查询 long totalRecords = getQueryResultCountForSingleDevices(device, points, startTime, endTime, windowSize, interval); if (totalRecords == 0) { // 如果没有记录,直接返回null return null; } // 将页码转换为基于0的索引 int offset = (page - 1) * pageSize; // 构建查询语句 String sql = buildWindowAggQuery(device, points, startTime, endTime, windowSize, interval, offset, pageSize); log.info("windowAggHistoryData sql::{}", sql); // 执行查询 SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; try { session = ioTDBSessionPoolManager.getSessionPool(); dataSetWrapper = session.executeQueryStatement(sql); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); HistoryValuesDto historyValuesDto = new HistoryValuesDto(); List historyDataList = new ArrayList(); // 计算总页数 int totalPage = (int) Math.ceil((double) totalRecords / pageSize); log.info("totalPage::{},totalRecords::{},::pageSize{}", totalPage, totalRecords, pageSize); while (resultSet.hasNext()) { RowRecord record = resultSet.next(); // 获取时间戳 long timestamp = record.getTimestamp(); List fields = record.getFields(); // 遍历所有数据点 for (int i = 0; i h.getPoint().equals(point)) .findFirst() .orElseGet(() -> { HistoryDataDto newHistoryData = new HistoryDataDto(); newHistoryData.setPoint(point); newHistoryData.setValues(new ArrayList()); historyDataList.add(newHistoryData); return newHistoryData; }); // 创建HistoryValue对象并添加到HistoryData的values列表中 HistoryValueDto historyValueDto = new HistoryValueDto(); historyValueDto.setTimestamp(timestamp); historyValueDto.setVal(val); historyData.getValues().add(historyValueDto); } } historyValuesDto.setHistoryData(historyDataList); historyValuesDto.setTotal((int) totalRecords); historyValuesDto.setPageSize(pageSize); historyValuesDto.setPage(totalPage); historyValuesDto.setCurrent(page); return historyValuesDto; } catch (Throwable ex) { log.error("windowAggHistoryData error", ex); throw new RuntimeException(ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } } /** * 插入数据到IoTDB * * @param deviceId 设备的唯一标识符 * @param pointArr 要插入的数据点名称列表 * @param types 各数据点对应的数据类型列表 * @param dataJson 包含要插入的数据的JSON字符串 * timestamp 数据点的时间戳,单位为毫秒。如果不传,则默认为当前时间。 */ @Override public int insertData(String deviceId, List pointArr, List types, String dataJson) { // log.info("dataJson===>{}", dataJson); List schemaList = new ArrayList(); int insertedRowCount = 0; // 用于跟踪成功插入的行数 try { SessionPool session = ioTDBSessionPoolManager.getSessionPool(); for (int i = 0; i 0) { Tablet tablet = new Tablet(deviceId, schemaList, dataArray.length()); tablet.rowSize = dataArray.length(); for (int i = 0; i { BatchRealtimeValueDto newValue = new BatchRealtimeValueDto(); newValue.setDeviceId(k); newValue.setRealtimeData(new ArrayList()); return newValue; }); RealtimeValueDto realtimeValueDto = RealtimeValueDto.builder().build(); realtimeValueDto.setTs(timestamp); realtimeValueDto.setTimeSeries(timeSeries); // 假设数据类型为 STRING,需要根据实际数据类型进行转换 realtimeValueDto.setVal(record.getFields().get(1).getStringValue()); batchRealtimeValue.getRealtimeData().add(realtimeValueDto); } } } // 将所有设备的最新数据添加到结果列表 batchRealtimeValues.addAll(deviceDataMap.values()); } catch (Exception ex) { log.error("batchRealtime error", ex); throw new RuntimeException(ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } return batchRealtimeValues; } @Override public BatchHistoryValuesDto batchHistoryData(List deviceIds, List points, Integer page, Integer pageSize, Long startTime, Long endTime) { // 如果请求为空,直接返回null if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points) || page == null || pageSize == null) { return null; } // 执行计数查询 long totalRecords = getQueryResultCountForMultipleDevices(deviceIds, points, startTime, endTime); if (totalRecords == 0) { // 如果没有记录,直接返回null return null; } // 将页码转换为基于0的索引 int offset = (page - 1) * pageSize; // 计算总页数和偏移量 int totalPage = (int) Math.ceil((double) totalRecords / pageSize); // 构建查询语句 String sql = buildHistoryDataQuery(deviceIds, points, startTime, endTime, offset, pageSize); log.info("batchHistoryData sql::{}", sql); SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; try { session = ioTDBSessionPoolManager.getSessionPool(); dataSetWrapper = session.executeQueryStatement(sql); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); BatchHistoryValuesDto batchHistoryValuesDto = new BatchHistoryValuesDto(); List historyDataByDeviceList = new ArrayList(); Map deviceDataMap = new HashMap(); // 获取列名 List columnNames = resultSet.getColumnNames(); while (resultSet.hasNext()) { RowRecord record = resultSet.next(); long timestamp = record.getTimestamp(); // 遍历字段,从第一个字段开始(跳过时间字段) for (int i = 1; i { HistoryDataByDeviceDto newDeviceData = new HistoryDataByDeviceDto(); newDeviceData.setDeviceId(k); newDeviceData.setHistoryData(new ArrayList()); return newDeviceData; }); // 创建或获取 HistoryDataDto 对象 HistoryDataDto historyData = deviceData.getHistoryData().stream() .filter(h -> h.getPoint().equals(point)) .findFirst() .orElseGet(() -> { HistoryDataDto newHistoryData = new HistoryDataDto(); newHistoryData.setPoint(point); newHistoryData.setValues(new ArrayList()); deviceData.getHistoryData().add(newHistoryData); return newHistoryData; }); // 创建并添加 HistoryValueDto 对象 HistoryValueDto historyValueDto = new HistoryValueDto(); historyValueDto.setTimestamp(timestamp); historyValueDto.setVal(value); historyData.getValues().add(historyValueDto); } } historyDataByDeviceList.addAll(deviceDataMap.values()); batchHistoryValuesDto.setHistoryDataByDevice(historyDataByDeviceList); batchHistoryValuesDto.setTotal((int) totalRecords); batchHistoryValuesDto.setPage(totalPage); batchHistoryValuesDto.setCurrent(page); batchHistoryValuesDto.setPageSize(pageSize); return batchHistoryValuesDto; } catch (Throwable ex) { log.error("batchHistoryData error", ex); throw new RuntimeException("Error fetching batch history data", ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } } @Override public BatchHistoryValuesDto batchWindowAggHistoryData(List deviceIds, List points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval) { // 如果请求为空,直接返回null if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points) || page == null || pageSize == null || windowSize == null || interval == null) { return null; } // 执行计数查询 long totalRecords = getQueryResultCountForMultipleDevices(deviceIds, points, startTime, endTime, windowSize, interval, pageSize, page); if (totalRecords == 0) { return null; } // 分页计算 int offset = (page - 1) * pageSize; int totalPage = (int) Math.ceil((double) totalRecords / pageSize); // 构建查询语句 String sql = buildBatchWindowAggQuery(deviceIds, points, startTime, endTime, windowSize, interval, offset, pageSize); log.info("batchWindowAggHistoryData sql::{}", sql); SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; try { session = ioTDBSessionPoolManager.getSessionPool(); dataSetWrapper = session.executeQueryStatement(sql); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); BatchHistoryValuesDto batchHistoryValuesDto = new BatchHistoryValuesDto(); List historyDataByDeviceList = new ArrayList(); Map deviceDataMap = new HashMap(); // 获取列名 List columnNames = resultSet.getColumnNames(); while (resultSet.hasNext()) { RowRecord record = resultSet.next(); long timestamp = record.getTimestamp(); // 遍历字段,从第一个字段开始(跳过时间字段) for (int i = 1; i { HistoryDataByDeviceDto newDeviceData = new HistoryDataByDeviceDto(); newDeviceData.setDeviceId(k); newDeviceData.setHistoryData(new ArrayList()); return newDeviceData; }); // 创建或获取 HistoryDataDto 对象 HistoryDataDto historyData = deviceData.getHistoryData().stream() .filter(h -> h.getPoint().equals(point)) .findFirst() .orElseGet(() -> { HistoryDataDto newHistoryData = new HistoryDataDto(); newHistoryData.setPoint(point); newHistoryData.setValues(new ArrayList()); deviceData.getHistoryData().add(newHistoryData); return newHistoryData; }); // 创建并添加 HistoryValueDto 对象 HistoryValueDto historyValueDto = new HistoryValueDto(); historyValueDto.setTimestamp(timestamp); historyValueDto.setVal(value); historyData.getValues().add(historyValueDto); } } } historyDataByDeviceList.addAll(deviceDataMap.values()); batchHistoryValuesDto.setHistoryDataByDevice(historyDataByDeviceList); batchHistoryValuesDto.setTotal((int) totalRecords); batchHistoryValuesDto.setPage(totalPage); batchHistoryValuesDto.setCurrent(page); batchHistoryValuesDto.setPageSize(pageSize); return batchHistoryValuesDto; } catch (Throwable ex) { log.error("batchWindowAggHistoryData error", ex); throw new RuntimeException("Error fetching batch history data", ex); } finally { //释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } } public long getQueryResultCountForSingleDevices(String device, List points, Long startTime, Long endTime, Integer windowSize, String interval) { long count = 0; SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; try { session = ioTDBSessionPoolManager.getSessionPool(); // 构建查询语句 String sql = buildWindowAggQuery(device, points, startTime, endTime, windowSize, interval); log.info("getQueryResultCountForSingleDevices sql::{}", sql); // 执行查询 dataSetWrapper = session.executeQueryStatement(sql); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); // 由于是聚合查询,获取行数的方法是简单地调用 next(),不需要记录具体的字段 while (resultSet.hasNext()) { resultSet.next(); count++; } } catch (Exception ex) { log.error("getQueryResultCountForSingleDevices error", ex); throw new RuntimeException(ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } return count; } public long getQueryResultCountForMultipleDevices(List devices, List points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) { long count = 0; SessionPool session = null; SessionDataSetWrapper dataSetWrapper = null; try { session = ioTDBSessionPoolManager.getSessionPool(); // 构建查询语句 String sql = buildBatchWindowAggQueryCount(devices, points, startTime, endTime, windowSize, interval); log.info("getQueryResultCountForMultipleDevices sql::{}", sql); // 执行查询 dataSetWrapper = session.executeQueryStatement(sql); SessionDataSet resultSet = dataSetWrapper.getSessionDataSet(); // 由于是聚合查询,获取行数的方法是简单地调用 next(),不需要记录具体的字段 while (resultSet.hasNext()) { resultSet.next(); count++; } } catch (Exception ex) { log.error("getQueryResultCountForMultipleDevices error", ex); throw new RuntimeException(ex); } finally { // 释放资源 if (session != null && dataSetWrapper != null) { session.closeResultSet(dataSetWrapper); } } return count; } private static String buildBatchWindowAggQueryCount(List devices, List points, Long startTime, Long endTime, Integer windowSize, String interval) { if (devices == null || devices.isEmpty() || points == null || points.isEmpty() || startTime == null || endTime == null || windowSize == null || interval == null) { throw new IllegalArgumentException("Invalid input parameters"); } StringBuilder queryBuilder = new StringBuilder(); queryBuilder.append("SELECT "); // 构建SELECT部分,多个测点的平均值 for (int i = 0; i = ").append(startTime).append(" AND time = ").append(startTime).append(" AND time = ").append(startTime).append(" AND time = ").append(startTime).append(" and time = " + startTime + " and time = " + startTime + " and time = " + startTime + " and time = " + startTime + " and time