• Debezium发布历史153


    原文地址: https://debezium.io/blog/2023/07/10/custom-http-signaling-notification/

    欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

    Debezium signaling and notifications - Part 2: Customisation
    July 10, 2023 by Anisha Mohanty
    debezium features notifications signaling custom channels

    欢迎来到这个系列文章专门讨论信号和通知德贝齐姆!这篇文章作为本系列的第二部分,我们将在这里讨论如何自定义Debezns中的信号和通知通道。

    Debezum2.3引入了信号和通知能力的新改进。您可以设置新的信号和通知通道,除了预先定义的信号和通知通道由DEBeZUS提供。这一功能使用户能够定制系统,以适应其独特的需求,并将其与现有的基础设施或第三方解决方案相结合。它能够通过精确捕捉和传递信号事件并通过首选渠道触发通知,对数据变化进行有效的监测和积极的响应。

    本系列的第一篇文章, 代贝兹的信号和通知 ,概述德贝兹的信号和通知功能。它还讨论了可用的渠道及其在各种情况下的用例。

    定制信号和通知通道
    在Debezum中,可以定制信号和通知通道以适应特定的需求。例如,我们可以通过创建一个HTTP 信号和通知的通道。这个HTTP 通道接收来自HTTP端点的信号,并且可以在信号传递时将通知送回端点。

    让我们来探索一个演示如何创建和利用HTTP 使用德贝兹波斯特连接器的信号和通知通道,A 模拟服务器 发出信号, 邮筒 通过http端点接收通知。

    设置HTTP 信号通道:
    在发生相关数据库更改时,配置德贝兹(Debezum)连接器以接收信号。

    建立一个服务,利用HTTP 频道。该服务可以是数据库、第三方应用程序或任何其他可以发送HTTP请求的系统。在本例中,我们将使用模拟服务器将信号发送到Debezns。模拟服务器是一种可以用来模拟HTTP请求和响应的服务。

    配置模拟服务器,使用适当的http方法(例如)通过http端点发送信号。,邮政)。

    定制HTTP 通道设置,以定义http端点网址、身份验证、标题和任何所需的其他参数。

    设置HTTP 通知渠道:
    一旦信号被Debezum接收和处理,它就可以触发将通知张贴到http端点。在本例中,我们将使用HTTP 频道。邮政箱是一种服务,可以用来接收HTTP请求并查看请求详细信息。

    定制HTTP 通知的通道设置,在邮件箱中创建一个资料库,并定义http端点URL、身份验证、标题和任何所需的附加参数。

    使用适当的HTTP方法将通知事件转发到http端点即邮政资料箱(例如。,邮政)。通知有效载荷可以根据需要定制。

    在博客帖子中,此示例的完整源代码在Debezum示例存储库中提供。 http-signal-notification 目录。

    创建一个Java项目来构建HTTP 信号和通知渠道。运行以下命令,以使用MAVIN创建一个新的Java项目:

    mvn archetype:generate
    -DgroupId=io.debezium.examples
    -DartifactId=http-signaling-notification
    将下列从属关系添加到pom.xml 德贝佐姆版本的文件(2.3及后版本):

    io.debezium debezium-core 2.3.0.Final 若要使用模拟服务器接收信号,请创建一个定义模拟服务器服务的码头组合文件。模拟服务器服务的配置如下:

    services:
    mockServer:
    image: mockserver/mockserver:latest
    ports:
    - 1080:1080
    environment:
    - MOCKSERVER_WATCH_INITIALIZATION_JSON=true
    - MOCKSERVER_INITIALIZATION_JSON_PATH=/config/initializerJson.json
    volumes:
    - ./initializerJson.json:/config/initializerJson.json
    环境变量MOCKSERVER_WATCH_INITIALIZATION_JSON 和MOCKSERVER_INITIALIZATION_JSON_PATH 设置为使模拟服务器监视初始化JSON文件中的更改并指定其路径。…initializerJson.json 文件,其中包含的http请求和响应信息的信号,是安装到模拟服务器容器。

    …initializerJson.json 文件定义了对路径的模拟http请求/api/signal 使用查询字符串参数code=10969 .当模拟服务器收到此请求时,它将与一个含有id ,type ,以及data .响应的状态代码为200,表示成功响应。定义:initializerJson.json 文件如下:

    [
    {
    “httpRequest” : {
    “method” : “GET”,
    “path” : “/api/signal”,
    “queryStringParameters” : {
    “code” : [“10969”]
    }
    },
    “httpResponse” : {
    “body”: “{“id”:“924e3ff8-2245-43ca-ba77-2af9af02fa07”,“type”:“log”,“data”:{“message”: “Signal message received from http endpoint.”}}”,
    “statusCode”: 200
    }
    }
    ]
    id一个用来识别信号实例的任意唯一字符串。

    type :发送的信号类型。在这个例子中,类型是log 它要求连接器向连接器的日志文件添加一个条目。在处理信号之后,连接器在日志中打印指定的消息。

    data :传递到信号事件的jon格式参数。在这个例子中,message 参数传递到信号事件。

    创造HTTP 通过实施SignalChannelReader 接口如下:

    public class HttpSignalChannel implements SignalChannelReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSignalChannel.class);

    public static final String CHANNEL_NAME = "http";
    private static final List SIGNALS = new ArrayList<>();
    public CommonConnectorConfig connectorConfig;
    
        @Override
    public String name() { 
        return CHANNEL_NAME;
    }
    
    @Override
    public void init(CommonConnectorConfig connectorConfig) { 
        this.connectorConfig = connectorConfig;
    }
    
    @Override
    public List read() { 
        try {
            String requestUrl = "http://mockServer:1080/api/signal?code=10969";
    
            // send http request to the mock server
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(requestUrl))
                    .GET()
                    .header("Content-Type", "application/json")
                    .build();
    
            // read the response
            HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
           if (response.statusCode() == 200) {
               ObjectMapper mapper = new ObjectMapper();
               String responseBody = response.body();
    
               // parse the response body
               JsonNode signalJson = mapper.readTree(responseBody);
               Map additionalData = signalJson.has("additionalData") ? mapper.convertValue(signalJson.get("additionalData"), new TypeReference<>() {}) : new HashMap<>();
               String id = signalJson.get("id").asText();
               String type = signalJson.get("type").asText();
               String data = signalJson.get("data").toString();
               SignalRecord signal = new SignalRecord(id, type, data, additionalData);
    
               LOGGER.info("Recorded signal event '{}' ", signal);
    
               // process the signal
               SIGNALS.add(signal);
                } else {
                    LOGGER.warn("Error while reading signaling events from endpoint: {}", response.statusCode());
                }
            } catch (IOException | InterruptedException e) {
                LOGGER.warn("Exception while preparing to process the signal '{}' from the endpoint", e.getMessage());
                e.printStackTrace();
            }
        return SIGNALS;
        }
    
    @Override
    public void close() { 
       SIGNALS.clear();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    }
    …name() 方法返回信号通道的名称。若要启用德贝兹使用通道,请指定名称http 在连接器的signal.enabled.channels 财产。
    …init() 方法可以用来初始化HTTP通道需要的特定配置、变量或连接。
    …read() 方法从http端点读取信号并返回一个列表SignalRecord 将由德贝兹连接器处理的对象。
    …close() 方法关闭所有分配的资源。
    通过实现NotificationChannel 接口如下:

    public class HttpNotificationChannel implements NotificationChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpNotificationChannel.class);

    public static final String CHANNEL_NAME = "http";
    private static final String NOTIFICATION_PREFIX = "[HTTP NOTIFICATION SERVICE]";
    
    @Override
    public String name() { 
        return CHANNEL_NAME;
    }
    
    @Override
    public void init(CommonConnectorConfig config) { 
        // custom configuration
    }
    
    @Override
    public void send(Notification notification) { 
        LOGGER.info(String.format("%s Sending notification to http channel", NOTIFICATION_PREFIX));
        String binId = createBin();
        sendNotification(binId, notification);
    }
    
    private static String createBin()  {
        // Create a bin on the server
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI("https://www.toptal.com/developers/postbin/api/bin"))
                    .POST(HttpRequest.BodyPublishers.ofString(" "))
                    .build();
    
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
    
            if (response.statusCode() == HTTP_CREATED) {
                String binId = response.body().replaceAll(".*\"binId\":\"([^\"]+)\".*", "$1");
                LOGGER.info("Bin created: " + response.body());
                return binId;
            }
        } catch (URISyntaxException | InterruptedException | IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }
    
    private static void sendNotification (String binId, Notification notification) {
        // Get notification from the bin
        try {
            ObjectMapper mapper = new ObjectMapper();
            String notificationString = mapper.writeValueAsString(notification);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI("https://www.toptal.com/developers/postbin/" + binId))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(notificationString))
                    .build();
    
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
    
            if (response.statusCode() == HTTP_OK) {
                LOGGER.info("Notification received : " + response.body());
            }
        } catch (URISyntaxException | InterruptedException | IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    @Override
    public void close() { 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    }
    …name() 方法返回通知通道的名称。若要启用德贝兹使用通道,请指定http 在连接器的notification.enabled.channels 财产。
    …init() 方法可以用来初始化通道所需的特定配置、变量或连接。
    …send() 方法将通知发送到通道。该通知载有SignalRecord 由德贝兹接头处理的对象。
    …close() 方法关闭所有分配的资源。
    宣布HTTP 的信号和通知渠道META-INF/services 目录下io.debezium.pipeline.signal.SignalChannelReader 和io.debezium.pipeline.notification.channels.NotificationChannel 文件。

    将Java项目编译并导出为JAR文件。这可以使用MAVIN或您首选的构建工具来完成。将JAR文件复制到包含您想要使用的Debezum连接器的JAR文件的目录中。例如,如果您想使用与Debezum后角连接器的自定义信号和通知通道,请将JAR文件复制到/kafka/connect/debezium-connector-postgres 目录。

    这个例子提供了一个码头组合文件,其中定义了必要的服务,其中包括模拟服务器、动物园管理员、卡夫卡连接和波斯特格雷斯数据库。

    要启动服务,请运行以下命令:

    export DEBEZIUM_VERSION=2.3
    docker-compose up -d
    在确保服务的运行和运行之后,以及之后的下一步是注册连接器。这包括创建一个连接器配置文件。我们创建一个名为register-postgres.json 拥有下列属性:

    {
    “name”: “inventory-connector”,
    “config”: {
    “connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
    “tasks.max”: 1,
    “database.hostname”: “postgres”,
    “database.port”: 5432,
    “database.user”: “postgres”,
    “database.password”: “postgres”,
    “database.dbname” : “postgres”,
    “topic.prefix”: “dbserver1”,
    “schema.include.list”: “inventory”,
    “signal.enabled.channels”: “http”,
    “notification.enabled.channels”: “http”
    }
    }
    …signal.enabled.channels 属性指定连接器要使用的信号通道。在这种情况下,连接器使用http 信号通道。
    …notification.enabled.channels 属性指定连接器要使用的通知通道。在这种情况下,连接器使用http 通知频道。
    现在我们已经准备好了连接器配置文件,我们可以通过执行以下命令来注册卡夫卡连接器:

    curl -i -X POST -H “Accept:application/json”
    -H “Content-Type:application/json” http://localhost:8083/connectors/
    -d @register-postgres.json
    一旦连接器成功注册,您可以查看连接器日志以观察信号事件。日志提供了对连接器处理和进展的深入了解,包括任何与信号相关的信息。你会遇到类似以下的日志信息:

    Recorded signal event ‘SignalRecord{id=‘924e3ff8-2245-43ca-ba77-2af9af02fa07’, type=‘log’, data=’{“message”:“Signal message received from http endpoint.”}’, additionalData={}}’ [io.debezium.examples.signal.HttpSignalChannel]
    此外,您可能会注意到与发送到邮件箱的通知事件有关的日志消息。例如:

    [HTTP NOTIFICATION SERVICE] Sending notification to http channel [io.debezium.examples.notification.HttpNotificationChannel]
    Bin created: {“binId”:“1688742588469-1816775151528”,“now”:1688742588470,“expires”:1688744388470} [io.debezium.examples.notification.HttpNotificationChannel]
    它提供关于通知事件的信息,例如创建带有唯一标识符(BINID)的垃圾箱和其他相关细节。若欲从邮政局检索通知事件,请取binId 从日志消息,并使用它来请求来自邮政信息库的相应通知事件。要查看通知事件,您可以使用以下网址访问邮政资料库:https://www.toptal.com/developers/postbin/b/:binId .替换:binId 从连接器日志中获得的实际二进制的URL中。

    发送到邮政局的通知事件如下:
    图片来自官网原文
    在这里插入图片描述

    资料夹后预览
    结论
    在本教程中,我们探讨了如何为德贝齐亚连接器创建自定义信号和通知通道。我们创建了一个自定义信号通道,它接收来自http端点的信号事件。我们还创建了一个自定义通知通道,该通道将通知事件发送到http端点。

    Debezum的全面的信号和通知系统提供了与第三方解决方案的无缝集成,使用户能够随时了解Debezum连接器的状态和进展。该系统的可扩展性使用户能够定制信号和通知通道,以适应其定制需求。

    请继续关注本系列的第3部分,我们将在这里探索关于jmx信号和通知的内容。与此同时,您可以查看Debezum文档,了解更多有关信号和通知通道的信息。

    如果您有任何问题或反馈,请随时与我们联系。 邮寄清单 或 #通信秘书长 关于郁金香聊天的频道。我们很乐意听到你的消息!

  • 相关阅读:
    HarmonyOS方舟开发框架容器类API的介绍与使用
    写一个计算器(C语言版本),可以求出:整数的加,减,乘,除四则运算
    解决访问出现404和500的问题(Tomcat与Servlet、JSP、JDK的版本适配问题)
    【笔者感悟】笔者的学习感悟【十】
    C#语法基础
    java 线程池
    《模拟龙生》|500行Go代码写一个随机冒险游戏|巨龙修为挑战开启
    leetcode 242. Valid Anagram(有效的字母异位词)
    凭着这份《微服务架构实战》,带你立足实战落地微服务架构
    Selenium自动化测试框架工作原理你明白了吗?
  • 原文地址:https://blog.csdn.net/a309450028a/article/details/135878601