• 使用AWS的API Gateway实现websocket


    问题

    最近业务上面需要使用到WebSocket长连接来解决某些业务场景。

    一图胜千言

    AWS API Gateway WebSocket
    注意:这里承担WebSocket服务器的是AWS API Gateway;后面的EC2业务服务,其实都是REST接口服务。
    这里主要关注API Gateway和REST业务服务怎么实现API Gateway要求的WebSocket协议。VPCLink,ELB和Auto Scaling不是我们的重点。这里假设这些都已经完成了。

    初始化网关

    选择【构建】,如下图:
    websocket构建
    设置网关名称,类似如下图:
    设置网关名称
    全部选择3种路由,如下图:
    网关路由选择
    集成方式暂时选择mock方式:
    网关mock集成
    阶段,随便填个名称:
    网关阶段
    预览一下,即将创建的空websocket网关:
    websocket网关预览

    网关事件

    初始化完成部署后,进入网关会出现如下图:
    初始化网关3种路由

    路由选择表达式

    $request.body.action:这个表示客户端与网关通过ws方式连接上后,可以向网关发送json数据来触发网关来进行路由调用。这里的action就是被选中路由资源的关键字段而已。类似发送这样:

    {
    	"action": "getList"
    }
    
    • 1
    • 2
    • 3

    接下来介绍默认自带的三种路由:
    $connect:表示客户端通过ws方式与网关进行了连接;
    $disconnect:表示ws连接中断;
    $default:如果客户端发送过来的消息不是json方式,默认就这这个路由。
    一般来说,前两个都需要集成后台业务程序进行相关处理;最后一个默认根据业务情况来定,我这里不需要业务程序处理。
    接下来介绍前2种路由的集成配置。

    $connect路由事件

    connect路由配置
    上面是该事件的主体配置。接下来我们先看集成请求是怎么配置的?

    集成请求

    connect集成请求配置
    这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下:
    模板选择表达式:\$default
    模板密钥名称:$default
    生成模板内容:

    {
      "connectionId": "$context.connectionId",
      "payload": $input.body
    }
    
    • 1
    • 2
    • 3
    • 4

    这样后台业务程序,就能够读取到connectionId和payload,这两个字段。业务程序类似如下:

       @PostMapping("/connect")
        public R connect(@RequestBody WebsocketConn websocketConn) {
            // 记录connectionId
            String connectionId = websocketConn.getConnectionId();
            log.info(String.format("WebSocket ID: %s 上线", connectionId));
            redisRepository.setExpire(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId),
                    connectionId, TokenConstant.SESSTION_API_TOKEN_EXPIRE);
            return R.status(true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这段Java代码主要就是实现connect接口,将aws api gateway的ws连接ID保存到redis里面,然后,返回200给api gateway。

    集成响应

    集成响应配置
    这里集成响应配置,主要就是添加响应$default

    $disconnect路由事件

    disconnect事件主要配置
    上图就是disconnect路由事件的主要配置,只要配置一个集成请求就完事了。

    集成请求

    集成请求
    集成请求事件如上图。
    这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下:
    模板选择表达式:\$default
    模板密钥名称:$default
    生成模板内容:

    {
      "connectionId": "$context.connectionId"
    }
    
    • 1
    • 2
    • 3

    这样后台业务程序,就能够读取到connectionId,这两个字段。业务程序类似如下:

       @PostMapping("/disconnect")
        public R disconnect(@RequestBody WebsocketConn websocketConn) {
            // 移除connectionId
            String connectionId = websocketConn.getConnectionId();
            log.info(String.format("WebSocket ID: %s 下线", connectionId));
            redisRepository.del(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId));
            return R.status(true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这段Java代码主要就是实现disconnect接口,将aws api gateway的ws连接时保存的连接ID从redis里面移除,然后,返回200给api gateway。

    $default路由事件

    default路由配置
    以上是default路由配置事件。

    集成请求

    集成请求配置
    这里选择mock方式,不需要与后台业务程序集成。这里值得关注的是请求集成模板参数如下:
    模板选择表达式:\$default
    模板密钥名称:$default
    生成模板内容:

    {
      "statusCode": 200
    }
    
    • 1
    • 2
    • 3

    部署网关

    部署网关
    将API Gateway部署到阶段里面去。

    查看阶段

    阶段
    从阶段可以看到两种URL,第一种就是通过ws协议进行连接到URL,第二种就是操作这个API Gateway的接口。

    推送消息(Java SDK)

    Maven

    <dependency>
        <groupId>com.amazonawsgroupId>
        <artifactId>aws-java-sdk-coreartifactId>
        <version>1.12.348version>
    dependency>
    
    
    <dependency>
        <groupId>com.amazonawsgroupId>
        <artifactId>aws-java-sdk-apigatewaymanagementapiartifactId>
        <version>1.12.348version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    ApiGatewayConfig.java(Spring配置类)

    package org.xxxx.config;
    
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.regions.Region;
    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsync;
    import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsyncClientBuilder;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ApiGatewayConfig {
        @Value("${api.gateway.endpoint}")
        private String endpoint;
    
        @Value("${region.name:cn-north-1}")
        private String regionName;
        @Bean
        public AmazonApiGatewayManagementApiAsync amazonApiGatewayManagementApi() {
            Region region = Regions.getCurrentRegion();
            if (region != null){
                regionName = region.getName();
            }
            AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, regionName);
            return AmazonApiGatewayManagementApiAsyncClientBuilder.standard()
                    .withEndpointConfiguration(endpointConfiguration)
                    .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                    .build();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    注意:这里的EC2运行角色需要设置api gateway调用权限类似:AmazonAPIGatewayInvokeFullAccess,也可以根据情况进行自定义权限设置。

    推送消息

        @Override
        public void pushAllFromMessage(String message) {
            Set keys = redisRepository.getListKey(TokenConstant.WEBSOCKET_API_ID_PREFIX);
            if (!CollectionUtils.isEmpty(keys)){
                for (String key : keys) {
                    String connectionId = (String) redisRepository.get(key);
                    PostToConnectionRequest postToConnectionRequest = new PostToConnectionRequest();
                    postToConnectionRequest.withConnectionId(connectionId);
                    ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
                    byteBuffer = byteBuffer.duplicate();
                    postToConnectionRequest.withData(byteBuffer);
                    amazonApiGatewayManagementApi.postToConnectionAsync(postToConnectionRequest);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里主要使用postToConnectionAsync方法进行消息推送,注意这里的ByteBuffer的使用,是需要调用duplicate方法的。

    测试

    安装wscat客户端:

    npm install -g wscat
    
    • 1

    使用wscat连接上去,即可接收消息推送了。

    wscat -c wss://aabbccddee.execute-api.cn-north-1.amazonaws.com.cn/test
    
    • 1

    总结

    到这里就完成了AWS API Gateway中使用websocket的功能。注意,我们的业务程序不要集成任何websocket库,只用正常的restful方式实现aws要求的那个几个路由事件接口即可。再集成api gateway相关的sdk就可以对websocket连接进行管理了。

    参考:

  • 相关阅读:
    NumPy使用不当引起的内存泄漏
    jsp简单实现新闻发布系统中用户注册确认和用户模拟登录功能的开发
    我抄底了被清算的NFT,却被OpenSea上了锁
    【Deep Learning 1】遗传算法GA
    光伏瓦屋顶
    numpy的基本操作
    三表联查(怎么把表1中存在但表2表3不存在的姓名查找出来)
    ps 命令 快捷键 知识点汇总
    三维点云的格网(体素)组织
    智慧农业:农林牧数据可视化监控平台
  • 原文地址:https://blog.csdn.net/fxtxz2/article/details/128082227