• 【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position


    问题描述

    当使用SDK连接到Azure Event Hub时,最常规的方式为使用连接字符串。这种做法参考官网文档就可成功完成代码:https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

    只是,如果使用Azure AD认证方式进行访问,代码需要如何修改呢? 如何来使用AAD的 TokenCredential呢?

     

    分析问题

    在使用Connection String的时候,EventProcessorClientBuilder使用connectionString方法配置连接字符串。

     

    如果使用Azure AD认证,则需要先根据AAD中注册应用获取到Client ID, Tenant ID, Client Secret,然后把这些内容设置为系统环境变量

    1. AZURE_TENANT_ID :对应AAD 注册应用页面的 Tenant ID
    2. AZURE_CLIENT_ID :对应AAD 注册应用页面的 Application (Client) ID
    3. AZURE_CLIENT_SECRET :对应AAD 注册应用的 Certificates & secrets 中创建的Client Secrets

    然后使用 credential 初始化 EventProcessorClientBuilder 对象

    注意点:

    1) DefaultAzureCredentialBuilder 需要指定 Authority Host为 Azure China

    2) EventProcessorClientBuilder . Credential 方法需要指定Event Hub Namespce 的域名

     

    操作实现

    第一步:为AAD注册应用赋予操作Event Hub Data的权限

    Azure 提供了以下 Azure 内置角色,用于通过 Azure AD 和 OAuth 授予对事件中心数据的访问权限:

    1. Azure 事件中心数据所有者 (Azure Event Hubs Data Owner): 使用此角色可以授予对事件中心资源的完全访问权限。
    2. Azure 事件中心数据发送者 (Azure Event Hubs Data Sender) : 使用此角色可以授予对事件中心资源的发送访问权限。
    3. Azure 事件中心数据接收者 (Azure Event Hubs Data Receiver): 使用此角色可以授予对事件中心资源的使用/接收访问权限。

    本实例中,只需要接收数据,所以只赋予了 Azure Event Hubs Data Receiver权限。

     

    第二步:在Java 项目中添加SDK依赖  

    添加在pom.xml文件中 dependencies 部分的内容为:azure-identity , azure-messaging-eventhubs , azure-messaging-eventhubs-checkpointstore-blob,最好都是用最新版本,避免出现运行时出现类型冲突或找不到

    复制代码
    
    
    
      4.0.0
    
      com.example
      spdemo
      1.0-SNAPSHOT
    
      spdemo
      
      http://www.example.com
    
      
        UTF-8
        1.8
        1.8
      
    
      
        
          com.azure
          azure-messaging-eventhubs
          5.12.2
        
        
          com.azure
          azure-messaging-eventhubs-checkpointstore-blob
          1.14.0
        
        
          com.azure
          azure-identity
          1.5.3
        
        
          junit
          junit
          4.11
          test
        
      
    
      
        
          
            org.apache.maven.plugins
            maven-assembly-plugin
            2.5.5
            
              
                
                  com.example.App
                
              
              
                jar-with-dependencies
              
            
          
        
      
    
    复制代码

     

    第三步:添加完整代码

    复制代码
    package com.example;
    
    import com.azure.core.credential.TokenCredential;
    import com.azure.identity.AzureAuthorityHosts;
    import com.azure.identity.DefaultAzureCredentialBuilder;
    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    
    import java.io.IOException;
    import java.sql.Date;
    import java.time.Instant;
    import java.time.temporal.TemporalUnit;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    
    /**
     * Hello world!
     *
     */
    public class App {
        // private static final String connectionString ="";
        // private static final String eventHubName = "";
        // private static final String storageConnectionString = "";
        // private static final String storageContainerName = "";
    
        public static void main(String[] args) throws IOException {
            System.out.println("Hello World!");
    
            // String connectionString =""; 
            // The fully qualified namespace for the Event Hubs instance. This is likely to
            // be similar to:
            // {your-namespace}.servicebus.windows.net
            // String fullyQualifiedNamespace =".servicebus.chinacloudapi.cn";
            // String eventHubName = "";
    
            String storageConnectionString = System.getenv("storageConnectionString");
            String storageContainerName = System.getenv("storageContainerName");
            String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace");
            String eventHubName = System.getenv("eventHubName");
    
            TokenCredential credential = new DefaultAzureCredentialBuilder().authorityHost(AzureAuthorityHosts.AZURE_CHINA)
                    .build();
    
            // Create a blob container client that you use later to build an event processor
            // client to receive and process events
            BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                    .connectionString(storageConnectionString)
                    .containerName(storageContainerName)
                    .buildAsyncClient();
            
        
            // EventHubProducerClient
            // EventHubProducerClient client = new EventHubClientBuilder()
            // .credential(fullyQualifiedNamespace, eventHubName, credential)
            // .buildProducerClient();
    
            Map initialPartitionEventPosition = new HashMap<>();
            initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
     
            // EventProcessorClientBuilder
            // Create a builder object that you will use later to build an event processor
            // client to receive and process events and errors.
            EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                    // .connectionString(connectionString, eventHubName)
                    .credential(fullyQualifiedNamespace, eventHubName, credential)
                    .initialPartitionEventPosition(initialPartitionEventPosition)
                    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                    .processEvent(PARTITION_PROCESSOR)
                    .processError(ERROR_HANDLER)
                    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
            // EventPosition.f
            // Use the builder object to create an event processor client
            EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
            System.out.println("Starting event processor");
            eventProcessorClient.start();
    
            System.out.println("Press enter to stop.");
            System.in.read();
    
            System.out.println("Stopping event processor");
            eventProcessorClient.stop();
            System.out.println("Event processor stopped.");
    
            System.out.println("Exiting process");
    
        }
    
        public static final Consumer PARTITION_PROCESSOR = eventContext -> {
            PartitionContext partitionContext = eventContext.getPartitionContext();
            EventData eventData = eventContext.getEventData();
    
            System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
                    partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
            // Every 10 events received, it will update the checkpoint stored in Azure Blob
            // Storage.
            if (eventData.getSequenceNumber() % 10 == 0) {
                eventContext.updateCheckpoint();
            }
        };
    
        public static final Consumer ERROR_HANDLER = errorContext -> {
            System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                    errorContext.getPartitionContext().getPartitionId(),
                    errorContext.getThrowable());
        };
    }
    复制代码

     

     

     

    附录:自定义设置 Event Position,当程序运行时,指定从Event Hub中获取消息的 Sequence Number

    使用EventPosition对象中的fromSequenceNumber方法,可以指定一个序列号,Consume端会根据这个号码获取之后的消息。其他的方法还有 fromOffset(指定游标) / fromEnqueuedTime(指定一个时间点,获取之后的消息) / earliest(从最早开始) / latest(从最后开始获取新的数据,旧数据不获取) 

        Map initialPartitionEventPosition = new HashMap<>();
        initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));

    注意:

    Map 中的String 对象为Event Hub的分区ID,如果Event Hub有2个分区,则它的值分别时0,1.

    EventPosition 设置的值,只在Storage Account所保存在CheckPoint Store中的值没有,或者小于此处设定的值时,才会起效果。否则,Consume 会根据从Checkpoint中获取的SequenceNumber为准。

     

     

     

     

    参考资料

    授予对 Azure 事件中心的访问权限 :https://docs.azure.cn/zh-cn/event-hubs/authorize-access-event-hubs

    对使用 Azure Active Directory 访问事件中心资源的应用程序进行身份验证 : https://docs.azure.cn/zh-cn/event-hubs/authenticate-application

    使用 Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件 : https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

     

     

     [END]

     

     

     
  • 相关阅读:
    【运维知识进阶篇】Ansible自动化运维-ad-hoc详解
    什么是副业思维,副业应该怎么做,用创业思维分析副业的可行性
    5分钟搞懂MySQL - 行转列
    ssm网上书城系统毕业设计-附源码180919
    Windows系统版本下载
    ElasticSearch学习和使用 (使用head软件可视化es数据)
    iPhone 隐私新规下的“大地震”:四大平台损失近百亿美元,“连用户是男是女都分不清”
    第三节:kafka sarama 遇到Bug?
    AFL模糊测试+GCOV覆盖率分析
    Zookeeper实战案例(1)
  • 原文地址:https://www.cnblogs.com/lulight/p/16547914.html