NingG +

Storm 0.9.2:如何从Kafka读取数据

之前研读了In-Stream Big Data Processing,组里将基于此实现一个实时的数据分析系统,基本选定三个组件:Flume、Kafka、Storm,其中,Flume负责数据采集,Kafka是一个MQ:负责数据采集与数据分析之间解耦,Storm负责进行流式处理。

把这3个东西串起来,可以吗?可以的,之前整理了Flume与Kafka整合的文章,那Storm能够与Kafka整合吗?Storm官网有介绍: Storm Integrates,其中给出了Storm与Kafka集成的方案

之前都是以原文+注释方式,来阅读Storm的官方文档,现在集中整理一下。Storm集群的构成:

关于 spout 和 bolt ,说几点:

notes(ningg):topology中node是什么概念?spout、bolt?master、worker?jvm process?thread? RE:master、worker对应Storm的node,master负责控制,worker负责具体执行;spout、bolt是逻辑上的,并且分布在不同的worker上;每个spout、bolt可配置并发数,这个并发数对应启动的thread;不同的spout、bolt对应不同的thread,thread间不能共用;这些所有的thread由所有的worker process来执行,举例,累计300个thread,启动了30个worker,则平均每个worker process对应执行10个thread(前面的说法对吗?哈哈)

关于数据模型,即数据的结构,说几点:

Storm有两种执行模式,local modedistributed mode,补充几点:

关于Stream groupings,几点:

* stream grouping解决的问题:多个执行spout逻辑的thread都输出tuple,这些tuple要发送给bolt对应的多个thread,问题来了,tuple发给bolt的哪个thread?即,stream grouping解决:tuple在不同task之间传递关系; * shuffle grouping,随机分发;field grouping,根据给定的field进行分发;更多参考

Storm与Kafka的版本信息:

实现Storm读取Kafka中的数据,参考官网介绍, 本部分主要参考自storm-kafka的README。

Strom从Kafka中读取数据,本质:实现一个Storm中的Spout,来读取Kafka中的数据;这个Spout,可以称为Kafka Spout。实现一个Kafka Spout有两条路:

无论用哪种方式实现Kafka Spout,都分为两步走:

KafkaConfig类中涉及到的配置参数默认值如下:

  1. public int fetchSizeBytes = 1024 * 1024;
  2. public int socketTimeoutMs = 10000;
  3. public int fetchMaxWait = 10000;
  4. public int bufferSizeBytes = 1024 * 1024;
  5. public MultiScheme scheme = new RawMultiScheme();
  6. public boolean forceFromStart = false;
  7. public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
  8. public long maxOffsetBehind = Long.MAX_VALUE;
  9. public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
  10. public int metricsTimeBucketSizeInSecs = 60;

上面的MultiScheme类型的参数shceme,其负责:将Kafka中取出的byte[]转换为storm所需的tuple,这是一个扩展点,默认是原文输出。两种实现:SchemeAsMultiSchemeKeyValueSchemeAsMultiScheme可将读取的byte[]转换为String。

notes(ningg):几个疑问,列在下面了

Core Kafka Spout

本质是设置一个读取Kafka中数据的Kafka Spout,然后,将从替换原始local mode下,topology中的Spout即可。下面是一个已经验证过的实例

  1. TopologyBuilder builder = new TopologyBuilder();
  2. BrokerHosts hosts = new ZkHosts("121.7.2.12:2181");
  3. SpoutConfig spoutConfig = new SpoutConfig(hosts, "ningg", "/" + "ningg", UUID.randomUUID().toString());
  4. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
  5. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  6. // set Spout.
  7. builder.setSpout("word", kafkaSpout, 3);
  8. builder.setBolt("result", new ExclamationBolt(), 3).shuffleGrouping("word");
  9. Config conf = new Config();
  10. conf.setDebug(true);
  11. // submit topology in local mode
  12. LocalCluster cluster = new LocalCluster();
  13. cluster.submitTopology("test", conf, builder.createTopology());

todo

下面的样例并还没验证:

  1. TridentTopology topology = new TridentTopology();
  2. BrokerHosts zk = new ZkHosts("localhost");
  3. TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
  4. spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
  5. OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

原文地址:https://ningg.top/storm-with-kafka/
微信公众号 ningg, 联系我

同类文章:

微信搜索: 公众号 ningg, 联系我, 交个朋友.

Top