• vertx学习


    写在前面

    新公司用到了vertx,所以这里学习下。

    源码

    1:vertx是啥?

    是个框架吗?不是。只是一个工具类,只不过提供的功能比较全面,如http,websocket,tcp,json处理,定时任务,文件IO等。

    2:Vert.x Core

    核心对象io.vertx.core.Vertx

    2.1:定时器

    // 使用vertx执行定时执行任务
    @Test
    public void timerWithVertx() throws Exception {
        Vertx vertx = Vertx.vertx();
        vertx.setPeriodic(1000, id -> {
            // 这个处理器将会每隔一秒被调用一次
            System.out.println("timer fired!");
        });
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输出:

    timer fired!
    timer fired!
    timer fired!
    timer fired!
    timer fired!
    timer fired!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2:简单的http调用

    @Test
    public void httpCall() throws Exception {
        Vertx vertx = Vertx.vertx();
        HttpServer server = vertx.createHttpServer();
        server.requestHandler(request -> {
            // 服务器每次收到一个HTTP请求时这个处理器将被调用
            request.response().end("hello world!!!");
        });
    
        // 监听端口9999
        server.listen(9999);
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    测试:

    E:\workspace-idea\dongshidaddy-labs-new>curl http://localhost:9999
    hello world!!!
    
    • 1
    • 2

    2.3:获取文件大小

    // 获取文件大小
    @Test
    public void fileSizeTest() throws Exception {
        Vertx vertx = Vertx.vertx();
        FileSystem fs = vertx.fileSystem();
        Future<FileProps> future = fs.props("d:\\test\\starter.zip");
        future.onComplete((AsyncResult<FileProps> ar) -> {
            if (ar.succeeded()) {
                FileProps props = ar.result();
                System.out.println("File size = " + props.size());
            } else {
                System.out.println("Failure: " + ar.cause().getMessage());
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    运行:

    File size = 69281
    
    • 1

    2.4:组合多个future,任意一个失败则失败

    // 组合多个future,任意一个失败则失败
    @Test
    public void CompositeFutureTest() throws Exception {
        Vertx vertx = Vertx.vertx();
        HttpServer httpServer = vertx.createHttpServer();
        httpServer.requestHandler(request -> {
            // 服务器每次收到一个HTTP请求时这个处理器将被调用
            request.response().end("hello world!!!");
        });
        HttpServer netServer = vertx.createHttpServer();
        netServer.requestHandler(request -> {
            // 服务器每次收到一个HTTP请求时这个处理器将被调用
            request.response().end("hello world!!!!");
        });
        Future<HttpServer> httpServerFuture = httpServer.listen(8889);
    
        Future<HttpServer> netServerFuture = netServer.listen(9998);
        // 所有的成功才算是成功
        // 1:如果是希望其中一个成功就算是成功,则可以使用any方法
        // 2:如果是希望获取返回结果,则可以使用resultAt方法接受一个整数,和future的list一一对应
        // 3:如果是希望不论成功和失败所有的future都执行则可以使用join,虽然所有的future都会执行,但同all必须是所有的future都成功才算是成功,否则算是失败
        CompositeFuture.all(httpServerFuture, netServerFuture).onComplete(ar -> {
            if (ar.succeeded()) {
                // 所有服务器启动完成
                System.out.println("8889,9998全部监听成功");
            } else {
                // 有一个服务器启动失败
                System.out.println("yyyy");
            }
        });
    
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    运行:

    8889,9998全部监听成功
    
    • 1

    2.5:使用verticle实现http服务器

    Verticle是vertx提供给我们用来封装一个具体功能的对象,可以更好的进行管理,提供生命周期的能力等!!!使用vertx的话,一般使用这种方式来进行编程。

    • 定义一个http的verticle
    public class MyVerticle1 extends AbstractVerticle {
    
        private HttpServer server;
    
        public void start(Promise<Void> startPromise) {
            server = vertx.createHttpServer().requestHandler(req -> {
                req.response()
                        .putHeader("content-type", "text/plain")
                        .end("Hello from Vert.x!");
            });
    
            // Now bind the server:
            server.listen(8080, res -> {
                if (res.succeeded()) {
                    System.out.println("bind 8080 suc!");
                    startPromise.complete();
                } else {
                    System.out.println("bind 8080 failed!");
                    startPromise.fail(res.cause());
                }
            });
        }
    
    
        @Override
        public void start() throws Exception {
            System.out.println("verticle 1 start !!!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 部署
    @Test
    public void verticleTest1() throws Exception {
        Vertx vertx = Vertx.vertx();
        //1: 如果是想利用多核提高并发性能,也可以部署多个verticle实例,如下:
        /*
        DeploymentOptions options = new DeploymentOptions().setInstances(16);
        vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
            */
        // 2:传入配置
        /*
        JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
        DeploymentOptions options = new DeploymentOptions().setConfig(config);
        vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
    
        获取配置:
        传入之后,这个配置可以通过 Context 对象或使用 config 方法访问。这个配置会以 JSON 对象(JsonObject)的形式返回, 因此您可以用下边代码读取数据:
        System.out.println("Configuration: " + config().getString("name"));
            */
        vertx.deployVerticle(new MyVerticle1(), res -> {
            if (res.succeeded()) {
                // 部署成功打印部署id,可以通过其来撤销部署,如下:
                /*
                vertx.undeploy(deploymentID, res -> {
                    if (res.succeeded()) {
                    System.out.println("Undeployed ok");
                    } else {
                    System.out.println("Undeploy failed!");
                    }
                });
                    */
                System.out.println("verticle 1 deploy suc, deploy id is: " + res.result());
            }
        });
    
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 测试
    bind 8080 suc!
    verticle 1 deploy suc, deploy id is: 6d64fec4-89dc-4ff8-b254-cd43920584f8
    
    E:\workspace-idea\dongshidaddy-labs-new>curl http://localhost:8080
    Hello from Vert.x!
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.6:一次性计时器

    // 一次性计时器
    @Test
    public void timerTest1() throws Exception {
        Vertx vertx = Vertx.vertx();
        long timerID = vertx.setTimer(1000, id -> {
            System.out.println("And one second later this is printed");
        });
        System.out.println("First this is printed, timerID is: " + timerID);
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运行:

    First this is printed, timerID is: 0
    And one second later this is printed
    
    • 1
    • 2

    2.7:周期性计时器.

    // 周期性计时器
    @Test
    public void timerTest2() throws Exception {
        Vertx vertx = Vertx.vertx();
        // 1:取消计时器 vertx.cancelTimer(timerID);
        // 2:如果您在 Verticle 中创建了计时器, 当这个 Verticle 被撤销时这个计时器会被自动关闭。
        long timerID = vertx.setPeriodic(1000, id -> {
            System.out.println("And every second this is printed");
        });
        System.out.println("First this is printed, timerID is: " + timerID);
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行:

    First this is printed, timerID is: 0
    And every second this is printed
    And every second this is printed
    And every second this is printed
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.8:通过eventbus生产和发布消息

    可用于同一进程内的模块解耦

    // 通过eventbus生产和发布消息
    @Test
    public void eventyBusTest() throws Exception {
        Vertx vertx = Vertx.vertx();
        // 订阅消息
        EventBus eb = vertx.eventBus();
        MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
        // 1:带有确认的,可以像下边这样发送和接收消息
        /*
        接收者:
        MessageConsumer consumer = eventBus.consumer("news.uk.sport");
        consumer.handler(message -> {
            System.out.println("I have received a message: " + message.body());
            message.reply("how interesting!");
        });
        发送者:
    
        eventBus.request("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
            if (ar.succeeded()) {
            System.out.println("Received reply: " + ar.result().body());
            }
        });
            */
        consumer.handler(message -> {
            System.out.println("I have received a message: " + message.body());
        });
        // 发布消息
        System.out.println("发布消息:" + "Yay! Someone kicked a ball");
        eb.publish("news.uk.sport", "Yay! Someone kicked a ball");
        Thread.sleep(999999);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    运行:

    发布消息:Yay! Someone kicked a ball
    I have received a message: Yay! Someone kicked a ball
    
    • 1
    • 2

    2.9:json支持测试

    // json支持测试
    @Test
    public void jsonTest() throws Exception {
        JsonObject object = new JsonObject();
        object.put("foo", "bar").put("num", 123).put("mybool", true);
        System.out.println(object.getInteger("num"));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    运行:

    123
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3

    2.10:tcp支持

    private static Vertx vertx = Vertx.vertx();
    // tcp支持
    @Test
    public void tcpTest() {
        NetServer server = vertx.createNetServer();
        // 1:若想想要监听随机端口可以指定端口号为0,后续可以调用 actualPort 方法来获得服务器实际监听的端口
        // tcp server端
        server.listen(1234, "localhost", res -> {
            if (res.succeeded()) {
                System.out.println("Server is now listening!");
            } else {
                System.out.println("Failed to bind!");
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    运行:

    
    
    • 1

    写在后面

    参考文章列表

    Vert.x Core 文档手册

  • 相关阅读:
    【从0到1设计一个网关】过滤器链的实现---实现负载均衡过滤器
    基于单片机的煤气泄漏检测报警装置设计
    unity unityWebRequest 通过http下载服务器资源
    想知道海外技术面试都考些什么吗?
    基于java的社会公益平台计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署
    基于ElasticSearch存储海量AIS数据:AIS数据索引机制篇
    关于遥感像元反射率的思考
    实验室安全巡检管理系统—全面安全检查
    【算法练习Day5】有效的字母异位词 &两个数组的交集&&快乐数&&两数之和
    【Spring Cloud系列】Config详解与应用
  • 原文地址:https://blog.csdn.net/wang0907/article/details/134017978