java 发送数据到flume_flume接收http请求,并将数据写到kafka

2023-10-07 04:40

本文主要是介绍java 发送数据到flume_flume接收http请求,并将数据写到kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flume接收http请求,并将数据写到kafka,spark消费kafka的数据。是数据采集的经典框架。

直接上flume的配置:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf

# example.conf: A single-node Flume configuration

##########

# data example

# use post request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]

# just use the office request demo

#[{

# "headers" : {

# "timestamp" : "434324343",

# "host" : "random_host.example.com"

# "topic" : "venn" # if headers contain topic, will replace the default topic

# },

# "body" : "random_body" # random_body is the message send to channel

# }]

# Name the components on this agent1

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.s1.type = http

agent1.sources.s1.bind = spring # localhost 只能接收本地请求

agent1.sources.s1.port = 8084 # http的端口

agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler # 自带的接收http请求的handler

# Describe the sink

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # kafkasink

agent1.sinks.k1.kafka.topic = mytopic # topic

agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # kafka host and port

agent1.sinks.k1.kafka.flumeBatchSize = 20

agent1.sinks.k1.kafka.producer.acks = 1

agent1.sinks.k1.kafka.producer.linger.ms = 1

agent1.sinks.k1.kafka.producer.compression.type = snappy # 压缩

# Use a channel which buffers events in memory

agent1.channels.c1.type = file

#agent1.channels.c1.capacity = 1000 # 这两个参数要配置,需要配大一点,不然channel满了会报错,http返回503(通道已满)

#agent1.channels.c1.transactionCapacity = 100

agent1.channels.c1.checkpointDir = /opt/flume/checkpoint

agent1.channels.c1.dataDirs = /opt/flume/channel

# Bind the source and sink to the channel

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1

有了flume的配置,下面启动flume:

./bin/flume-ng agent -n agent1 -c conf -f conf/http-to-kafka.properties -Dflume.root.logger=INFO,console

启动之后,就可以发http请求了。

http请求的格式如下:

[{

"headers" : {

"timestamp" : "434324343",

"host" : "random_host.example.com",

"topic" : "xxx"

},

"body" : "random_body"

},

{

"headers" : {

"namenode" : "namenode.example.com",

"datanode" : "random_datanode.example.com"

},

"body" : "really_random_body"

}]

注: http请求的headers中又topic 会替代配置文件中的topic

flume官网文档说:1.8.0版本的flume只支持0.9.x的kafka,不支持0.8.x的kafka了(没测过)

然后就是发数的程序了(自己请求太麻烦了。)

package com.venn.http;

import com.venn.entity.User;

import java.io.BufferedReader;

import java.io.DataOutputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.UnsupportedEncodingException;

import java.net.HttpURLConnection;

import java.net.MalformedURLException;

import java.net.URL;

import java.util.*;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.event.JSONEvent;

import com.google.gson.Gson;

import org.apache.flume.source.http.HTTPBadRequestException;

import org.apache.flume.source.http.HTTPSourceHandler;

import javax.servlet.http.HttpServletRequest;

/**

* Created by venn on 19-1-17.

*/

public class HttpDemo {

private static String urlStr = "http://localhost:8084";

private static Random random = new Random();

public static void main(String[] args) throws InterruptedException {

while (true){

String message = new User().toString();

send(message);

// Thread.sleep(1);

}

}

public static void send(String message){

System.out.println("send message : " + message);

try{

//创建连接

URL url = new URL(urlStr);

HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setDoOutput(true);

connection.setDoInput(true);

connection.setRequestMethod("POST");

connection.setUseCaches(false);

connection.setInstanceFollowRedirects(true);

connection.setRequestProperty("Content-Type",

"application/x-www-form-urlencoded");

connection.connect();

//POST请求

DataOutputStream out = new DataOutputStream(

connection.getOutputStream());

JSONEvent jsonEvent = new JSONEvent();

Map header = new HashMap();

header.put("timestamp", System.currentTimeMillis());

header.put("host", "venn");

header.put("topic","venn"+random.nextInt(4));

jsonEvent.setBody(message.getBytes());

jsonEvent.setHeaders(header);

Gson gson = new Gson();

List list = new ArrayList();

list.add(jsonEvent);

out.writeBytes(gson.toJson(list));

out.flush();

out.close();

//读取响应

BufferedReader reader = new BufferedReader(new InputStreamReader(

connection.getInputStream())); // 不会返回数据

int code = connection.getResponseCode();

String lines;

StringBuffer sb = new StringBuffer("");

while ((lines = reader.readLine()) != null) {

lines = new String(lines.getBytes(), "utf-8");

sb.append(lines);

}

System.out.println("code : " + code + ", message : " + sb);

reader.close();

// 断开连接

connection.disconnect();

} catch (MalformedURLException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

搞定。。

发数:

145113d51fba9115b1a3d501769313e5.png

kafka接收到的数据:

1e7605f3042aa2b8d28394f2f636cee6.png

注意: 由于在headers中加入了topic参数,实际接收到的数据是在不同的kafka topic中的

.net接收post请求并把数据转为字典格式

public SortedDictionary GetRequestPost() { int i = 0; SortedDictionary

MVC Control 接收post请求的json数据

[HttpPost] public string QueryInvoice() { string stream; using (var sr = new StreamReader(Request.In ...

servlet接收request请求的json数据

此次使用的是alibaba的fastjson:jar包为fastjson-1.2.7.jar 参考:https://www.qingtingip.com/h_229797.html 思路:由于此次接收 ...

FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解

详细配置文件flume-conf.properties如下: ############################################ # producer config ###### ...

将数据写到kafka的topic

package test05 import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, P ...

Struts2 Action接收POST请求JSON数据及其实现解析

一.认识JSON JSON是一种轻量级.基于文本.与语言无关的数据交换格式,可以用文本格式的形式来存储或表示结构化的数据. 二.POST请求与Content-Type: application/jso ...

javaweb Servlet接收Android请求,并返回json数据

1.实现功能 (1)接收http请求 (2)获取Android客户端发送的参数对应的内容 (3)hibernate查询数据库 (4)返回json数据 2.java代码 import EntityCla ...

学习笔记_springmvc返回值、数据写到页面、表单提交、ajax、重定向

数据写到页面 后台往前台传数据 TestController添加 /** * 方法的返回值采用ModelAndView, new ModelAndView("index", map ...

随机推荐

Js添加消息提示数量

接到个新需求,类似以下这种需求,得把它封装成一个插件 后端给返回一个这种数据 var data = [ { key:"020506", num:5 }, { key:"0 ...

SQL Server 统计信息

SELECT * FROM SYS.stats _WA_Sys_00000009_00000062:统计对象的名称.不同的机器名称不同,自动创建的统计信息都以_WA_Sys开头,00000009表示的 ...

Window I/O 完成端口 (Windows I/O Completion Port (IOCP))

相关对象 IO EndPoint, 所有支持重叠IO(overlapped IO)的设备,比如文件,Winsock,管道等. IOCP, IO完成端口内核对象,可以使用API CreateIoComp ...

原创:整理编辑jQuery全部思维导图【附下载地址】

主图 全部图已经打包:下载地址 2. 3. 4. 5. 6. 附上一点简单说明 Dom对象和jquer对象之间的转化 如何将一个jquery对象转换为DOM对象? test是一个span元素 var ...

jqcss选择器

$("p").css("background-color","red"); $(this) 当前 HTML 元素$("p&quot ...

UVA 11300 Spreading the Wealth (数学推导 中位数)

Spreading the Wealth Problem A Communist regime is trying to redistribute wealth in a village. They ...

CKEditor 集成CKFinder集成

lCKEditor原名FckEditor,著名的HTML编辑器,可以在线编辑HTML内容,演示一下.打开.自己人用CKEditor,网友用UBBEditor. l配置参考文档,主要将ckeditor中 ...

UNIX环境高级编程——线程和fork

当线程调用fork时,就为子进程创建了整个进程地址空间的副本.子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量.读写锁和条件变量的状态.如果父进程包含多个线程,子进程在fork返回以后 ...

Reactive Programming

Reactive的表现 Reactive 规范是 JVM Reactive 扩展规范 Reactive Streams JVM,而 Reactive 实现框架则是最典型的实现: Reactive St ...

rman实验——测试备份压缩

oracle rman自带的备份压缩机制,可以有效的压缩备份的大小,降低磁盘的占用率.但是也会因为压缩而消耗更多的系统性能,和增加备份时间.现在就通过实验来看压缩和不压缩的区别. 进行不压缩全备 RM ...

这篇关于java 发送数据到flume_flume接收http请求,并将数据写到kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/156076

相关文章

redis在spring boot中异常退出的问题解决方案

《redis在springboot中异常退出的问题解决方案》:本文主要介绍redis在springboot中异常退出的问题解决方案,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴... 目录问题:解决 问题根源️ 解决方案1. 异步处理 + 提前ACK(关键步骤)2. 调整Redis消费者组

一文教你Java如何快速构建项目骨架

《一文教你Java如何快速构建项目骨架》在Java项目开发过程中,构建项目骨架是一项繁琐但又基础重要的工作,Java领域有许多代码生成工具可以帮助我们快速完成这一任务,下面就跟随小编一起来了解下... 目录一、代码生成工具概述常用 Java 代码生成工具简介代码生成工具的优势二、使用 MyBATis Gen

springboot项目redis缓存异常实战案例详解(提供解决方案)

《springboot项目redis缓存异常实战案例详解(提供解决方案)》redis基本上是高并发场景上会用到的一个高性能的key-value数据库,属于nosql类型,一般用作于缓存,一般是结合数据... 目录缓存异常实践案例缓存穿透问题缓存击穿问题(其中也解决了穿透问题)完整代码缓存异常实践案例Red

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

Python Pandas高效处理Excel数据完整指南

《PythonPandas高效处理Excel数据完整指南》在数据驱动的时代,Excel仍是大量企业存储核心数据的工具,Python的Pandas库凭借其向量化计算、内存优化和丰富的数据处理接口,成为... 目录一、环境搭建与数据读取1.1 基础环境配置1.2 数据高效载入技巧二、数据清洗核心战术2.1 缺失

java中XML的使用全过程

《java中XML的使用全过程》:本文主要介绍java中XML的使用全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录什么是XML特点XML作用XML的编写语法基本语法特殊字符编写约束XML的书写格式DTD文档schema文档解析XML的方法​​DOM解析XM

Java 的 Condition 接口与等待通知机制详解

《Java的Condition接口与等待通知机制详解》在Java并发编程里,实现线程间的协作与同步是极为关键的任务,本文将深入探究Condition接口及其背后的等待通知机制,感兴趣的朋友一起看... 目录一、引言二、Condition 接口概述2.1 基本概念2.2 与 Object 类等待通知方法的区别

SpringBoot项目中Redis存储Session对象序列化处理

《SpringBoot项目中Redis存储Session对象序列化处理》在SpringBoot项目中使用Redis存储Session时,对象的序列化和反序列化是关键步骤,下面我们就来讲讲如何在Spri... 目录一、为什么需要序列化处理二、Spring Boot 集成 Redis 存储 Session2.1

使用Java实现Navicat密码的加密与解密的代码解析

《使用Java实现Navicat密码的加密与解密的代码解析》:本文主要介绍使用Java实现Navicat密码的加密与解密,通过本文,我们了解了如何利用Java语言实现对Navicat保存的数据库密... 目录一、背景介绍二、环境准备三、代码解析四、核心代码展示五、总结在日常开发过程中,我们有时需要处理各种软

Java List排序实例代码详解

《JavaList排序实例代码详解》:本文主要介绍JavaList排序的相关资料,Java排序方法包括自然排序、自定义排序、Lambda简化及多条件排序,实现灵活且代码简洁,文中通过代码介绍的... 目录一、自然排序二、自定义排序规则三、使用 Lambda 表达式简化 Comparator四、多条件排序五、