influxdb产生实时数据

2024-08-31 03:32
文章标签 数据 实时 influxdb 产生

本文主要是介绍influxdb产生实时数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

开发工具:idea 简历maven工程

<dependency><groupId>org.influxdb</groupId><artifactId>influxdb-java</artifactId><version>2.17</version>
</dependency>
<dependency><groupId>com.github.javafaker</groupId><artifactId>javafaker</artifactId><version>1.0.2</version>
</dependency>

 java工程钟创立两个java文件,目录结构如下

influxDBConnection文件中添加如下代码

package com.influx;import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;public class InfluxDBConnection {// 用户名private String username;// 密码private String password;// 连接地址private String openurl;// 数据库private String database;// 保留策略private String retentionPolicy;private InfluxDB influxDB;public InfluxDBConnection(String username, String password, String openurl, String database,String retentionPolicy) {this.username = username;this.password = password;this.openurl = openurl;this.database = database;this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;influxDbBuild();}/*** 创建数据库** @param dbName*/@SuppressWarnings("deprecation")public void createDB(String dbName) {influxDB.createDatabase(dbName);}/*** 删除数据库** @param dbName*/@SuppressWarnings("deprecation")public void deleteDB(String dbName) {influxDB.deleteDatabase(dbName);}/*** 测试连接是否正常** @return true 正常*/public boolean ping() {boolean isConnected = false;Pong pong;try {pong = influxDB.ping();if (pong != null) {isConnected = true;}} catch (Exception e) {e.printStackTrace();}return isConnected;}/*** 连接时序数据库 ,若不存在则创建** @return*/public InfluxDB influxDbBuild() {if (influxDB == null) {influxDB = InfluxDBFactory.connect(openurl, username, password);}try {// if (!influxDB.databaseExists(database)) {// influxDB.createDatabase(database);// }} catch (Exception e) {// 该数据库可能设置动态代理,不支持创建数据库// e.printStackTrace();} finally {influxDB.setRetentionPolicy(retentionPolicy);}influxDB.setLogLevel(InfluxDB.LogLevel.NONE);return influxDB;}/*** 创建自定义保留策略** @param policyName*            策略名* @param duration*            保存天数* @param replication*            保存副本数量* @param isDefault*            是否设为默认保留策略*/public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,database, duration, replication);if (isDefault) {sql = sql + " DEFAULT";}this.query(sql);}/*** 创建默认的保留策略** @param :default,保存天数:30天,保存副本数量:1*            设为默认保留策略*/public void createDefaultRetentionPolicy() {String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT","default", database, "30d", 1);this.query(command);}/*** 查询** @param command*            查询语句* @return*/public QueryResult query(String command) {return influxDB.query(new Query(command, database));}/*** 插入** @param measurement*            表* @param tags*            标签* @param fields*            字段*/public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,TimeUnit timeUnit) {Point.Builder builder = Point.measurement(measurement);builder.tag(tags);builder.fields(fields);if (0 != time) {builder.time(time, timeUnit);}influxDB.write(database, retentionPolicy, builder.build());}/*** 批量写入测点** @param batchPoints*/public void batchInsert(BatchPoints batchPoints) {influxDB.write(batchPoints);// influxDB.enableGzip();// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);// influxDB.disableGzip();// influxDB.disableBatch();}/*** 批量写入数据** @param database*            数据库* @param retentionPolicy*            保存策略* @param consistency*            一致性* @param records*            要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)*/public void batchInsert(final String database, final String retentionPolicy, final InfluxDB.ConsistencyLevel consistency,final List<String> records) {influxDB.write(database, retentionPolicy, consistency, records);}/*** 删除** @param command*            删除语句* @return 返回错误信息*/public String deleteMeasurementData(String command) {QueryResult result = influxDB.query(new Query(command, database));return result.getError();}/*** 关闭数据库*/public void close() {influxDB.close();}/*** 构建Point** @param measurement* @param time* @param fields* @return*/public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();return point;}
}

 InfluxDBManager中编写数据写入代码

package com.influx;import com.github.javafaker.Faker;
import com.mysql.cj.x.protobuf.MysqlxDatatypes;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;import java.util.HashMap;
import java.util.Locale;
import java.util.concurrent.TimeUnit;public class InfluxDBManager {public static void main(String[] args) throws InterruptedException {//这一行代码数据库名,表名,ip地址,还有持久策略要根据每个人实际情况填写InfluxDBConnection influxDBConnection = new InfluxDBConnection("influx", "influx", "http://192.168.224.129:8086", "influxdb", "influx_retention");Faker faker = new Faker(new Locale("zh-CN"));String[] areas = {"南","北","东","西"};Integer[] altitudes = new Integer[]{500,800,1000,1500};int k = 1;while(true){System.out.println("当前是第轮插入数据:"+ k);int al_intdex = (int)Math.floor(Math.random() * altitudes.length);int ar_index = (int)Math.floor(Math.random() * areas.length);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("altitude", altitudes[al_intdex].toString());hashMap.put("area", areas[ar_index].toString());HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();stringObjectHashMap.put("temperature", faker.number().numberBetween(10,30));stringObjectHashMap.put("humidity", faker.number().numberBetween(10,30));influxDBConnection.insert("weather", hashMap, stringObjectHashMap, System.currentTimeMillis(), TimeUnit.MILLISECONDS);Thread.sleep(faker.number().numberBetween(500,1000));k++;}}
}

这篇关于influxdb产生实时数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1122763

相关文章

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient