NingG

Using Flume to Move Data from Kafka into ElasticSearch

Goal: use a Flume Agent to pull data out of Kafka and deliver it into ElasticSearch.

Analysis: the Flume Agent needs two pieces of work:

- a Source that pulls data from Kafka;
- a Sink that writes data into ElasticSearch.

Flume 1.5.2 already ships with an ElasticSearchSink, so only a custom Flume Kafka Source needs to be implemented. From JIRA we know that Flume 1.6.0 will include flume-ng-kafka-source, but Flume 1.6.0 has not been released yet. What to do? Two paths:

- wait for the official Flume 1.6.0 release;
- take the flume-ng-kafka-source code from the 1.6.0 branch and build it ourselves.

We initially settled on the flume-ng-kafka-source portion of the Flume 1.6.0 branch; this code is already included in flume-ng-extends-source.

Build the Code

Run the command mvn clean package to obtain the jar: flume-ng-extends-source-x.x.x.jar

Install the Plugin

Two kinds of jars are involved:

- the plugin jar itself;
- the third-party jars the plugin depends on.
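Flume can pick up plugins automatically from a plugins.d directory under its home, keeping the two kinds of jars apart. A conventional layout for this plugin (the plugin directory name here is illustrative):

```
$FLUME_HOME/plugins.d/
  flume-ng-extends-source/
    lib/      # the plugin jar itself: flume-ng-extends-source-x.x.x.jar
    libext/   # third-party jars the plugin depends on
    native/   # native libraries, if any (optional)
```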

Question: when packaging with Maven, how can the current jar and all of its dependency jars be exported together? See the thilinamb flume kafka sink for reference.
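One answer to the packaging question, as a hedged sketch (this pom fragment is an assumption, not taken from the thilinamb project): the maven-shade-plugin can bundle the module and its dependencies into a single "fat" jar during the package phase.

```xml
<!-- Hypothetical pom.xml fragment: bundle this module plus its
     dependencies into one jar via maven-shade-plugin. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
    </execution>
  </executions>
</plugin>
```

An alternative is maven-dependency-plugin's copy-dependencies goal, which keeps the dependency jars separate; that fits the lib/libext split of a plugins.d install.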

Configuration

Configure the agent in a properties file; a sample configuration:

# Kafka Source: retrieve events from the Kafka cluster.
agent.sources.seqGenSrc.type = com.github.ningg.flume.source.KafkaSource
#agent.sources.seqGenSrc.batchSize = 2
agent.sources.seqGenSrc.batchDurationMillis = 1000
agent.sources.seqGenSrc.topic = good
agent.sources.seqGenSrc.zookeeperConnect = 168.7.2.164:2181,168.7.2.165:2181,168.7.2.166:2181
agent.sources.seqGenSrc.groupId = elasticsearch
#agent.sources.seqGenSrc.kafka.consumer.timeout.ms = 1000
#agent.sources.seqGenSrc.kafka.auto.commit.enable = false

# ElasticSearchSink: write events into ElasticSearch.
agent.sinks.loggerSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent.sinks.loggerSink.indexName = flume
agent.sinks.loggerSink.indexType = log
agent.sinks.loggerSink.batchSize = 100
#agent.sinks.loggerSink.ttl = 5
agent.sinks.loggerSink.client = transport
agent.sinks.loggerSink.hostNames = 168.7.1.69:9300
#agent.sinks.loggerSink.client = rest
#agent.sinks.loggerSink.hostNames = 168.7.1.69:9200
#agent.sinks.loggerSink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

Customization

Goal: customize the serializer of the ElasticSearchSink.
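A custom serializer implements org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer and builds the document that is sent to ES. The sketch below is stdlib-only and only illustrates the field-mapping step; the class name and field names are illustrative assumptions, and a real serializer must return an ES content builder rather than a String.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Stdlib-only sketch of the document-building logic a custom
// ElasticSearchSink serializer performs. A real Flume serializer
// implements ElasticSearchEventSerializer; this class only shows
// how the event body and headers could map onto JSON fields.
public class EventJsonSketch {

    // Build a Logstash-style JSON document from an event's body and headers.
    public static String toJson(byte[] body, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"@message\":\"")
          .append(escape(new String(body, StandardCharsets.UTF_8)))
          .append('"');
        for (Map.Entry<String, String> e : headers.entrySet()) {
            sb.append(",\"").append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("topic", "good");
        // prints {"@message":"hello","topic":"good"}
        System.out.println(toJson("hello".getBytes(StandardCharsets.UTF_8), headers));
    }
}
```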

Symptom: after setting batchSize=1000 on the ElasticSearchSink, the current day's index in ES accumulated 120,000+ records, while the original platform showed that only about 20,000 records had been produced. The guess is that KafkaSource pushed all existing data under the specified topic in the Kafka cluster into ES.
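A plausible explanation for the inflated count is that the consumer group had no committed offset, so consumption started from the beginning of the topic. With Kafka's high-level consumer this behavior is governed by auto.offset.reset. Assuming the source forwards kafka.*-prefixed properties to the consumer, as the commented kafka.* lines in the sample configuration suggest, a hedged fragment:

```
# Assumption: kafka.* properties are passed through to the Kafka consumer.
# With no committed offset for this groupId, auto.offset.reset decides where
# to start: smallest = beginning of the topic, largest = only new messages.
agent.sources.seqGenSrc.kafka.auto.offset.reset = largest
```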

A few points:

New configuration parameters for the ElasticSearchSink:

Restart

If the Flume Agent is stopped and then restarted, questions arise:

A few scenarios to think through:

The Flume Agent restart scenarios need careful analysis.
