• Flink: writing to Kafka with a custom message key


    Step 1: the serialization schema

    package com.mz.iot.functions;

    import com.mz.iot.bean.AlarmHistoryOrigin;
    import com.mz.iot.utils.JsonUtil;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.StandardCharsets;

    /**
     * liuyage
     */
    public class UserDefinedSerializationSchema implements KeyedSerializationSchema<String> {

        private final static Logger logger = LoggerFactory.getLogger(UserDefinedSerializationSchema.class);

        String topicName; // target topic name (chosen elsewhere by a modulo calculation)

        public UserDefinedSerializationSchema(String topicName) {
            this.topicName = topicName;
        }

        @Override
        public byte[] serializeKey(String element) {
            // element is the actual record flowing through the Flink stream. The key is
            // assembled here from the record's identifying fields; the partitioner below
            // then uses this key to decide which Kafka partition the record is sent to.
            AlarmHistoryOrigin aho = JsonUtil.getObjFromJson(element, AlarmHistoryOrigin.class);
            StringBuilder sb = new StringBuilder();
            sb.append(aho.getProjectId()).append(":");
            sb.append(aho.getMeterCode()).append(":");
            sb.append(aho.getAlarmType()).append(":");
            sb.append(aho.getId()).append(":");
            sb.append(aho.getAlarmRuleId());
            // the key must be returned as a byte array
            return sb.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(String element) {
            // the value to serialize; normally it is passed through unchanged as bytes
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(String element) {
            // not needed for this use case; useful when records must be
            // routed to different topics based on a field value
            return topicName;
        }
    }
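    The schema above depends on two project classes the post does not show: the AlarmHistoryOrigin bean and JsonUtil. Below is a minimal sketch of what they might look like, not the project's actual code; the bean is trimmed to the five fields the key uses (the sample records in Step 2 carry many more), and the Jackson-based JsonUtil is an assumption.

    // --- JsonUtil.java (hypothetical stand-in: a thin Jackson wrapper) ---
    package com.mz.iot.utils;

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonUtil {
        private static final ObjectMapper MAPPER = new ObjectMapper()
                // the records carry more fields than the trimmed bean below declares
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        public static <T> T getObjFromJson(String json, Class<T> clazz) {
            try {
                return MAPPER.readValue(json, clazz);
            } catch (Exception e) {
                throw new RuntimeException("failed to parse JSON: " + json, e);
            }
        }
    }

    // --- AlarmHistoryOrigin.java (only the fields used to build the key) ---
    package com.mz.iot.bean;

    public class AlarmHistoryOrigin {
        private String id;
        private String alarmRuleId;
        private int alarmType;
        private String meterCode;
        private String projectId;

        public String getId() { return id; }
        public String getAlarmRuleId() { return alarmRuleId; }
        public int getAlarmType() { return alarmType; }
        public String getMeterCode() { return meterCode; }
        public String getProjectId() { return projectId; }
    }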

    Step 2: the custom partitioner

    package com.mz.iot.functions;

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    /**
     * liuyage
     */
    public class UserDefinedPartitioner extends FlinkKafkaPartitioner<String> {

        private final static Logger logger = LoggerFactory.getLogger(UserDefinedPartitioner.class);

        /**
         * @param record      the raw record from the stream, e.g.
         *                    {"id":"15959499868079104","alarmRuleId":"6789709006840532992","beginTime":"2022-09-01 04:42:29","endTime":"2022-09-01 04:42:40","ruleValue":0.60000,"firstValue":0.62000,"alarmType":1,"meterCode":"R19013-0003-0001-440300OII-10248","projectId":"440300OII","acquisitionTime":"2022-09-01 04:42:49","alarmValue":0.62000,"createUser":0,"createTime":"2022-09-01 04:42:49","updateUser":0,"updateTime":"2022-09-01 04:42:49","reasonTag":0,"makeUp":0}
         * @param key         the key produced by UserDefinedSerializationSchema, e.g.
         *                    440300OII:R19013-0003-0001-440300OII-10248:1:15959499868079104:6789709006840532992
         * @param value       the value produced by UserDefinedSerializationSchema; here the same JSON as record
         * @param targetTopic e.g. iot-history-alarm-topic-test
         * @param partitions  the topic's partition list, e.g. [0, 1, 2, 3, 4, 5]
         * @return the index of the partition the record is sent to
         */
        @Override
        public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // the key received here is the byte array produced by serializeKey() above;
            // turn it back into a String, then hash it modulo the number of partitions
            String keyStr = new String(key, StandardCharsets.UTF_8);
            int part = Math.abs(keyStr.hashCode() % partitions.length);
            logger.info("record:" + record + " key:" + keyStr + " value:" + new String(value, StandardCharsets.UTF_8)
                    + " targetTopic:" + targetTopic + " partitions:" + Arrays.toString(partitions));
            /*
             * sample log output (record and value are the same JSON shown in the Javadoc above):
             * record:{...}
             * key:440300OII:R19013-0003-0001-440300OII-10248:1:15959499868079104:6789709006840532992
             * value:{...}
             * targetTopic:iot-history-alarm-topic-test
             * partitions:[0, 1, 2, 3, 4, 5]
             */
            return part;
        }
    }
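    Because keyStr.hashCode() % partitions.length always lies strictly between -partitions.length and +partitions.length, Math.abs yields a valid index in [0, partitions.length - 1]; and since String.hashCode is fixed by the Java language specification, a given key maps to the same partition across JVMs and restarts, so records sharing a key keep their relative order within one partition. A standalone check of the mapping, using the sample key from the Javadoc above (PartitionDemo is illustrative, not part of the original project):

    public class PartitionDemo {
        public static void main(String[] args) {
            String key = "440300OII:R19013-0003-0001-440300OII-10248:1:15959499868079104:6789709006840532992";
            int[] partitions = {0, 1, 2, 3, 4, 5};
            // same arithmetic as UserDefinedPartitioner.partition()
            int part = Math.abs(key.hashCode() % partitions.length);
            System.out.println("key -> partition " + part); // always within [0, 5]
        }
    }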

    Step 3: wiring up the custom serialization schema and partitioner

    // imports this utility method needs:
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    import org.apache.kafka.clients.producer.ProducerConfig;

    import java.util.Optional;
    import java.util.Properties;

    public static FlinkKafkaProducer<String> getKafkaSinkSimple(String topic, boolean flag) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigUtils.SERVICE_KAFKA_HOST);
        if (ConfigUtils.SERVICE_KAFKA_SASL_PLAINTEXT) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='"
                    + ConfigUtils.SERVICE_KAFKA_USERNAME + "' password='" + ConfigUtils.SERVICE_KAFKA_PASSWORD + "';");
        }
        // transaction timeout for the producer; with EXACTLY_ONCE it must not exceed
        // the broker's transaction.max.timeout.ms (15 minutes by default)
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
        if (flag) {
            return new FlinkKafkaProducer<>(
                    topic,
                    new UserDefinedSerializationSchema(topic), // custom serialization schema
                    props,
                    Optional.of(new UserDefinedPartitioner()), // custom partitioner
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                    6 // kafkaProducersPoolSize for the transactional sink
            );
        } else {
            KeyedSerializationSchema<String> keyedSerializationSchema =
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());
            return new FlinkKafkaProducer<>(
                    topic,
                    keyedSerializationSchema,
                    props,
                    Optional.empty(),
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                    6
            );
        }
    }
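    To tie the three pieces together, here is a minimal job sketch. AlarmToKafkaJob, the checkpoint interval, and the inlined sample record are illustrative assumptions rather than part of the original post, and getKafkaSinkSimple is assumed to live in (or be statically imported into) this class.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AlarmToKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // EXACTLY_ONCE is transactional: the sink commits to Kafka only when a
            // checkpoint completes, so checkpointing must be enabled
            env.enableCheckpointing(60_000);

            // placeholder source; the real job consumes the alarm JSON stream
            DataStream<String> alarms = env.fromElements(
                    "{\"id\":\"1\",\"alarmRuleId\":\"2\",\"alarmType\":1,"
                            + "\"meterCode\":\"M-0001\",\"projectId\":\"440300OII\"}");

            alarms.addSink(getKafkaSinkSimple("iot-history-alarm-topic-test", true));
            env.execute("alarm-to-kafka");
        }
    }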

• Source: https://blog.csdn.net/liuygvip/article/details/126829510