NingG +

Kafka 0.8.1:Producer API and Producer Configs


最近在做Flume与Kafka的整合,其中用到了一个工程:flume-ng-kafka-sink,本质上就是Flume的一个插件:Kafka sink。遇到一个问题:Kafka sink通过设置kafak broker的ip:port来寻找broker,那就有一个问题,如果设置连接的kafka broker 宕掉了,flume的数据是不是就送不出去了?


开始介绍Producer之前,说个小问题:上面背景中一直在说Flume的Sink:Kafka Sink,那与Kafka producer什么关系呢?为什么这次标题是Kafka Producer,而丝毫未提Flume Sink?这个问题很好,说明读者在思考,大概说几点:


Kafka Producer API

下面是kafka.javaapi.producer.Producer类的java API,实际上这个类是scala编写的,

 *  V: type of the message
 *  K: type of the optional key associated with the message
class kafka.javaapi.producer.Producer<K,V> {

  public Producer(ProducerConfig config);

   * Sends the data to a single topic, partitioned by key, using either the
   * synchronous or the asynchronous producer
   * @param message the producer data object that encapsulates the topic, key and message data
  public void send(KeyedMessage<K,V> message);

   * Use this API to send data to multiple topics
   * @param messages list of producer data objects that encapsulate the topic, key and message data
  public void send(List<KeyedMessage<K,V>> messages);

   * Close API to close the producer pool connections to all Kafka brokers.
  public void close();

具体如何使用上述Producer API,可参考0.8.0 Producer Example

0.8.0 Producer Example

研究要深入,上面提到的0.8.0 Producer Example,下面简要介绍一下。

The Producer class is used to create new messages for a specific Topic and optional Partition.

If using Java you need to include a few packages for the Producer and supporting classes:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

The first step in your code is to define properties for how the Producer finds the cluster, serializes the messages and if appropriate directs the message to a specific Partition.


These properties are defined in the standard Java Properties object:

Properties props = new Properties();
props.put("", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);

The first property, “” defines where the Producer can find a one or more Brokers to determine the Leader for each topic. This does not need to be the full set of Brokers in your cluster but should include at least two in case the first Broker is not available. No need to worry about figuring out which Broker is the leader for the topic (and partition), the Producer knows how to connect to the Broker and ask for the meta data then connect to the correct Broker.


The second property “serializer.class” defines what Serializer to use when preparing the message for transmission to the Broker. In our example we use a simple String encoder provided as part of Kafka. Note that the encoder must accept the same type as defined in the KeyedMessage object in the next step.


notes(ningg):“Note that the encoder must accept the same type as defined in the KeyedMessage object in the next step.” 什么含义? KeyedMessage

It is possible to change the Serializer for the Key (see below) of the message by defining “key.serializer.class” appropriately. By default it is set to the same value as “serializer.class”.



The third property “partitioner.class” defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven’t defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.


The last property “request.required.acks” tells Kafka that you want your Producer to require an acknowledgement from the Broker that the message was received. Without this setting the Producer will ‘fire and forget’ possibly leading to data loss. Additional information can be found here.

最后一项参数request.required.acks,设定Broker在接收到message之后,是否返回一个确认信息(ack)。如果没有这个信息,那么很有可能fire and forget并且丢失数据。更多Kafka的相关配置信息,参考:Kafka Configuration


Next you define the Producer object itself:

Producer<String, String> producer = new Producer<String, String>(config);

Note that the Producer is a Java Generic and you need to tell it the type of two parameters. The first is the type of the Partition key, the second the type of the message. In this example they are both Strings, which also matches to what we defined in the Properties above.

Producer是一个Java Generic(泛型),需要输入两个参数,<String, String>,第一个参数是Partition key的类型,第二个是message的类型

notes(ningg):java中Generic的用法、注意事项有哪些?上面说的Partition key,到底指什么?是properties中的属性和属性值吗?不是的,查看源代码,Partition key就是按照key进行partition的key。

Now build your message:

Random rnd = new Random();
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,,” + ip;

In this example we are faking a message for a website visit by IP address. First part of the comma-separated message is the timestamp of the event, the second is the website and the third is the IP address of the requester. We use the Java Random class here to make the last octet of the IP vary so we can see how Partitioning works.(上面msg中是伪造的一个网站访问记录)

Finally write the message to the Broker:

KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);

The “page_visits” is the Topic to write to. Here we are passing the IP as the partition key. Note that if you do not include a key, even if you’ve defined a partitioner class, Kafka will assign the message to a random partition.

KeyedMessage<String, String>(topic, message)或者KeyedMessage<String, String>(topic, key, message),如果没输入key,那么即使设定了partitioner.class也不会对message分发到相应partition的,原因很简单,因为真的没有key。

Full Source:

import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
	public static void main(String[] args) {
		long events = Long.parseLong(args[0]);
		Random rnd = new Random();
		Properties props = new Properties();
		props.put("", "broker1:9092,broker2:9092 ");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		props.put("partitioner.class", "example.producer.SimplePartitioner");
		props.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);
		for (long nEvents = 0; nEvents < events; nEvents++) { 
			   long runtime = new Date().getTime();  
			   String ip = “192.168.2.” + rnd.nextInt(255); 
			   String msg = runtime + “,,” + ip; 
			   KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);

Partitioning Code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
	public SimplePartitioner (VerifiableProperties props) {
	public int partition(Object key, int a_numPartitions) {
		int partition = 0;
		String stringKey = (String) key;
		int offset = stringKey.lastIndexOf('.');
		if (offset > 0) {
		   partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
	   return partition;

The logic takes the key, which we expect to be the IP address, finds the last octet and does a modulo operation on the number of partitions defined within Kafka for the topic. The benefit of this partitioning logic is all web visits from the same source IP end up in the same Partition. Of course so do other IPs, but your consumer logic will need to know how to handle that. (将有时间顺序的message放到同一个partition中)

Before running this, make sure you have created the Topic page_visits. From the command line:

bin/ --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5

Make sure you include a --partition option so you create more than one. (要使用--partition来创建多个partition,否则可能只有一个)

Now compile and run your Producer and data will be written to Kafka.

To confirm you have data, use the command line tool to see what was written:

bin/ --zookeeper localhost:2181 --topic page_visits --from-beginning




思考1:Kafka 0.7.2版本中,直接在Producer中配置Zookeeper,Producer通过Zookeeper来获知Broker的位置,简单来说,应用与Kafka之间是解耦的,可以在不修改Producer信息的情况下,动态增减Broker。


思考2:如果network interrupt,producer会如何动作?记录log?还是抛出异常?



针对Kafka 0.8.1版本,这一部分介绍的Producer配置信息,主要参考两个地方:

Essential configuration properties for the producer include:(Producer必须的参数有几个,如下)


More details about producer configuration can be found in the scala class kafka.producer.ProducerConfig.




New Producer Configs(补充)

下面是今后Kafka Producer会采用的新的配置参数,当前,可以有一个基本的了解。

We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality. Below is the configuration for the new producer.







今天突然想起一件事,几年前,跟某位好友一起走路,无意间说起坚韧这种性格,我就问道:如果要在午门城墙上打一个洞,如何才能做到?谈到锲而不舍,如果一个人没有这种精神,那遇到困难的事情,就难办了;后来又说起,今后工作的打算,我们基本达成一致:精挑细选公司,一旦入门后,就当自己是公司的创始人,然后,返老还童,恢复到20多岁年轻小伙儿的年纪 ,只不过,返老还童的代价是放弃对于公司的所有权、职务等,以这种心态去工作,重塑自己的公司、再造辉煌,可以说想象还是比较大胆的;基于这种定位,每次做事,都是创始人心态,全力做好。(每次整理blog都会到夜里12点才睡,略累呀,如果下班就走,那就有时间了,只不过工作要做好,要投入时间,下班很难按时走,权衡吧)

微信公众号 ningg, 联系我


微信搜索: 公众号 ningg, 联系我, 交个朋友.
