influxdb产生实时数据

2024-08-31 03:32
文章标签 数据 实时 influxdb 产生

本文主要是介绍influxdb产生实时数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

开发工具:idea 简历maven工程

<dependency><groupId>org.influxdb</groupId><artifactId>influxdb-java</artifactId><version>2.17</version>
</dependency>
<dependency><groupId>com.github.javafaker</groupId><artifactId>javafaker</artifactId><version>1.0.2</version>
</dependency>

 java工程钟创立两个java文件,目录结构如下

influxDBConnection文件中添加如下代码

package com.influx;import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;public class InfluxDBConnection {// 用户名private String username;// 密码private String password;// 连接地址private String openurl;// 数据库private String database;// 保留策略private String retentionPolicy;private InfluxDB influxDB;public InfluxDBConnection(String username, String password, String openurl, String database,String retentionPolicy) {this.username = username;this.password = password;this.openurl = openurl;this.database = database;this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;influxDbBuild();}/*** 创建数据库** @param dbName*/@SuppressWarnings("deprecation")public void createDB(String dbName) {influxDB.createDatabase(dbName);}/*** 删除数据库** @param dbName*/@SuppressWarnings("deprecation")public void deleteDB(String dbName) {influxDB.deleteDatabase(dbName);}/*** 测试连接是否正常** @return true 正常*/public boolean ping() {boolean isConnected = false;Pong pong;try {pong = influxDB.ping();if (pong != null) {isConnected = true;}} catch (Exception e) {e.printStackTrace();}return isConnected;}/*** 连接时序数据库 ,若不存在则创建** @return*/public InfluxDB influxDbBuild() {if (influxDB == null) {influxDB = InfluxDBFactory.connect(openurl, username, password);}try {// if (!influxDB.databaseExists(database)) {// influxDB.createDatabase(database);// }} catch (Exception e) {// 该数据库可能设置动态代理,不支持创建数据库// e.printStackTrace();} finally {influxDB.setRetentionPolicy(retentionPolicy);}influxDB.setLogLevel(InfluxDB.LogLevel.NONE);return influxDB;}/*** 创建自定义保留策略** @param policyName*            策略名* @param duration*            保存天数* @param replication*            保存副本数量* @param isDefault*            是否设为默认保留策略*/public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,database, duration, replication);if (isDefault) {sql = sql + " DEFAULT";}this.query(sql);}/*** 创建默认的保留策略** @param :default,保存天数:30天,保存副本数量:1*            设为默认保留策略*/public void createDefaultRetentionPolicy() {String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT","default", database, "30d", 1);this.query(command);}/*** 查询** @param command*            查询语句* @return*/public QueryResult query(String command) {return influxDB.query(new Query(command, database));}/*** 插入** @param measurement*            表* @param tags*            标签* @param fields*            字段*/public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,TimeUnit timeUnit) {Point.Builder builder = Point.measurement(measurement);builder.tag(tags);builder.fields(fields);if (0 != time) {builder.time(time, timeUnit);}influxDB.write(database, retentionPolicy, builder.build());}/*** 批量写入测点** @param batchPoints*/public void batchInsert(BatchPoints batchPoints) {influxDB.write(batchPoints);// influxDB.enableGzip();// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);// influxDB.disableGzip();// influxDB.disableBatch();}/*** 批量写入数据** @param database*            数据库* @param retentionPolicy*            保存策略* @param consistency*            一致性* @param records*            要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)*/public void batchInsert(final String database, final String retentionPolicy, final InfluxDB.ConsistencyLevel consistency,final List<String> records) {influxDB.write(database, retentionPolicy, consistency, records);}/*** 删除** @param command*            删除语句* @return 返回错误信息*/public String deleteMeasurementData(String command) {QueryResult result = influxDB.query(new Query(command, database));return result.getError();}/*** 关闭数据库*/public void close() {influxDB.close();}/*** 构建Point** @param measurement* @param time* @param fields* @return*/public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();return point;}
}

 InfluxDBManager中编写数据写入代码

package com.influx;import com.github.javafaker.Faker;
import com.mysql.cj.x.protobuf.MysqlxDatatypes;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;import java.util.HashMap;
import java.util.Locale;
import java.util.concurrent.TimeUnit;public class InfluxDBManager {public static void main(String[] args) throws InterruptedException {//这一行代码数据库名,表名,ip地址,还有持久策略要根据每个人实际情况填写InfluxDBConnection influxDBConnection = new InfluxDBConnection("influx", "influx", "http://192.168.224.129:8086", "influxdb", "influx_retention");Faker faker = new Faker(new Locale("zh-CN"));String[] areas = {"南","北","东","西"};Integer[] altitudes = new Integer[]{500,800,1000,1500};int k = 1;while(true){System.out.println("当前是第轮插入数据:"+ k);int al_intdex = (int)Math.floor(Math.random() * altitudes.length);int ar_index = (int)Math.floor(Math.random() * areas.length);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("altitude", altitudes[al_intdex].toString());hashMap.put("area", areas[ar_index].toString());HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();stringObjectHashMap.put("temperature", faker.number().numberBetween(10,30));stringObjectHashMap.put("humidity", faker.number().numberBetween(10,30));influxDBConnection.insert("weather", hashMap, stringObjectHashMap, System.currentTimeMillis(), TimeUnit.MILLISECONDS);Thread.sleep(faker.number().numberBetween(500,1000));k++;}}
}

这篇关于influxdb产生实时数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1122763

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核