# Kafka 0.8.1：Producer API and Producer Configs

## Producer

• Flume架构中有Sink，是用来将Flume收集到的数据送出去的；Flume下的Kafka Sink插件，在Flume看来，就是个Sink；
• Kafka架构中有Producer，是用来向Kafka broker中送入数据的；Flume下的Kafka Sink插件，在Kafka看来，就是个Producer；

### Kafka Producer API

/**
*  V: type of the message
*  K: type of the optional key associated with the message
*/

class kafka.javaapi.producer.Producer<K,V> {

public Producer(ProducerConfig config);

/**
* Sends the data to a single topic, partitioned by key, using either the
* synchronous or the asynchronous producer
* @param message the producer data object that encapsulates the topic, key and message data
*/
public void send(KeyedMessage<K,V> message);

/**
* Use this API to send data to multiple topics
* @param messages list of producer data objects that encapsulate the topic, key and message data
*/
public void send(List<KeyedMessage<K,V>> messages);

/**
* Close API to close the producer pool connections to all Kafka brokers.
*/
public void close();
}


### 0.8.0 Producer Example

The Producer class is used to create new messages for a specific Topic and optional Partition.

If using Java you need to include a few packages for the Producer and supporting classes:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


The first step in your code is to define properties for how the Producer finds the cluster, serializes the messages and if appropriate directs the message to a specific Partition.

• Producer如何找到Kafka Cluster；
• message传输的格式；（serialize，序列化）
• 如何将message存入指定的Partition中；

These properties are defined in the standard Java Properties object:

Properties props = new Properties();

props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);


The first property, “metadata.broker.list” defines where the Producer can find a one or more Brokers to determine the Leader for each topic. This does not need to be the full set of Brokers in your cluster but should include at least two in case the first Broker is not available. No need to worry about figuring out which Broker is the leader for the topic (and partition), the Producer knows how to connect to the Broker and ask for the meta data then connect to the correct Broker.

The second property “serializer.class” defines what Serializer to use when preparing the message for transmission to the Broker. In our example we use a simple String encoder provided as part of Kafka. Note that the encoder must accept the same type as defined in the KeyedMessage object in the next step.

notes(ningg)：“Note that the encoder must accept the same type as defined in the KeyedMessage object in the next step.” 什么含义？ KeyedMessage

It is possible to change the Serializer for the Key (see below) of the message by defining “key.serializer.class” appropriately. By default it is set to the same value as “serializer.class”.

notes(ningg)：Kafka是按照key进行partition的，每个message绑定的key也是需要传输到broker的，传输过程中也需要进行序列化，

The third property “partitioner.class” defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven’t defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

The last property “request.required.acks” tells Kafka that you want your Producer to require an acknowledgement from the Broker that the message was received. Without this setting the Producer will ‘fire and forget’ possibly leading to data loss. Additional information can be found here.

notes(ningg)：有个问题，即使Broker在接收到message之后，返回了ack信息，那Producer提供了重发机制吗？还是Producer只是进行登记？

Next you define the Producer object itself:

Producer<String, String> producer = new Producer<String, String>(config);


Note that the Producer is a Java Generic and you need to tell it the type of two parameters. The first is the type of the Partition key, the second the type of the message. In this example they are both Strings, which also matches to what we defined in the Properties above.

Producer是一个Java Generic（泛型），需要输入两个参数，<String, String>，第一个参数是Partition key的类型，第二个是message的类型

notes(ningg)：java中Generic的用法、注意事项有哪些？上面说的Partition key，到底指什么？是properties中的属性和属性值吗？不是的，查看源代码，Partition key就是按照key进行partition的key。

Now build your message:

Random rnd = new Random();
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;


In this example we are faking a message for a website visit by IP address. First part of the comma-separated message is the timestamp of the event, the second is the website and the third is the IP address of the requester. We use the Java Random class here to make the last octet of the IP vary so we can see how Partitioning works.（上面msg中是伪造的一个网站访问记录）

Finally write the message to the Broker:

KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);

producer.send(data);


The “page_visits” is the Topic to write to. Here we are passing the IP as the partition key. Note that if you do not include a key, even if you’ve defined a partitioner class, Kafka will assign the message to a random partition.

KeyedMessage<String, String>(topic, message)或者KeyedMessage<String, String>(topic, key, message)，如果没输入key，那么即使设定了partitioner.class也不会对message分发到相应partition的，原因很简单，因为真的没有key。

Full Source:

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();

Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}


Partitioning Code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}

}


The logic takes the key, which we expect to be the IP address, finds the last octet and does a modulo operation on the number of partitions defined within Kafka for the topic. The benefit of this partitioning logic is all web visits from the same source IP end up in the same Partition. Of course so do other IPs, but your consumer logic will need to know how to handle that. （将有时间顺序的message放到同一个partition中）

Before running this, make sure you have created the Topic page_visits. From the command line:

bin/kafka-create-topic.sh --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5


Make sure you include a --partition option so you create more than one. （要使用--partition来创建多个partition，否则可能只有一个）

Now compile and run your Producer and data will be written to Kafka.

To confirm you have data, use the command line tool to see what was written:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>


### 几个情况

• 如果只在metadata.broker.list中配置一个broker，那么Producer能够识别出其他broker吗？
• 如果能够识别出未配置的broker，那么，只配置一个broker不就行了吗？
• 如果不能识别出未配置的broker，那Kafka集群中动态增加了broker，岂不是需要重新启动flume？（因为metadata.broker.list实际上是flume的配置，要更新这一参数配置，就需要重启flume）

## Producer配置的详细参数

Essential configuration properties for the producer include:（Producer必须的参数有几个，如下）

• request.required.acks，broker收到producer发来的message后，是否ack？
• producer.type，这个什么滴干活？
• serializer.class，message从producer发往broker的过程中，需要序列化；

• Property
• Default
• Description
• null
• This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
• request.required.acks
• 0
• This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are
• 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).（不等待ack信息）
• 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).（leader完成数据写入）
• -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.（所有replica都完成数据写入）
• request.timeout.ms
• 10000
• The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.（broker收到producer发来的message后，如果需要返回ack信息，那这个参数设定了broker返回ack信息的时间限制，如果超过这个时间，则broker向producer返回一个error信息）
• producer.type
• sync
• This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.（producer发送message的方式：同步、异步；设置为异步时，producer处理的吞吐量会提升，但可能丢失数据）
• serializer.class
• kafka.serializer.DefaultEncoder
• The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].（message的序列化方法，默认是byte[]
• key.serializer.class
• defaults to the same as for messages if nothing is given.
• The serializer class for keys .
• partitioner.class
• kafka.producer.DefaultPartitioner（The default partitioner is based on the hash of the key.）
• The partitioner class for partitioning messages amongst sub-topics. （将message放入哪个partition中）
• compression.codec
• none
• This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are “none”, “gzip” and “snappy”.（producer向broker发送的信息，是否进行压缩，包含：key、message信息。）
• compressed.topics
• null
• This parameter allows you to set whether compression should be turned on for particular topics. If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. If the compression codec is NoCompressionCodec, compression is disabled for all topics（当开启compression.codec时，通过设置compressed.topics，设置只针对某些特定的topic进行压缩，默认，对所有的topic都进行压缩）
• message.send.max.retries
• 3
• This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Note that setting a non-zero value here can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.（当producer发送message失败后，尝试重新发送的次数；特别说明：如果message发送成功，但broker返回的ack信息丢失时，会有message重发，即，此处有消息重复发送）
• retry.backoff.ms
• 100
• Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.（producer在进行重新发送message之前，都会refresh metadata，主要目标，查看是否更新了topic的leader；因为leader election需要一段时间，因此，在refresh metadata之前，需要等待一段时间，retry.backoff.ms参数设置的就是等待的时间，等待选出新的topic leader）
• 600 * 1000
• The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). （正常情况，多长时间刷新一次broker metadata，即，刷新间隔）
• If you set this to a negative value, metadata will only get refreshed on failure. （<0时，仅当发送message失败时，才刷新）
• If you set this to zero, the metadata will get refreshed after each message sent (not recommended). （0，每次发送完message之后，都刷新，不推荐
• Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed（重要提示：无论设置刷新间隔为多少，具体刷新metadata都发生在producer发送message之后，因此，如果一直没有message发送，就不会有metadata刷新）
• queue.buffering.max.ms
• 5000
• Maximum time to buffer data when using async mode. For example a setting of 100 will try to batch together 100ms of messages to send at once. This will improve throughput but adds message delivery latency due to the buffering.（当使用producer.type为async模式时，这一参数才有用，含义：一时间为单位，将这一时间单位内的message一起发送给broker，这样有利于提高throughput，但会增加时延。）
• queue.buffering.max.messages
• 10000
• The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped.（当producer.type使用async时，producer能够缓存的unsent message的数量，如果超过这一数量，producer就会blocked？message就会被丢弃？具体什么情况？）
• queue.enqueue.timeout.ms
• -1
• The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. （当queue.buffering.max.message设定的值已经触顶，等待多久block，之后就开始丢弃message）
• If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block).
• If set to -1 the producer will block indefinitely and never willingly drop a send.
• batch.num.messages
• 200
• The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached.（在async模式下，当message数量达到batch.num.messages时，或者，当等待时间达到queue.buffer.max.ms时，producer都会发送一次缓存的message）
• send.buffer.bytes
• 100 * 1024
• Socket write buffer size（socket写缓存的大小）
• client.id
• ””
• The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.（用户自己定义的producer标识，会伴随发送的message一起发送，用于追踪message的来源）

More details about producer configuration can be found in the scala class kafka.producer.ProducerConfig.

notes(ningg)：几个新的理解：

• producer.type：sync、async，当设置为async时，能够提升吞吐量，但是会丢失数据？丢失，不能重发吗？
• request.required.acks：设置是否需要broker在完成数据写入后，向producer返回ack信息；一个问题：如果broker上数据写入失败，那，producer会进行重发吗？有没有类似的机制？
• queue.enqueue.timeout.me：其中说明的producer block什么含义？还会继续缓存未发送的message吗？

• Server上，有进程监听port后，在服务器上无法再启动一个进程来监听这一port；
• 在远端通过telnet能够与server上这一port建立连接，并且，多个client都能与server上这一port建立连接；

## New Producer Configs（补充）

We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality. Below is the configuration for the new producer.

• Name
• Type
• Default
• Importance
• Description
• bootstrap.servers
• list
• null
• high
• A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load balanced over all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,…. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). If no server in this list is available sending data will fail until on becomes available.
• acks
• string
• 1
• high
• The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
• acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.
• acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.（leader将message写入local log后，直接返回ack信息；如果leader，返回ack信息后，leader宕机了，那其他follwer上并没有这条message，将导致message丢失）
• acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
• Other settings such as acks=2 are also possible, and will require the given number of acknowledgements but this is generally less useful.
• buffer.memory
• long
• 33554432
• high
• The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.（用于存储未发送出去的message，当producer接收到的message速度大于发送message速度时，producer will block，或者抛出异常）
• This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.（什么含义？需仔细琢磨）
• compression.type
• string
• none
• high
• The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
• retries
• int
• 0
• high
• Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.（设定，message尝试重发的次数；这个重发机制，可能会改变message之间的相互顺序）
• batch.size
• int
• 16384
• medium
• The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.（将发送到同一partition的多条message集中起来发送，构成一个batch）
• No attempt will be made to batch records larger than this size.
• Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.（发送给broker的request包含多个batch？每一个batch对应一个partition）
• A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
• client.id
• string
• null
• medium
• The id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included with the request. The application can set any string it wants as this has no functional purpose other than in logging and metrics.
• linger.ms
• long
• 0
• medium
• The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will ‘linger’ for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.（当producer收到一个message后，不直接发送出去，而是，等待linger.ms时间，目的：相同partition的多个message同时发送。）
• max.request.size
• int
• 1048576
• medium
• The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.（限制单个request的大小）
• int
• 32768
• medium
• The size of the TCP receive buffer to use when reading data（上层读取TCP数据时，一次读取的缓冲单元？）
• send.buffer.bytes
• int
• 131072
• medium
• The size of the TCP send buffer to use when sending data
• timeout.ms
• int
• 30000
• medium
• The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.（server等待follower返回ack信息的时间，这个时间是指server端的时间）
• block.on.buffer.full
• boolean
• true
• low
• When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is true and we block, however in some scenarios blocking is not desirable and it is better to immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.（默认true，当memory buffer中内容满了之后，producer不再接收新的message；如果设置为false，则当memory buffer中内容满了之后，producer会直接抛出异常BufferExhaustedException
• long
• 60000
• low
• The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic’s partitions. This configuration controls the maximum amount of time we will block waiting for the metadata fetch to succeed before throwing an exception back to the client.（当第一次向topic中传入数据时，需要从server请求metadata，参数metadata.fetch.timeout.ms设定了发送metadata请求后，producer等待的时间，如果超时，则抛出异常。）
• long
• 300000
• low
• The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.（定期请求metadata的时常）
• metric.reporters
• list
• []
• low
• A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.（什么含义？生成测试报告？测试什么？为什么要测试？）
• metrics.num.samples
• int
• 2
• low
• The number of samples maintained to compute metrics.（计算指标时，保留的samples的个数）
• metrics.sample.window.ms
• long
• 30000
• low
• The metrics system maintains a configurable number of samples over a fixed window size. This configuration controls the size of the window. For example we might maintain two samples each measured over a 30 second period. When a window expires we erase and overwrite the oldest window.（选出sample的window大小）
• reconnect.backoff.ms
• long
• 10
• low
• The amount of time to wait before attempting to reconnect to a given host when a connection fails. This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop.（当与一个host断开连接后，等待多长时间，再去进行连接，避免过于频繁的无效连接）
• retry.backoff.ms
• long
• 100
• low
• The amount of time to wait before attempting to retry a failed produce request to a given topic partition. This avoids repeated sending-and-failing in a tight loop.

notes(ningg)metrics的含义？为什么有这个？干什么的？

## 杂谈

• 做事的方向对不对？
• 做事的人脑袋是否灵光？
• 做事的人，是否投入了足够的时间？

• 做事的方向对不对，只要针对做的事情，当前能够达到基本一致，方向基本确定，而不是一团乱麻，那就可以开始做下去了，而在后期的过程中，可能会涉及到多次调整、迭代，这些都是可以预见的；
• 做事的人脑袋是否灵光，事情是由人来做的，做的人脑袋行不行？基础理论、基本操作技能，基本的世界观：劳动创造价值，获得报酬；还是，跟着大牛有肉吃？（这本质是希望拿牛人的劳动价值，换取自己的报酬）
• 做事的人时间投入，天资尚可的人就行了，团队高低档次的人都需要，但是，有一点，如果不投入时间，或者时间很少，那又如何保证产出？特别是针对以前就没有涉足的领域，需要不畏艰险、持续的投入时间，才能有所理解、有所深入；脑袋还可以，但做事不投入充足时间的人，这个就不太好。

Top