java 发送数据到flume_flume接收http请求,并将数据写到kafka

2023-10-07 04:40

本文主要是介绍java 发送数据到flume_flume接收http请求,并将数据写到kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flume接收http请求,并将数据写到kafka,spark消费kafka的数据。是数据采集的经典框架。

直接上flume的配置:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf

# example.conf: A single-node Flume configuration

##########

# data example

# use post request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]

# just use the office request demo

#[{

# "headers" : {

# "timestamp" : "434324343",

# "host" : "random_host.example.com"

# "topic" : "venn" # if headers contain topic, will replace the default topic

# },

# "body" : "random_body" # random_body is the message send to channel

# }]

# Name the components on this agent1

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.s1.type = http

agent1.sources.s1.bind = spring # localhost 只能接收本地请求

agent1.sources.s1.port = 8084 # http的端口

agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler # 自带的接收http请求的handler

# Describe the sink

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # kafkasink

agent1.sinks.k1.kafka.topic = mytopic # topic

agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # kafka host and port

agent1.sinks.k1.kafka.flumeBatchSize = 20

agent1.sinks.k1.kafka.producer.acks = 1

agent1.sinks.k1.kafka.producer.linger.ms = 1

agent1.sinks.k1.kafka.producer.compression.type = snappy # 压缩

# Use a channel which buffers events in memory

agent1.channels.c1.type = file

#agent1.channels.c1.capacity = 1000 # 这两个参数要配置,需要配大一点,不然channel满了会报错,http返回503(通道已满)

#agent1.channels.c1.transactionCapacity = 100

agent1.channels.c1.checkpointDir = /opt/flume/checkpoint

agent1.channels.c1.dataDirs = /opt/flume/channel

# Bind the source and sink to the channel

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1

有了flume的配置,下面启动flume:

./bin/flume-ng agent -n agent1 -c conf -f conf/http-to-kafka.properties -Dflume.root.logger=INFO,console

启动之后,就可以发http请求了。

http请求的格式如下:

[{

"headers" : {

"timestamp" : "434324343",

"host" : "random_host.example.com",

"topic" : "xxx"

},

"body" : "random_body"

},

{

"headers" : {

"namenode" : "namenode.example.com",

"datanode" : "random_datanode.example.com"

},

"body" : "really_random_body"

}]

注: http请求的headers中又topic 会替代配置文件中的topic

flume官网文档说:1.8.0版本的flume只支持0.9.x的kafka,不支持0.8.x的kafka了(没测过)

然后就是发数的程序了(自己请求太麻烦了。)

package com.venn.http;

import com.venn.entity.User;

import java.io.BufferedReader;

import java.io.DataOutputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.UnsupportedEncodingException;

import java.net.HttpURLConnection;

import java.net.MalformedURLException;

import java.net.URL;

import java.util.*;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.event.JSONEvent;

import com.google.gson.Gson;

import org.apache.flume.source.http.HTTPBadRequestException;

import org.apache.flume.source.http.HTTPSourceHandler;

import javax.servlet.http.HttpServletRequest;

/**

* Created by venn on 19-1-17.

*/

public class HttpDemo {

private static String urlStr = "http://localhost:8084";

private static Random random = new Random();

public static void main(String[] args) throws InterruptedException {

while (true){

String message = new User().toString();

send(message);

// Thread.sleep(1);

}

}

public static void send(String message){

System.out.println("send message : " + message);

try{

//创建连接

URL url = new URL(urlStr);

HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setDoOutput(true);

connection.setDoInput(true);

connection.setRequestMethod("POST");

connection.setUseCaches(false);

connection.setInstanceFollowRedirects(true);

connection.setRequestProperty("Content-Type",

"application/x-www-form-urlencoded");

connection.connect();

//POST请求

DataOutputStream out = new DataOutputStream(

connection.getOutputStream());

JSONEvent jsonEvent = new JSONEvent();

Map header = new HashMap();

header.put("timestamp", System.currentTimeMillis());

header.put("host", "venn");

header.put("topic","venn"+random.nextInt(4));

jsonEvent.setBody(message.getBytes());

jsonEvent.setHeaders(header);

Gson gson = new Gson();

List list = new ArrayList();

list.add(jsonEvent);

out.writeBytes(gson.toJson(list));

out.flush();

out.close();

//读取响应

BufferedReader reader = new BufferedReader(new InputStreamReader(

connection.getInputStream())); // 不会返回数据

int code = connection.getResponseCode();

String lines;

StringBuffer sb = new StringBuffer("");

while ((lines = reader.readLine()) != null) {

lines = new String(lines.getBytes(), "utf-8");

sb.append(lines);

}

System.out.println("code : " + code + ", message : " + sb);

reader.close();

// 断开连接

connection.disconnect();

} catch (MalformedURLException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

搞定。。

发数:

145113d51fba9115b1a3d501769313e5.png

kafka接收到的数据:

1e7605f3042aa2b8d28394f2f636cee6.png

注意: 由于在headers中加入了topic参数,实际接收到的数据是在不同的kafka topic中的

.net接收post请求并把数据转为字典格式

public SortedDictionary GetRequestPost() { int i = 0; SortedDictionary

MVC Control 接收post请求的json数据

[HttpPost] public string QueryInvoice() { string stream; using (var sr = new StreamReader(Request.In ...

servlet接收request请求的json数据

此次使用的是alibaba的fastjson:jar包为fastjson-1.2.7.jar 参考:https://www.qingtingip.com/h_229797.html 思路:由于此次接收 ...

FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解

详细配置文件flume-conf.properties如下: ############################################ # producer config ###### ...

将数据写到kafka的topic

package test05 import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, P ...

Struts2 Action接收POST请求JSON数据及其实现解析

一.认识JSON JSON是一种轻量级.基于文本.与语言无关的数据交换格式,可以用文本格式的形式来存储或表示结构化的数据. 二.POST请求与Content-Type: application/jso ...

javaweb Servlet接收Android请求,并返回json数据

1.实现功能 (1)接收http请求 (2)获取Android客户端发送的参数对应的内容 (3)hibernate查询数据库 (4)返回json数据 2.java代码 import EntityCla ...

学习笔记_springmvc返回值、数据写到页面、表单提交、ajax、重定向

数据写到页面 后台往前台传数据 TestController添加 /** * 方法的返回值采用ModelAndView, new ModelAndView("index", map ...

随机推荐

Js添加消息提示数量

接到个新需求,类似以下这种需求,得把它封装成一个插件 后端给返回一个这种数据 var data = [ { key:"020506", num:5 }, { key:"0 ...

SQL Server 统计信息

SELECT * FROM SYS.stats _WA_Sys_00000009_00000062:统计对象的名称.不同的机器名称不同,自动创建的统计信息都以_WA_Sys开头,00000009表示的 ...

Window I/O 完成端口 (Windows I/O Completion Port (IOCP))

相关对象 IO EndPoint, 所有支持重叠IO(overlapped IO)的设备,比如文件,Winsock,管道等. IOCP, IO完成端口内核对象,可以使用API CreateIoComp ...

原创:整理编辑jQuery全部思维导图【附下载地址】

主图 全部图已经打包:下载地址 2. 3. 4. 5. 6. 附上一点简单说明 Dom对象和jquer对象之间的转化 如何将一个jquery对象转换为DOM对象? test是一个span元素 var ...

jqcss选择器

$("p").css("background-color","red"); $(this) 当前 HTML 元素$("p&quot ...

UVA 11300 Spreading the Wealth (数学推导 中位数)

Spreading the Wealth Problem A Communist regime is trying to redistribute wealth in a village. They ...

CKEditor 集成CKFinder集成

lCKEditor原名FckEditor,著名的HTML编辑器,可以在线编辑HTML内容,演示一下.打开.自己人用CKEditor,网友用UBBEditor. l配置参考文档,主要将ckeditor中 ...

UNIX环境高级编程——线程和fork

当线程调用fork时,就为子进程创建了整个进程地址空间的副本.子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量.读写锁和条件变量的状态.如果父进程包含多个线程,子进程在fork返回以后 ...

Reactive Programming

Reactive的表现 Reactive 规范是 JVM Reactive 扩展规范 Reactive Streams JVM,而 Reactive 实现框架则是最典型的实现: Reactive St ...

rman实验——测试备份压缩

oracle rman自带的备份压缩机制,可以有效的压缩备份的大小,降低磁盘的占用率.但是也会因为压缩而消耗更多的系统性能,和增加备份时间.现在就通过实验来看压缩和不压缩的区别. 进行不压缩全备 RM ...

这篇关于java 发送数据到flume_flume接收http请求,并将数据写到kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/156076

相关文章

一文详解SpringBoot中控制器的动态注册与卸载

《一文详解SpringBoot中控制器的动态注册与卸载》在项目开发中,通过动态注册和卸载控制器功能,可以根据业务场景和项目需要实现功能的动态增加、删除,提高系统的灵活性和可扩展性,下面我们就来看看Sp... 目录项目结构1. 创建 Spring Boot 启动类2. 创建一个测试控制器3. 创建动态控制器注

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

Spring Boot中WebSocket常用使用方法详解

《SpringBoot中WebSocket常用使用方法详解》本文从WebSocket的基础概念出发,详细介绍了SpringBoot集成WebSocket的步骤,并重点讲解了常用的使用方法,包括简单消... 目录一、WebSocket基础概念1.1 什么是WebSocket1.2 WebSocket与HTTP

SpringBoot+Docker+Graylog 如何让错误自动报警

《SpringBoot+Docker+Graylog如何让错误自动报警》SpringBoot默认使用SLF4J与Logback,支持多日志级别和配置方式,可输出到控制台、文件及远程服务器,集成ELK... 目录01 Spring Boot 默认日志框架解析02 Spring Boot 日志级别详解03 Sp

java中反射Reflection的4个作用详解

《java中反射Reflection的4个作用详解》反射Reflection是Java等编程语言中的一个重要特性,它允许程序在运行时进行自我检查和对内部成员(如字段、方法、类等)的操作,本文将详细介绍... 目录作用1、在运行时判断任意一个对象所属的类作用2、在运行时构造任意一个类的对象作用3、在运行时判断

java如何解压zip压缩包

《java如何解压zip压缩包》:本文主要介绍java如何解压zip压缩包问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java解压zip压缩包实例代码结果如下总结java解压zip压缩包坐在旁边的小伙伴问我怎么用 java 将服务器上的压缩文件解压出来,

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Spring WebFlux 与 WebClient 使用指南及最佳实践

《SpringWebFlux与WebClient使用指南及最佳实践》WebClient是SpringWebFlux模块提供的非阻塞、响应式HTTP客户端,基于ProjectReactor实现,... 目录Spring WebFlux 与 WebClient 使用指南1. WebClient 概述2. 核心依

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2