java实现Kafka生产者Producer工具类

摘 要

本文将介绍java实现Kafka生产者Producer的简单工具类

 

相关版本

kafka:kafka_2.10-0.10.1.1

jdk:1.7

相关代码实现

  1. package com.itunic.util;
  2. import java.util.List;
  3. import java.util.Map;
  4. import java.util.Properties;
  5. import org.apache.kafka.clients.producer.KafkaProducer;
  6. import org.apache.kafka.clients.producer.ProducerRecord;
  7. import net.sf.json.JSONObject;
  8. public class KafkaTools {
  9.     /**
  10.      * 
  11.      * 私有静态方法,创建Kafka生产者
  12.      * 
  13.      * @author IG
  14.      * @Date 2017年4月14日 上午10:32:32
  15.      * @version 1.0.0
  16.      * @return KafkaProducer
  17.      */
  18.     private static KafkaProducer<String, String> createProducer() {
  19.         Properties properties = new Properties();
  20.         properties.put("bootstrap.servers""huakServer1:9092,huakServer2:9092");// 声明kafka
  21.         // properties.put("value.serializer",
  22.         // "org.apache.kafka.common.serialization.ByteArraySerializer");
  23.         // properties.put("key.serializer",
  24.         // "org.apache.kafka.common.serialization.ByteArraySerializer");
  25.         properties.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
  26.         properties.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
  27.         return new KafkaProducer<String, String>((properties));
  28.     }
  29.     /**
  30.      * 
  31.      * 传入kafka约定的topicName,json格式字符串,发送给kafka集群
  32.      * 
  33.      * @author IG
  34.      * @Date 2017年4月14日 下午1:29:09
  35.      * @version 1.0.0
  36.      * @param topicName
  37.      * @param jsonMessage
  38.      */
  39.     public static void sendMessage(String topicName, String jsonMessage) {
  40.         KafkaProducer<String, String> producer = createProducer();
  41.         producer.send(new ProducerRecord<String, String>(topicName, jsonMessage));
  42.         producer.close();
  43.     }
  44.     /**
  45.      * 
  46.      * 传入kafka约定的topicName,json格式字符串数组,发送给kafka集群<br> 
  47.      * 用于批量发送消息,性能较高。
  48.      * 
  49.      * @author IG
  50.      * @Date 2017年4月14日 下午2:00:12
  51.      * @version 1.0.0
  52.      * @param topicName
  53.      * @param jsonMessages
  54.      * @throws InterruptedException
  55.      */
  56.     public static void sendMessage(String topicName, String... jsonMessages) throws InterruptedException {
  57.         KafkaProducer<String, String> producer = createProducer();
  58.         for (String jsonMessage : jsonMessages) {
  59.             producer.send(new ProducerRecord<String, String>(topicName, jsonMessage));
  60.         }
  61.         producer.close();
  62.     }
  63.     /**
  64.      * 
  65.      * 传入kafka约定的topicName,Map集合,内部转为json发送给kafka集群 <br>
  66.      * 用于批量发送消息,性能较高。
  67.      * 
  68.      * @author IG
  69.      * @Date 2017年4月14日 下午2:01:18
  70.      * @version 1.0.0
  71.      * @param topicName
  72.      * @param mapMessageToJSONForArray
  73.      */
  74.     public static void sendMessage(String topicName, List<Map<Object, Object>> mapMessageToJSONForArray) {
  75.         KafkaProducer<String, String> producer = createProducer();
  76.         for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
  77.             String array = JSONObject.fromObject(mapMessageToJSON).toString();
  78.             producer.send(new ProducerRecord<String, String>(topicName, array));
  79.         }
  80.         producer.close();
  81.     }
  82.     /**
  83.      * 
  84.      * 传入kafka约定的topicName,Map,内部转为json发送给kafka集群
  85.      * 
  86.      * @author IG
  87.      * @Date 2017年4月14日 下午1:30:10
  88.      * @version 1.0.0
  89.      * @param topicName
  90.      * @param mapMessageToJSON
  91.      */
  92.     public static void sendMessage(String topicName, Map<Object, Object> mapMessageToJSON) {
  93.         KafkaProducer<String, String> producer = createProducer();
  94.         String array = JSONObject.fromObject(mapMessageToJSON).toString();
  95.         producer.send(new ProducerRecord<String, String>(topicName, array));
  96.         producer.close();
  97.     }
  98.     public static void main(String[] args) throws InterruptedException {
  99.         // System.out.println(System.getProperty("file.encoding"));
  100.         String[] s = new String[] { "{\"userName\":\"赵四31\",\"pwd\":\"lisi\",\"age\":13}",
  101.                 "{\"userName\":\"赵四41\",\"pwd\":\"lisi\",\"age\":14}",
  102.                 "{\"userName\":\"赵四51\",\"pwd\":\"lisi\",\"age\":15}" };
  103.         // KafkaTools.sendMessage("logstest",
  104.         // "{\"userName\":\"赵四\",\"pwd\":\"lisi\",\"age\":13}");
  105.         /*
  106.          * for (String a : s) { System.out.println(a); Thread.sleep(3000);
  107.          * KafkaTools.sendMessage(topicName, jsonMessages); }
  108.          */
  109.          KafkaTools.sendMessage("logstest", s);
  110.     }
  111. }
  • java实现Kafka生产者Producer工具类已关闭评论
  • 95 views
  • A+
所属分类:未分类
avatar