NingG +

Apache Kafka 0.10 技术内幕:数据倾斜详解


要解决数据倾斜的问题,主要从 Producer 入手,弄清楚 Producer 生成的 Msg,是如何选择传输到哪个 Partition 的。只要让 Producer 把生成的 Msg 均匀的分发到各个 Partition 中,就解决了数据倾斜问题。

Producer 产生的数据,送入哪个 Partition


发送 msg 时,需要同时设定:key、msg:

new Java Producer API

对于 key 的取值不同,new Java Producer API 选定的 partition 不同:

new Java Producer API,标志是:send(ProducerRecord()), 示例代码如下:

Producer<String, String> producer = new KafkaProducer(props);
 for(int i = 0; i < 100; i++){
     String key = Integer.toString(i);
     String msg = Integer.toString(i);
     producer.send(new ProducerRecord<String, String>("my-topic", key, msg));

计算 partition 的过程,源代码:

package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
public class DefaultPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
     * A cheap way to deterministically convert a number to a positive value. When the input is
     * positive, the original value is returned. When the input number is negative, the returned
     * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
     * value.
     * Note: changing this method in the future will possibly cause partition selection not to be
     * compatible with the existing messages already placed on a partition.
     * @param number a given number
     * @return a positive number.
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    public void configure(Map<String, ?> configs) {}
     * Compute the partition for the given record.
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        } else {
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    public void close() {}

legacy Scala Producer API

对于 key 的取值不同,legacy Scala Producer API 选定的 partition 不同:

legacy Scala Producer API,标志是:send(KeyedMessage()), 示例代码如下:

import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 定制的Partitioner,处理 key -- partition 之间的映射关系
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
               long runtime = new Date().getTime(); 
               String ip = “192.168.2.” + rnd.nextInt(255);
               String msg = runtime + “,,” + ip;
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);

计算 partition 的过程,源代码:

class DefaultEventHandler[K,V]
if (topicMetadataRefreshInterval >= 0 &&
    SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
  lastTopicMetadataRefreshTime = SystemTime.milliseconds
 * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
 * the value of partition is not between 0 and numPartitions-1
 * @param topic The topic
 * @param key the partition key
 * @param topicPartitionList the list of available partitions
 * @return the partition id
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  val partition =
    if(key == null) {
      // If the key is null, we don't really need a partitioner
      // So we look up in the send partition cache for the topic to decide the target partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        case Some(partitionId) =>
          // directly return the partitionId without checking availability of the leader,
          // since we want to postpone the failure until the send operation anyways
        case None =>
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
    } else
      partitioner.partition(key, numPartitions)
  if(partition < 0 || partition >= numPartitions)
    throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
      "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
  trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))


  1. key 为 null 时,legacy Scala Producer API 将随机计算一个 partition index,然后缓存到 sendPartitionPerTopicCache,下次还用这个 partition index.
  2. 到源代码中,详细阅读 DefaultEventHandler 类,其中,描述了清除 sendPartitionPerTopicCache 缓存的条件:间隔,会清除一次,默认:600000 ms.
  3. Kafka 为什么采取上述策略?为了减少 broker 的socket 数量,节省socket 资源,参考:邮件列表
  4. 数据量大、并且持续不断时,使用 legacy Scala Producer API 并且 key 为 null,此时,数据倾斜现象不明显.
  5. 建议:使用 legacy Scala Producer API,一定要设置 key.

key 为 null 时,msg 发送到哪个 Partition

简单回答一下「key 为 null 时,msg 发送到哪个 Partition」答案是:跟使用的 Producer API 有关:

  1. new Java Producer API轮循,round-robin,每次换一个 partition
  2. legacy Scala Producer API:随机一个 partition index,并且缓存起来,每 10 mins 清除一次缓存 ,随机下一个 partition index,并再次缓存


new Java Producer API,从 Kafka 0.8.2.x 开始引入,但后续版本中,仍然保留 legacy Scala Producer API



  1. Producer 发送 msg 时,设置 key
  2. 对 key 没有特殊要求时,建议设置 key 为随机数


微信公众号 ningg, 联系我


微信搜索: 公众号 ningg, 联系我, 交个朋友.