java 发送数据到flume_flume接收http请求,并将数据写到kafka

2023-10-07 04:40

本文主要是介绍java 发送数据到flume_flume接收http请求,并将数据写到kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flume接收http请求,并将数据写到kafka,spark消费kafka的数据。是数据采集的经典框架。

直接上flume的配置:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf

# example.conf: A single-node Flume configuration

##########

# data example

# use post request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]

# just use the office request demo

#[{

# "headers" : {

# "timestamp" : "434324343",

# "host" : "random_host.example.com"

# "topic" : "venn" # if headers contain topic, will replace the default topic

# },

# "body" : "random_body" # random_body is the message send to channel

# }]

# Name the components on this agent1

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.s1.type = http

agent1.sources.s1.bind = spring # localhost 只能接收本地请求

agent1.sources.s1.port = 8084 # http的端口

agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler # 自带的接收http请求的handler

# Describe the sink

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # kafkasink

agent1.sinks.k1.kafka.topic = mytopic # topic

agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # kafka host and port

agent1.sinks.k1.kafka.flumeBatchSize = 20

agent1.sinks.k1.kafka.producer.acks = 1

agent1.sinks.k1.kafka.producer.linger.ms = 1

agent1.sinks.k1.kafka.producer.compression.type = snappy # 压缩

# Use a channel which buffers events in memory

agent1.channels.c1.type = file

#agent1.channels.c1.capacity = 1000 # 这两个参数要配置,需要配大一点,不然channel满了会报错,http返回503(通道已满)

#agent1.channels.c1.transactionCapacity = 100

agent1.channels.c1.checkpointDir = /opt/flume/checkpoint

agent1.channels.c1.dataDirs = /opt/flume/channel

# Bind the source and sink to the channel

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1

有了flume的配置,下面启动flume:

./bin/flume-ng agent -n agent1 -c conf -f conf/http-to-kafka.properties -Dflume.root.logger=INFO,console

启动之后,就可以发http请求了。

http请求的格式如下:

[{

"headers" : {

"timestamp" : "434324343",

"host" : "random_host.example.com",

"topic" : "xxx"

},

"body" : "random_body"

},

{

"headers" : {

"namenode" : "namenode.example.com",

"datanode" : "random_datanode.example.com"

},

"body" : "really_random_body"

}]

注: http请求的headers中又topic 会替代配置文件中的topic

flume官网文档说:1.8.0版本的flume只支持0.9.x的kafka,不支持0.8.x的kafka了(没测过)

然后就是发数的程序了(自己请求太麻烦了。)

package com.venn.http;

import com.venn.entity.User;

import java.io.BufferedReader;

import java.io.DataOutputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.UnsupportedEncodingException;

import java.net.HttpURLConnection;

import java.net.MalformedURLException;

import java.net.URL;

import java.util.*;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.event.JSONEvent;

import com.google.gson.Gson;

import org.apache.flume.source.http.HTTPBadRequestException;

import org.apache.flume.source.http.HTTPSourceHandler;

import javax.servlet.http.HttpServletRequest;

/**

* Created by venn on 19-1-17.

*/

public class HttpDemo {

private static String urlStr = "http://localhost:8084";

private static Random random = new Random();

public static void main(String[] args) throws InterruptedException {

while (true){

String message = new User().toString();

send(message);

// Thread.sleep(1);

}

}

public static void send(String message){

System.out.println("send message : " + message);

try{

//创建连接

URL url = new URL(urlStr);

HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setDoOutput(true);

connection.setDoInput(true);

connection.setRequestMethod("POST");

connection.setUseCaches(false);

connection.setInstanceFollowRedirects(true);

connection.setRequestProperty("Content-Type",

"application/x-www-form-urlencoded");

connection.connect();

//POST请求

DataOutputStream out = new DataOutputStream(

connection.getOutputStream());

JSONEvent jsonEvent = new JSONEvent();

Map header = new HashMap();

header.put("timestamp", System.currentTimeMillis());

header.put("host", "venn");

header.put("topic","venn"+random.nextInt(4));

jsonEvent.setBody(message.getBytes());

jsonEvent.setHeaders(header);

Gson gson = new Gson();

List list = new ArrayList();

list.add(jsonEvent);

out.writeBytes(gson.toJson(list));

out.flush();

out.close();

//读取响应

BufferedReader reader = new BufferedReader(new InputStreamReader(

connection.getInputStream())); // 不会返回数据

int code = connection.getResponseCode();

String lines;

StringBuffer sb = new StringBuffer("");

while ((lines = reader.readLine()) != null) {

lines = new String(lines.getBytes(), "utf-8");

sb.append(lines);

}

System.out.println("code : " + code + ", message : " + sb);

reader.close();

// 断开连接

connection.disconnect();

} catch (MalformedURLException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

搞定。。

发数:

145113d51fba9115b1a3d501769313e5.png

kafka接收到的数据:

1e7605f3042aa2b8d28394f2f636cee6.png

注意: 由于在headers中加入了topic参数,实际接收到的数据是在不同的kafka topic中的

.net接收post请求并把数据转为字典格式

public SortedDictionary GetRequestPost() { int i = 0; SortedDictionary

MVC Control 接收post请求的json数据

[HttpPost] public string QueryInvoice() { string stream; using (var sr = new StreamReader(Request.In ...

servlet接收request请求的json数据

此次使用的是alibaba的fastjson:jar包为fastjson-1.2.7.jar 参考:https://www.qingtingip.com/h_229797.html 思路:由于此次接收 ...

FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解

详细配置文件flume-conf.properties如下: ############################################ # producer config ###### ...

将数据写到kafka的topic

package test05 import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, P ...

Struts2 Action接收POST请求JSON数据及其实现解析

一.认识JSON JSON是一种轻量级.基于文本.与语言无关的数据交换格式,可以用文本格式的形式来存储或表示结构化的数据. 二.POST请求与Content-Type: application/jso ...

javaweb Servlet接收Android请求,并返回json数据

1.实现功能 (1)接收http请求 (2)获取Android客户端发送的参数对应的内容 (3)hibernate查询数据库 (4)返回json数据 2.java代码 import EntityCla ...

学习笔记_springmvc返回值、数据写到页面、表单提交、ajax、重定向

数据写到页面 后台往前台传数据 TestController添加 /** * 方法的返回值采用ModelAndView, new ModelAndView("index", map ...

随机推荐

Js添加消息提示数量

接到个新需求,类似以下这种需求,得把它封装成一个插件 后端给返回一个这种数据 var data = [ { key:"020506", num:5 }, { key:"0 ...

SQL Server 统计信息

SELECT * FROM SYS.stats _WA_Sys_00000009_00000062:统计对象的名称.不同的机器名称不同,自动创建的统计信息都以_WA_Sys开头,00000009表示的 ...

Window I/O 完成端口 (Windows I/O Completion Port (IOCP))

相关对象 IO EndPoint, 所有支持重叠IO(overlapped IO)的设备,比如文件,Winsock,管道等. IOCP, IO完成端口内核对象,可以使用API CreateIoComp ...

原创:整理编辑jQuery全部思维导图【附下载地址】

主图 全部图已经打包:下载地址 2. 3. 4. 5. 6. 附上一点简单说明 Dom对象和jquer对象之间的转化 如何将一个jquery对象转换为DOM对象? test是一个span元素 var ...

jqcss选择器

$("p").css("background-color","red"); $(this) 当前 HTML 元素$("p&quot ...

UVA 11300 Spreading the Wealth (数学推导 中位数)

Spreading the Wealth Problem A Communist regime is trying to redistribute wealth in a village. They ...

CKEditor 集成CKFinder集成

lCKEditor原名FckEditor,著名的HTML编辑器,可以在线编辑HTML内容,演示一下.打开.自己人用CKEditor,网友用UBBEditor. l配置参考文档,主要将ckeditor中 ...

UNIX环境高级编程——线程和fork

当线程调用fork时,就为子进程创建了整个进程地址空间的副本.子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量.读写锁和条件变量的状态.如果父进程包含多个线程,子进程在fork返回以后 ...

Reactive Programming

Reactive的表现 Reactive 规范是 JVM Reactive 扩展规范 Reactive Streams JVM,而 Reactive 实现框架则是最典型的实现: Reactive St ...

rman实验——测试备份压缩

oracle rman自带的备份压缩机制,可以有效的压缩备份的大小,降低磁盘的占用率.但是也会因为压缩而消耗更多的系统性能,和增加备份时间.现在就通过实验来看压缩和不压缩的区别. 进行不压缩全备 RM ...

这篇关于java 发送数据到flume_flume接收http请求,并将数据写到kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/156076

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、