使用java作为kafka的生产者和消费者

生产者

废话少说,直接上代码

/**
 * Created by xuguoliang on 2017/7/8.
 */
import java.util.Properties;

import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;

public class Main {

    private final Producer<String, String> producer;
    public final static String TOPIC = "datacenter2";
    public final static String brokerList = "192.168.199.10:9092,192.168.199.11:9092,192.168.199.12:9092";
    private Main(){
        Properties props = new Properties();
        //此处配置的是kafka的端口
        props.put("metadata.broker.list", brokerList);

        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        //request.required.acks
        //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
        //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
        props.put("request.required.acks","-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void produce() {
        int messageNo = 10;
        final int COUNT = 100;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
            System.out.println(data);
            messageNo ++;
        }
    }
    public static void main(String []args) {
        System.out.println("开始 kafka 生产者");
        new Main().produce();
    }
}

下面是pom.xml配置

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuguoliang.kafka</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>Main</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>  

需要把log4j的版本改成1.2.17,1.2.15版本在使用mvn打包的时候报错了,原因是1.2.15版本的log4j使用了https://maven-repository.dev.java.net/nonav/repository 这个域名已经失效了

消费者

上代码,这里使用了多线程的消费者模式

package com.xuguoliang.kafka; /**  
 * Created by xuguoliang on 2017/7/8.
 */
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;

import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread{  
    private String topic;

    public Consumer(String topic){
        super();
        this.topic = topic;
    }


    @Override
    public void run() {
        System.out.println("线程启动:"+this.getId());
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
        Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
        ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();
        long id = this.getId();
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("线程:"+id+" 接收到: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties config = new PropertyUtil().getConfig();
        Properties properties = new Properties();
        properties.put("zookeeper.connect", config.getProperty("zookeeper"));//声明zk
        //properties.put("metadata.broker.list", "192.168.199.10:9092,192.168.199.11:9092,192.168.199.12:9092");
        properties.put("group.id", "group1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "smallest");
        return kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }


    public static void main(String[] args) {
        for(int i=1; i<2; i=i+1)
        {
            new Consumer("datacenter2").start();
        }
    }
}