NingG +

Flume 1.5.0.1:如何将flume聚合的数据送入Kafka

背景

Flume收集分布在不同机器上的日志信息,聚合之后,将信息送入Kafka消息队列,问题来了:如何将Flume输出的信息送入Kafka中?

定一个场景:flume读取apache的访问日志,然后送入Kafka中,最终消息从Kafka中取出,显示在终端屏幕上(stdout)。

Flume复习

整理一下Flume的基本知识,参考来源有两个:

几个概念

练习

场景:Flume收集apache访问日志,然后,在标准终端(stdout)显示。

分析:Flume官方文档中,已经给出了一个demo,flume从localhost:port收集数据,并在标准终端上显示。基于这一场景,只需要修改Source即可。

构造实例

通过参阅Flume官网,得知ExecSource可用于捕获命令的输出,并将输出结果按行构造event,tail -F [local file]命令用于查阅文件[local file]的新增内容;在$FLUME_HOME/conf目录下,新建文件apache_log_scan.log,内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/httpd/access_log

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume agent,命令如下:

[ningg@localhost flume]$ cd conf
[ningg@localhost  conf]$ sudo ../bin/flume-ng agent --conf ../conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
...
...
 Component type: SOURCE, name: r1 started

然后访问一下Apache承载的网站,可以看到上面的窗口也在输出信息,即,已经在捕获Apache访问日志access_log的增量了。(可以另起一个窗口,通过tail -F access_log查看日志的实际内容)

存在的问题

通过比较Flume上sink的输出、tail -F access_log命令的输出,发现输出有差异:

# Flume上logger类型sink的输出
Event: { headers:{} body: 31 36 38 2E 35 2E 31 33 30 2E 31 37 35 20 2D 20 168.5.130.175 -  }

# access_log原始文件上的新增内容(长度超过上面logger sink的输出)
168.5.130.175 - - [23/Oct/2014:16:34:59 +0800] "GET /..."

思考:

  1. logger类型的sink,遇到[字符就结束?
  2. logger类型的sink,有字符长度的限制吗?
  3. channel有长度限制?channel中存储的event是什么形式存储的?

通过vim access_log,向文件最后添加一行内容,发现应该是logger类型的sink,对于event的长度有限制;或者,memory类型的channel对于存储的event有限制。 RE:上述问题已经解决,Logger sink输出内容不完整,详情可参考Advanced Logger Sink

Kafka复习

下面Kafka的相关总结都参考自:

几个概念

针对上面每个topic对应的partitioned log,其中包含了多个partition,这样设计有什么好处?

Producer产生的数据放到topic的哪个partition下?集中方式:

Consumer读取message有两种模式:

Kafka中通过将consumer泛化为consumer group来实现,来支持上述两种模式,关于此,详细说一下:

notes(ningg):几个问题:

Ordering guarantee,Kafka保证message按序处理,同时也保证并行处理,几点:

notes(ningg):同一个partition中的message,当其中一个message A被指派给一个consumer instance后,在message A被处理完之前,message B是否会被指派出去?RE:细节还没看,具当前的理解,应该是串行处理的,即,一个处理完后,才会发送另一个。

小结

Kafka通过 partition data by key 和 pre-partition ordering,满足了大部分需求。如果要保证所有message都顺序处理,则将topic设置为only one partition,此时,变为串行处理。

notes(ningg):单个partition是以什么形式存储在server上的?纯粹的文档文件?Flume的fan-in、fan-out什么含义?fan-in针对的是agent之间,fan-out针对agent内部source–channel之间?

Flume的Kafka sink

Flume中数据送入Kafka,本质上就是一个Kafka sink。很多人都有这个需求,甚至有的还需要将Flume来读取Kafka中的数据(Kafka source)。本次使用的Flume和Kafka的详细版本信息如下:

前人的工作

Flume中Kafka source和Kafka sink都有人在做,整体来说有几个进展:

现在,怎么做?打算学习thilinamb的版本,并且用起来,必要时,形成自己的版本。

notes(ningg):Flume官网虽然还没有发布 1.6 版本,但作为开源软件,能够提前查看针对Kafka source和sink部分的代码吗?JIRA上能不能看?

具体实现

直接参考thilinamb的Kafka 0.8.1.1的实现版本中的README。 说明:thilinamb的工程是用Maven进行管理的,可以作为Existing Maven Project直接导入,然后mvn clean instal即可。

notes(ningg):thilinamb的工程,使用maven进行管理,结构好像挺合理的,有一个parent的project,需要认真学习一下。

Flume Kafka sink原理

在上一部分,虽然实现了Flume中数据送入Kafka,但具体原理是什么?需要深入学习一下。

(TODO)

参考来源

Top