Apache Storm 简单实践

2024-08-22 14:58
文章标签 简单 实践 apache storm

本文主要是介绍Apache Storm 简单实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Storm 简单实践

前两篇文章介绍了Apache Storm的一些基础知识以及核心架构。

  • Apache Storm 集群安装配置\
  • Apache Strom 实时计算系统

本篇文章介绍一些Storm的简单实践场景。

创建一个Storm项目

实践场景为,基于Storm开发出一个实时统计句子中的单词个数的拓扑,实时数据我们通过随机发射句子,在实际应用场景中,实时数据可能来自MQ或者其他来源。

使用IDEA创建一个maven项目,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope>
</dependency>

上两篇文章说了,Storm的数据源自于Spout。所以我们需要创建一个Spout,由于是一个简单的场景,实时数据我们通过随机发射句子:

Spout的代码如下:

public static class RandomSentenceSpout extends BaseRichSpout{private SpoutOutputCollector collector;private Random random;public void open(Map conf, TopologyContext context,SpoutOutputCollector collector){this.collector = collector;this.random = new Random();}public void nextTuple(){Utils.sleep(100);String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs","i am at two with nature"};String sentence = sentences[random.nextInt(sentences.length)];collector.emit(new Values(sentence));}public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));}
}

上面代码很简单,在拓扑启动的时候启动的时候会调用open方法,我们在这里保存了collector,然后Storm会不断的调用nextTuple方法,所以我们在这里把句子发射出去。然后在declareOutputFields声明了发射出去的句子的索引。

上面我们已经把句子发射出去了,接下来的工作就是把句子中的单词切割出来,然后再发射出去。

切割单词的任务交给一个bolt来做

  public static class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;public void prepare(Map conf, TopologyContext context,OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words) {collector.emit(new Values(word));}}/*** 定义发射出去的tuple,每个field的名称*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

上面代码把句子切割后发射出去,最后我们还需要一个bolt来统计单词的数量。

 public static class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 7208077706057284643L;private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);private OutputCollector collector;private Map<String, Long> wordCounts = new HashMap<String, Long>();@SuppressWarnings("rawtypes")public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = wordCounts.get(word);if(count == null) {count = 0L;}count++;wordCounts.put(word, count);LOGGER.info("【单词计数】" + word + "出现的次数是" + count);collector.emit(new Values(word, count));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}

写完了SpoutBolt之后,接下来要创建一个Topology类,将SpoutBolt组合成为一个拓扑:

public class WordCountTopolpgy{public static void main(String[] args) {// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑TopologyBuilder builder = new TopologyBuilder();// 这里的第一个参数的意思,就是给这个spout设置一个名字// 第三个参数的意思,就是设置spout的executor有几个builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");builder.setBolt("WordCountBolt", new WordCountBolt(), 10).setNumTasks(20).fieldsGrouping("SplitSentenceBolt", new Fields("word"));Config config = new Config();// 说明是在命令行执行,打算提交到storm集群上去if(args != null && args.length > 0) {config.setNumWorkers(3);try {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 说明是在eclipse里面本地运行config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology("WordCountTopology", config, builder.createTopology());Utils.sleep(3000);cluster.shutdown();}}
}

上面已经开发完一个Topology了。接下来我们可以直接在本地运行,或者扔到Storm集群去运行。

如果在本地运行,则会模拟生成一个本地集群来运行,(注意本地运行需要去掉pom.xml Storm依赖中的<provided> 节点,真实集群运行需要加上这个节点)

如果要在集群中运行,需要执行命令:

mvn clean package 

得到一个jar包,将这个jar包上传到nimbus节点中,然后执行以下命令就可以运行了。

strom jar xxx.jar com.xxxx.WordCountTopology  WordCountTopology 

总结

本篇文章介绍了一个应用Storm的简单例子,演示了Storm的基本开发方式。可以通过这个例子,触类旁通,运行到真正的企业场景中去。

这篇关于Apache Storm 简单实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1096602

相关文章

JDK21对虚拟线程的几种用法实践指南

《JDK21对虚拟线程的几种用法实践指南》虚拟线程是Java中的一种轻量级线程,由JVM管理,特别适合于I/O密集型任务,:本文主要介绍JDK21对虚拟线程的几种用法,文中通过代码介绍的非常详细,... 目录一、参考官方文档二、什么是虚拟线程三、几种用法1、Thread.ofVirtual().start(

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

springboot依靠security实现digest认证的实践

《springboot依靠security实现digest认证的实践》HTTP摘要认证通过加密参数(如nonce、response)验证身份,避免明文传输,但存在密码存储风险,相比基本认证更安全,却因... 目录概述参数Demopom.XML依赖Digest1Application.JavaMyPasswo

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

Java 结构化并发Structured Concurrency实践举例

《Java结构化并发StructuredConcurrency实践举例》Java21结构化并发通过作用域和任务句柄统一管理并发生命周期,解决线程泄漏与任务追踪问题,提升代码安全性和可观测性,其核心... 目录一、结构化并发的核心概念与设计目标二、结构化并发的核心组件(一)作用域(Scopes)(二)任务句柄

Java中的Schema校验技术与实践示例详解

《Java中的Schema校验技术与实践示例详解》本主题详细介绍了在Java环境下进行XMLSchema和JSONSchema校验的方法,包括使用JAXP、JAXB以及专门的JSON校验库等技术,本文... 目录1. XML和jsON的Schema校验概念1.1 XML和JSON校验的必要性1.2 Sche

SpringBoot集成WebService(wsdl)实践

《SpringBoot集成WebService(wsdl)实践》文章介绍了SpringBoot项目中通过缓存IWebService接口实现类的泛型入参类型,减少反射调用提升性能的实现方案,包含依赖配置... 目录pom.XML创建入口ApplicationContextUtils.JavaJacksonUt

MyCat分库分表的项目实践

《MyCat分库分表的项目实践》分库分表解决大数据量和高并发性能瓶颈,MyCat作为中间件支持分片、读写分离与事务处理,本文就来介绍一下MyCat分库分表的实践,感兴趣的可以了解一下... 目录一、为什么要分库分表?二、分库分表的常见方案三、MyCat简介四、MyCat分库分表深度解析1. 架构原理2. 分

Java 中的 equals 和 hashCode 方法关系与正确重写实践案例

《Java中的equals和hashCode方法关系与正确重写实践案例》在Java中,equals和hashCode方法是Object类的核心方法,广泛用于对象比较和哈希集合(如HashMa... 目录一、背景与需求分析1.1 equals 和 hashCode 的背景1.2 需求分析1.3 技术挑战1.4

k8s搭建nfs共享存储实践

《k8s搭建nfs共享存储实践》本文介绍NFS服务端搭建与客户端配置,涵盖安装工具、目录设置及服务启动,随后讲解K8S中NFS动态存储部署,包括创建命名空间、ServiceAccount、RBAC权限... 目录1. NFS搭建1.1 部署NFS服务端1.1.1 下载nfs-utils和rpcbind1.1