• Kyuubi


    Get started

    Start Engines

    Engines are launched by the server automatically without end users’ attention.

    If you use the same user in the above case to create another connection, the engine will be reused. You may notice that the time cost for connection here is much shorter than the last round.

    If you use a different user to create a new connection, another engine will be started.

    $ bin/beeline -u 'jdbc:hive2://localhost:10009/' -n kentyao
    
    • 1
    The Share Level Of Kyuubi Engines

    All sessions with USER(default) share level use the same engine if and only if the session user is the same.

    Those sessions share the same engine with objects belong to the one and only SparkContext instance, including Classes/Classloaders, SparkConf, Driver/Executors, Hive Metastore Client, etc. But each session can still have its own SparkSession instance, which contains separate session state, including temporary views, SQL config, UDFs etc. Setting kyuubi.engine.single.spark.session to true will make SparkSession instance a singleton and share across sessions.

    When closing session, the corresponding engine will not be shutdown. When all sessions are closed, the corresponding engine still has a time-to-live lifespan. This TTL allows new sessions to be established quickly without waiting for the engine to start.

    GROUP Share Level

    An engine will be shared by all sessions created by all users belong to the same primary group name. The engine will be launched by the group name as the effective username, so here the group name is kind of special user who is able to visit the compute resources/data of a team. It follows the Hadoop GroupsMapping to map user to a primary group. If the primary group is not found, it falls back to the USER level.

    The mechanisms of SparkContext, SparkSession and TTL works similarly to USER share level.

    SERVER Share Level

    Literally, this model is similar to Spark Thrift Server with High availability.

    剖析

    架构实现

    Kyuubi引擎的实现逻辑和 Spark Thrift Server 如出一辙,不同的是它可以通过client/cluster模式提交,IP和端口都是随机的,所以额外加上了做服务发现的客户端,用于将自己暴露给Server和对应的租户。

    Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:

    • 用户层

    指通过不同方式使用Kyuubi的用户,比如通过JDBC或beeline方式使用Kyuubi的用户。

    • 服务发现层

    服务发现层依赖于Zookeeper实现,其又分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。

    • Kyuubi Server层

    由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。

    • Kyuubi Engine层

    由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。

    SparkSQLEngine实例是针对不同的用户按需启动的。在Kyuubi整体系统启动之后,如果没有用户访问Kyuubi服务,实际上在整个系统中只有一个或多个KyuubiServer实例,当有用户通过JDBC或beeline的方式连接KyuubiServer实例时,其会在Zookeeper上去查找是否存在用户所属的SparkSQLEngine实例,如果没有,则通过spark-submit提交一个Spark应用,而这个Spark应用本身就是SparkSQLEngine,启动后,基于其内部构建的SparkSession实例,即可为特定用户执行相关SQL操作。

    RPC 相关
    • 站在用户层视角来看,其为RPC客户端,而为其提供RPC服务的是Kyuubi Server层,在这里,Kyuubi Server是RPC服务端;
    • 站在Kyuubi Server层视角来看,其既是为用户层提供RPC服务的RPC服务端,同时也是使用Kyuubi Engine层RPC服务的RPC客户端;
    • 站在Kyuubi Engine层视角来看,其为RPC服务端,其为Kyuubi Server层提供RPC服务;
    关键配置
    超时引擎推出机制
      val ENGINE_IDLE_TIMEOUT: ConfigEntry[Long] = buildConf("kyuubi.session.engine.idle.timeout")
        .doc("engine timeout, the engine will self-terminate when it's not accessed for this " +
          "duration. 0 or negative means not to self-terminate.")
        .version("1.0.0")
        .timeConf
        .createWithDefault(Duration.ofMinutes(30L).toMillis)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    org.apache.kyuubi.engine.spark.SparkSQLEngine

      override def start(): Unit = {
        super.start()
        // Start engine self-terminating checker after all services are ready and it can be reached by
        // all servers in engine spaces.
        backendService.sessionManager.startTerminatingChecker(() => {
          assert(currentEngine.isDefined)
          currentEngine.get.stop()
        })
    
        startLifetimeTerminatingChecker(() => {
          assert(currentEngine.isDefined)
          currentEngine.get.stop()
        })
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    org.apache.kyuubi.session.SessionManager

      private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if (!isServer) {
        // initialize `_latestLogoutTime` at start
        _latestLogoutTime = System.currentTimeMillis()
        val interval = conf.get(ENGINE_CHECK_INTERVAL)
        val idleTimeout = conf.get(ENGINE_IDLE_TIMEOUT)
        if (idleTimeout > 0) {
          val checkTask = new Runnable {
            override def run(): Unit = {
              if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout &&
                getOpenSessionCount <= 0) {
                info(s"Idled for more than $idleTimeout ms, terminating")
                stop()
              }
            }
          }
          timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    R语言使用strsplit函数基于指定字符或者字符串分割字符串、使用sub函数进行字符串替换
    京东api接口调用
    在 Solidity 中 ++i 为什么比 i++ 更省 Gas?
    Spark 和 Kafka 处理 API 请求与返回数据DEMO
    CNN复现系列一:基于zcu102的yolov2(part4:sdk部分)
    树莓派学习笔记--Wiring Pi库的安装
    一起来学习vue吧(v-if,v-else,v-else-if)
    Word隐藏批注知识分享,快速提升工作效率!
    [论文评析]MediaPipe Hands: On-device Real-time Hand Tracking, ArXiv,2020
    MyBtis 替换符号 查询 SQL 代替符号
  • 原文地址:https://blog.csdn.net/zhixingheyi_tian/article/details/132720457