引入pom:
- <dependencies>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>3.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.25</version>
- </dependency>
- </dependencies>
- package com.atguigu.kafka.producer;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CustomProducer {
- public static void main(String[] args) {
- // 1. 创建 kafka 生产者的配置对象
- Properties properties = new Properties();
- // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "hadoop100:9092");
-
- // key,value 序列化(必须):key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
-
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
- // 3. 创建 kafka 生产者对象
- KafkaProducer<String, String> kafkaProducer = new
- KafkaProducer<String, String>(properties);
- // 4. 调用 send 方法,发送消息
- for (int i = 0; i < 5; i++) {
- kafkaProducer.send(new
- ProducerRecord<>("first","atguigu " + i));
- }
- // 5. 关闭资源
- kafkaProducer.close();
-
-
-
-
- }
- }
测试效果:
回调的信息实际是从队列返回的
只需在异步发送的基础上,再调用一下 get()方法即可。
指定key的值:对key的hashcode做分配
希望将订单表的数据全部发到kafka的一个分区上,怎么处理?
将该表的名称作为key值然后发送即可
如:
如果研发人员可以根据企业需求,自己重新实现分区器。 1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区, 不包含 atguigu,就发往 1 号分区。
自定义分区器:
- package com.atguigu.kafka.producer;
-
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
-
- import java.util.Map;
-
- public class MyPartitioner implements Partitioner {
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- String s = value.toString();
- int partion;
- if(s.contains("atguigu")) {
- partion = 1;
- }else {
- partion=0;
- }
- return partion;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> map) {
-
- }
- }
拷贝全类名,产生关联
测试结论:
- properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
- properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
- properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
- // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");