public static void consumer(){ Properties props = new Properties(); props.put("zk.connect", "hadoop-2:2181"); props.put("zk.connectiontimeout.ms", "1000000"); props.put("groupid", "fans_group"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map map = new HashMap(); map.put("fans", 1); // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume Map>> topicMessageStreams = consumerConnector.createMessageStreams(map); List> streams = topicMessageStreams.get("fans"); // create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(1); long startTime = System.currentTimeMillis(); // consume the messages in the threads for(final KafkaStream stream: streams) { executor.submit(new Runnable() { public void run() { ConsumerIterator it = stream.iterator(); while (it.hasNext()){ log.debug(byteBufferToString(it.next().message().payload())); } } }); log.debug("use time="+(System.currentTimeMillis()-startTime)); } }