• openstack nova 源码分析


    1. 下载nova源码

    从 github 下载 Victoria 版本的 Nova 源码

    git clone https://github.com/openstack/nova.git --branch stable/victoria --single-branch
    
    • 1

    nova/ 文件夹下的目录

    accelerator/    # Cyborg 加速器
    api/            # Nova API 服务
    cmd/            # 各个 Nova 服务的入口程序
    compute/        # Nova Compute 服务
    conductor/      # Nova Conductor 服务
    conf/           # 所有的配置选项
    console/        # nova-console 服务
    db/             # 封装数据库操作
    hacking/        # 编码规范检查
    image/          # 封装镜像操作,Glance 接口抽象
    keymgr/         # 密钥管理器实现
    locale/         # 国际化相关文件
    network/        # nova-network 服务
    notifications/  # 通知相关功能
    objects/        # 封装实体对象的 CURD 操作
    pci/            # PCI/SR-IOV 支持
    policies/       # 所有 Policy 的默认规则
    privsep/        # oslo_privsep 相关
    scheduler/      # Nova Scheduler 服务
    servicegroup/   # 成员服务(membership),服务组
    storage/        # Ceph 存储支持
    tests/          # 单元测试
    virt/           # 支持的 hypervisor 驱动
    volume/         # 封装卷访问接口,Cinder 接口抽象
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    最新版的源码如下:
    在这里插入图片描述
    nova/ 文件夹下的 python 文件:

    __init__.py
    availability_zones.py   # 区域设置的工具函数
    baserpc.py              # 基础 RPC 客户端/服务端实现
    block_device.py         # 块设备映射
    cache_utils.py          # oslo_cache 封装
    config.py               # 解析命令行参数
    context.py              # 贯穿 Nova 的所有请求的上下文
    crypto.py               # 包装标准加密数据元素
    debugger.py             # pydev 调试
    exception.py            # 基础异常类
    exception_wrapper.py    # 封装异常类
    filters.py              # 基础过滤器
    i18n.py                 # 集成 oslo_i18n
    loadables.py            # 可加载类
    manager.py              # 基础 Manager 类
    middleware.py           # 更新 oslo_middleware 的默认配置选项
    monkey_patch.py         # eventlet 猴子补丁
    policy.py               # 策略引擎
    profiler.py             # 调用 OSProfiler
    quota.py                # 每个项目的资源配额
    rpc.py                  # RPC 操作相关的工具函数
    safe_utils.py           # 不会导致循环导入的工具函数
    service.py              # 通用节点基类,用于在主机上运行的所有工作者
    service_auth.py         # 身份认证插件
    test.py                 # 单元测试基础类
    utils.py                # 工具函数
    version.py              # 版本号管理
    weights.py              # 权重插件
    wsgi.py                 # 管理 WSGI 应用的服务器类
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    setup.cfg 配置文件,[entry_points] 小节指定了 nova 各个组件入口

    console_scripts =
        nova-api = nova.cmd.api:main
        nova-api-metadata = nova.cmd.api_metadata:main
        nova-api-os-compute = nova.cmd.api_os_compute:main
        nova-compute = nova.cmd.compute:main
        nova-conductor = nova.cmd.conductor:main
        nova-manage = nova.cmd.manage:main
        nova-novncproxy = nova.cmd.novncproxy:main
        nova-policy = nova.cmd.policy:main
        nova-rootwrap = oslo_rootwrap.cmd:main
        nova-rootwrap-daemon = oslo_rootwrap.cmd:daemon
        nova-scheduler = nova.cmd.scheduler:main
        nova-serialproxy = nova.cmd.serialproxy:main
        nova-spicehtml5proxy = nova.cmd.spicehtml5proxy:main
        nova-status = nova.cmd.status:main
    wsgi_scripts =
        nova-api-wsgi = nova.api.openstack.compute.wsgi:init_application
        nova-metadata-wsgi = nova.api.metadata.wsgi:init_application
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2. nova-api

    nova-api 对外提供 RESTful API,没有对内的 RPC 。

    nova/api/ 目录结构

    __init__.py
    auth.py             # 身份认证中间件
    compute_req_id.py   # x-compute-request-id 中间件(oslo_middleware)
    metadata/           # Metadata API
    openstack/          # Nova v2.1 API
    validation/         # 请求体验证
    wsgi.py             # WSGI 原语(请求、应用、中间件、路由、加载器)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    openstack 目录中包含 WSGI 基础架构的代码,一些 WSGI 中间件,以及如何解析请求与分发请求的核心代码。
    nova/api/openstack/compute/ 包含 Controller 实现,Resource 对象将 API 映射到相应的 Controller 方法上。

    __init__.py
    api_version_request.py  # 版本验证
    auth.py                 # noauth 中间件
    common.py               # 信息查询的工具函数
    compute/                # 每个 API 的入口点
    identity.py             # 验证项目是否存在
    requestlog.py           # 请求日志中间件
    urlmap.py               # url 映射
    versioned_method.py     # 版本信息
    wsgi.py                 # WSGI 相关抽象类
    wsgi_app.py             # WSGI 应用程序初始化方法
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    API 请求路由

    nova-api 读取 etc/nova/api-paste.ini 并加载 WSGI 程序,最终 API 入口点都位于 nova.api.openstack.compute

    [composite:osapi_compute]
    use = call:nova.api.openstack.urlmap:urlmap_factory
    /: oscomputeversions                # version API
    /v2: oscomputeversion_legacy_v2     # v2 API
    /v2.1: oscomputeversion_v2          # v2.1 API
    # v21 is an exactly feature match for v2, except it has more stringent
    # input validation on the wsgi surface (prevents fuzzing early on the
    # API). It also provides new features via API microversions which are
    # opt into for clients. Unaware clients will receive the same frozen
    # v2 API feature set, but with some relaxed validation
    /v2/+: openstack_compute_api_v21_legacy_v2_compatible
    /v2.1/+: openstack_compute_api_v21
    
    [composite:openstack_compute_api_v21]
    use = call:nova.api.auth:pipeline_factory_v21       # 加载中间件
    keystone = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler authtoken keystonecontext osapi_compute_app_v21
    # DEPRECATED: The [api]auth_strategy conf option is deprecated and will be
    # removed in a subsequent release, whereupon this pipeline will be unreachable.
    noauth2 = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler noauth2 osapi_compute_app_v21
    
    [app:osapi_compute_app_v21]
    paste.app_factory = nova.api.openstack.compute:APIRouterV21.factory  # 入口
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    nova/api/openstack/compute/routes.py 中的 APIRouterV21 主要用来完成路由规则的创建,其中 ROUTE_LIST 保存了 URL 与 Controller 之间的映射关系。

    APIRouterV21 基于 ROUTE_LIST,使用 Routes 模块作为 URL 映射的工具,将各个模块所实现的 API 对应的 URL 注册到 mapper 中,并把每个资源都封装成 nova.api.openstack.wsgi.Resource 对象,当解析 URL 请求时,可以通过 URL 映射找到 API 对应的 Resource 对象。

    # Router 类对 WSGI routes 模块进行了简单的封装
    class APIRouterV21(base_wsgi.Router):
        """Routes requests on the OpenStack API to the appropriate controller
        and method. The URL mapping based on the plain list `ROUTE_LIST` is built
        at here.
        """
        def __init__(self, custom_routes=None):
            """:param custom_routes: the additional routes can be added by this
                   parameter. This parameter is used to test on some fake routes
                   primarily.
            """
            super(APIRouterV21, self).__init__(nova.api.openstack.ProjectMapper())
    
            if custom_routes is None:
                custom_routes = tuple()
    
            for path, methods in ROUTE_LIST + custom_routes:
                # NOTE(alex_xu): The variable 'methods' is a dict in normal, since
                # the dict includes all the methods supported in the path. But
                # if the variable 'method' is a string, it means a redirection.
                # For example, the request to the '' will be redirect to the '/' in
                # the Nova API. To indicate that, using the target path instead of
                # a dict. The route entry just writes as "('', '/)".
                if isinstance(methods, six.string_types):
                    self.map.redirect(path, methods)
                    continue
    
                for method, controller_info in methods.items():
                    # TODO(alex_xu): In the end, I want to create single controller
                    # instance instead of create controller instance for each
                    # route.
                    controller = controller_info[0]()
                    action = controller_info[1]
                    self.map.create_route(path, method, controller, action)
    
        @classmethod
        def factory(cls, global_config, **local_config):
            """Simple paste factory, :class:`nova.wsgi.Router` doesn't have one."""
            return cls()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    nova/api/wsgi.py 解析 URL 映射,通过 _dispatch 回调,调用 Resource 对象的 call 方法,最终通过请求调用 API 对应的模块中的方法。

    # 路由
    class Router(object):
        """WSGI middleware that maps incoming requests to WSGI apps."""
    
        def __init__(self, mapper):
            """Create a router for the given routes.Mapper.
    
            Each route in `mapper` must specify a 'controller', which is a
            WSGI app to call.  You'll probably want to specify an 'action' as
            well and have your controller be an object that can route
            the request to the action-specific method.
    
            Examples:
              mapper = routes.Mapper()
              sc = ServerController()
    
              # Explicit mapping of one route to a controller+action
              mapper.connect(None, '/svrlist', controller=sc, action='list')
    
              # Actions are all implicitly defined
              mapper.resource('server', 'servers', controller=sc)
    
              # Pointing to an arbitrary WSGI app.  You can specify the
              # {path_info:.*} parameter so the target app can be handed just that
              # section of the URL.
              mapper.connect(None, '/v1.0/{path_info:.*}', controller=BlogApp())
    
            """
            self.map = mapper
            # 使用 routes 模块关联 mapper 和 _dispatch
            # routes.middleware.RoutesMiddleware 设置 environ 信息
            self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                              self.map)
    
        @webob.dec.wsgify(RequestClass=Request)
        def __call__(self, req):
            """Route the incoming request to a controller based on self.map.
    
            If no match, return a 404.
    
            """
            # 根据 mapper 将请求路由到 WSGI 应用(资源)
            # 每个资源会在 __call__ 方法中根据 HTTP 请求的 URL 路由到对应 Controller 上的方法(Action)
            return self._router
    
        @staticmethod
        @webob.dec.wsgify(RequestClass=Request)
        def _dispatch(req):
            """Dispatch the request to the appropriate controller.
    
            Called by self._router after matching the incoming request to a route
            and putting the information into req.environ.  Either returns 404
            or the routed WSGI app's response.
    
            """
            # 根据 HTTP 请求的 environ 信息找到 URL 对应的 Controller
            match = req.environ['wsgiorg.routing_args'][1]
            if not match:
                return webob.exc.HTTPNotFound()
            app = match['controller']
            return app
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    API 实现

    nova/api/openstack/compute/ 目录包含每个 API 对应的 Controller 实现,Resource 对象将请求的 API 映射到相应的 Controller 方法上。

    keypairs.py (密钥对管理扩展)为例,公共方法包含 create、delete、show、index,多个实现对应不同的 Microversion(使用 @wsgi.Controller.api_version 装饰器)

    @wsgi.expected_errors:API 允许的错误返回码
    @validation.query_schema:请求对应的 json schema
    @wsgi.response:API 请求正常返回码
    @wsgi.action:注册 action
    
    • 1
    • 2
    • 3
    • 4

    Microversion 用于实现兼容性。
    nova/api/openstack/compute/schemas 包含允许的 json schema,表示接受的键值对及其类型。
    通过方法接口可以得到 webob.Request 对象,从 Request 对象中可以获取其他请求参数,用于执行对应的操作。
    keypairs.py

    class KeypairController(wsgi.Controller):
    
        """Keypair API controller for the OpenStack API."""
    
        _view_builder_class = keypairs_view.ViewBuilder
    
        def __init__(self):
            super(KeypairController, self).__init__()
            self.api = compute_api.KeypairAPI()
    
        @wsgi.Controller.api_version("2.10")
        @wsgi.response(201)
        @wsgi.expected_errors((400, 403, 409))
        @validation.schema(keypairs.create_v210)
        def create(self, req, body):
            ...
    
        @wsgi.Controller.api_version("2.2", "2.9")  # noqa
        @wsgi.response(201)
        @wsgi.expected_errors((400, 403, 409))
        @validation.schema(keypairs.create_v22)
        def create(self, req, body):  # noqa
            ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    nova-conductor

    使用 RPC 的子组件通常包含以下文件:

    • api.py 对 RPC 接口进行封装,类似提供 SDK
    • rpcapi.py 暴露给其他内部组件的 RPC 接口,RPC 客户端
    • manager.py 处理 RPC API 调用

    nova-compute 访问数据库的操作都要由 nova-conductor 代理,用 nova/conductor/manager.py 的 ConductorManager 类完成,出于安全性考虑,nova-conductor 和 nova-compute 不能部署在同一服务器上。

    nova/objects 定义了 nova object,封装数据库 CURD 操作,每个类对应数据库中的一张表。

    nova-scheduler

    nova-scheduler 执行调度决策,nova-compute 收集并更新主机数据,实时写入数据库(周期任务)。

    nova/scheduler/filters 包含所有的过滤器实现,用于过滤不符合条件的主机;
    nova/scheduler/weights 包含所有的权重实现,用于计算权重并排序。

    3. 启动流程

    nova-api 启动入口 nova.cmd.api:main

    def main():
        config.parse_args(sys.argv) # 解析参数
        logging.setup(CONF, "nova") # 设置日志
        objects.register_all()      # 注册 nova object
        gmr_opts.set_defaults(CONF) # 设置 oslo_reports
        if 'osapi_compute' in CONF.enabled_apis:
            # NOTE(mriedem): This is needed for caching the nova-compute service
            # version.
            objects.Service.enable_min_version_cache()
        log = logging.getLogger(__name__)
    
        # 生成报告的机制 Guru Meditation Report (GMR)
        gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    
        # oslo_service.ProcessLauncher
        launcher = service.process_launcher()
        started = 0
        # 根据 paste-ini 文件创建 WSGI 应用
        for api in CONF.enabled_apis:
            should_use_ssl = api in CONF.enabled_ssl_apis
            try:
                # nova.service.WSGIService 初始化 WSGI 程序
                server = service.WSGIService(api, use_ssl=should_use_ssl)
                # oslo_service.ProcessLauncher 创建子进程启动服务
                launcher.launch_service(server, workers=server.workers or 1)
                started += 1
            except exception.PasteAppNotFound as ex:
                log.warning("%s. ``enabled_apis`` includes bad values. "
                            "Fix to remove this warning.", ex)
    
        if started == 0:
            log.error('No APIs were started. '
                      'Check the enabled_apis config option.')
            sys.exit(1)
    
        # 等待子进程终止
        launcher.wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    nova.service.WSGIService 的初始化函数实例化 nova.wsgi.Server ,启动函数实际调用了 nova.wsgi.Server 的 start 方法。

    其中的 self._socket 使用 eventlet.listen 创建,最后使用 utils 中封装的 spawn 函数启动 WSGI 程序

    class Server(service.ServiceBase):
        """Server class to manage a WSGI server, serving a WSGI application."""
        
        ...
    
        def start(self):
            """Start serving a WSGI application.
    
            :returns: None
            """
            # The server socket object will be closed after server exits,
            # but the underlying file descriptor will remain open, and will
            # give bad file descriptor error. So duplicating the socket object,
            # to keep file descriptor usable.
    
            dup_socket = self._socket.dup()
            dup_socket.setsockopt(socket.SOL_SOCKET,
                                  socket.SO_REUSEADDR, 1)
            # sockets can hang around forever without keepalive
            dup_socket.setsockopt(socket.SOL_SOCKET,
                                  socket.SO_KEEPALIVE, 1)
    
            ...
    
            self._server = utils.spawn(**wsgi_kwargs)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    nova-conductor 启动入口 nova.cmd.conductor:main

    def main():
        config.parse_args(sys.argv)
        logging.setup(CONF, "nova")
        objects.register_all()
        gmr_opts.set_defaults(CONF)
        objects.Service.enable_min_version_cache()
    
        gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    
        # nova.service.Service 实例化 Service 对象
        server = service.Service.create(binary='nova-conductor',
                                        topic=rpcapi.RPC_TOPIC)
        workers = CONF.conductor.workers or processutils.get_worker_count()
        # oslo_service.launch 创建 launcher
        service.serve(server, workers=workers)
        # 调用 launcher.wait 等待子进程终止
        service.wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    nova.service.Service 初始化函数接受 manager 对象,通过监听消息队列启用 RPC 服务;设置定期任务报告状态,并写入数据库。

    • nova-compute
    • nova-conductor
    • nova-scheduler
      RPC 服务启动时创建 rpc_client 用于发送消息,创建 rpc_server 用于接收消息,分派执行。
    1. rpc_client

    nova/cmd/conductor.py 实际创建 Service 实例

    server = service.Service.create(binary='nova-conductor',
                                    topic=rpcapi.RPC_TOPIC)
    
    • 1
    • 2

    nova/service.py 初始化函数

    # 创建 _driver
    self.servicegroup_api = servicegroup.API()
    
    # 动态导入 manager 类
    manager_class = importutils.import_class(self.manager_class_name)
    
    if objects_base.NovaObject.indirection_api:
        # 创建 RPCClient
        conductor_api = conductor.API()
        # 等待 nova-conductor 启动
        conductor_api.wait_until_ready(context.get_admin_context())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    nova/servicegroup/api.py 创建 _driver

    driver_class = _driver_name_class_mapping[CONF.servicegroup_driver]
    self._driver = importutils.import_object(driver_class,
                                             *args, **kwargs)
    
    • 1
    • 2
    • 3

    nova/conductor/rpcapi.py 设置 rpc_client

    def __init__(self):
        super(ConductorAPI, self).__init__()
        target = messaging.Target(topic=RPC_TOPIC, version='3.0')
        version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.conductor,
                                               CONF.upgrade_levels.conductor)
        serializer = objects_base.NovaObjectSerializer()
        # rpc client
        self.client = rpc.get_client(target,
                                     version_cap=version_cap,
                                     serializer=serializer)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    nova/baserpc.py 设置 rpc_client

    def __init__(self, topic):
        super(BaseAPI, self).__init__()
        target = messaging.Target(topic=topic,
                                  namespace=_NAMESPACE,
                                  version='1.0')
        version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.baseapi,
                                               CONF.upgrade_levels.baseapi)
        self.client = rpc.get_client(target, version_cap=version_cap)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    2. rpc_server

    nova/cmd/conductor.py 使用 Service 实例启动服务

    # oslo_service.launch 创建 launcher
    service.serve(server, workers=workers)
    # 调用 launcher.wait 等待子进程终止
    service.wait()
    
    • 1
    • 2
    • 3
    • 4

    nova/service.py 实际调用 oslo_service 的 launch 函数,创建绿色线程(greenthread)或进程,最终调用 Service 实例的 start 方法

    def serve(server, workers=None):
        global _launcher
        if _launcher:
            raise RuntimeError(_('serve() can only be called once'))
    
        _launcher = service.launch(CONF, server, workers=workers,
                                   restart_method='mutate')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    nova/service.py Service 实例的 start 方法创建 rpc_server 和 dispatcher;设置周期任务

    # 创建 rpc server 以及 dispatcher
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()
    
    ...
    
    if self.periodic_enable:
        if self.periodic_fuzzy_delay:
            initial_delay = random.randint(0, self.periodic_fuzzy_delay)
        else:
            initial_delay = None
    
        self.tg.add_dynamic_timer(self.periodic_tasks,
                                 initial_delay=initial_delay,
                                 periodic_interval_max=
                                    self.periodic_interval_max)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    收到消息后主要由 oslo_messaging 进行解析和处理,核心是 oslo_messaging/rpc/dispatcher.py

    incoming 是 AMQP 消息格式

    def dispatch(self, incoming):
        """Dispatch an RPC message to the appropriate endpoint method.
    
        :param incoming: incoming message
        :type incoming: IncomingMessage
        :raises: NoSuchMethod, UnsupportedVersion
        """
        message = incoming.message
        ctxt = incoming.ctxt
    
        method = message.get('method')
        args = message.get('args', {})
        namespace = message.get('namespace')
        version = message.get('version', '1.0')
    
        # NOTE(danms): This event and watchdog thread are used to send
        # call-monitoring heartbeats for this message while the call
        # is executing if it runs for some time. The thread will wait
        # for the event to be signaled, which we do explicitly below
        # after dispatching the method call.
        completion_event = eventletutils.Event()
        watchdog_thread = threading.Thread(target=self._watchdog,
                                           args=(completion_event, incoming))
        if incoming.client_timeout:
            # NOTE(danms): The client provided a timeout, so we start
            # the watchdog thread. If the client is old or didn't send
            # a timeout, we just never start the watchdog thread.
            watchdog_thread.start()
    
        found_compatible = False
        for endpoint in self.endpoints:
            target = getattr(endpoint, 'target', None)
            if not target:
                target = self._default_target
    
            if not (self._is_namespace(target, namespace) and
                    self._is_compatible(target, version)):
                continue
    
            if hasattr(endpoint, method):
                if self.access_policy.is_allowed(endpoint, method):
                    try:
                        # 分派,调用函数
                        return self._do_dispatch(endpoint, method, ctxt, args)
                    finally:
                        completion_event.set()
                        if incoming.client_timeout:
                            watchdog_thread.join()
    
            found_compatible = True
    
        if found_compatible:
            raise NoSuchMethod(method)
        else:
            raise UnsupportedVersion(version, method=method)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    oslo_messaging/rpc/dispatcher.py 调用函数

    def _do_dispatch(self, endpoint, method, ctxt, args):
        ctxt = self.serializer.deserialize_context(ctxt)
        new_args = dict()
        for argname, arg in args.items():
            new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
        func = getattr(endpoint, method)
        result = func(ctxt, **new_args)
        return self.serializer.serialize_entity(ctxt, result)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    发送消息的实现都在 nova/conductor/rpcapi.py 中,cctxt.call 同步调用,cctxt.cast 异步调用

    def object_class_action_versions(self, context, objname, objmethod,
                                     object_versions, args, kwargs):
        cctxt = self.client.prepare()
        return cctxt.call(context, 'object_class_action_versions',
                          objname=objname, objmethod=objmethod,
                          object_versions=object_versions,
                          args=args, kwargs=kwargs)
    
    def cache_images(self, ctxt, aggregate, image_ids):
        version = '1.21'
        if not self.client.can_send_version(version):
            raise exception.NovaException('Conductor RPC version pin does not '
                                          'allow cache_images() to be called')
        cctxt = self.client.prepare(version=version)
        cctxt.cast(ctxt, 'cache_images', aggregate=aggregate,
                   image_ids=image_ids)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    oslo_messaging/rpc/client.py 实现

    def cast(self, ctxt, method, **kwargs):
        """Invoke a method and return immediately. See RPCClient.cast()."""
        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)
    
        self._check_version_cap(msg.get('version'))
    
        try:
            self.transport._send(self.target, msg_ctxt, msg,
                                 retry=self.retry,
                                 transport_options=self.transport_options)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
    
    def call(self, ctxt, method, **kwargs):
        """Invoke a method and wait for a reply. See RPCClient.call()."""
        if self.target.fanout:
            raise exceptions.InvalidTarget('A call cannot be used with fanout',
                                           self.target)
    
        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)
    
        timeout = self.timeout
        if self.timeout is None:
            timeout = self.conf.rpc_response_timeout
    
        cm_timeout = self.call_monitor_timeout
    
        self._check_version_cap(msg.get('version'))
    
        try:
            result = \
                self.transport._send(self.target, msg_ctxt, msg,
                                     wait_for_reply=True, timeout=timeout,
                                     call_monitor_timeout=cm_timeout,
                                     retry=self.retry,
                                     transport_options=self.transport_options)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
    
        return self.serializer.deserialize_entity(ctxt, result)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    关于周期任务,nova/scheduler/manager.py 中使用 @periodic_task.periodic_task 装饰的方法将会被周期调用,从 scheduler 的调试日志可以看到周期任务的运行

    ********************************************************************* log_opt_values /home/jck/.local/lib/python3.6/site-packages/oslo_config/cfg.py:2591
    2021-05-18 05:53:17.030 3501 DEBUG oslo_service.periodic_task [req-66b43add-49c7-4f33-8f6b-1e33cb9f0123 - - - - -] Running periodic task SchedulerManager._run_periodic_tasks run_periodic_tasks /home/jck/.local/lib/python3.6/site-packages/oslo_service/periodic_task.py:211
    2021-05-18 05:53:39.072 3500 DEBUG oslo_service.periodic_task [req-8436b3e2-96d1-4f15-8ae8-b596cee05536 - - - - -] Running periodic task SchedulerManager._run_periodic_tasks run_periodic_tasks /home/jck/.local/lib/python3.6/site-packages/oslo_service/periodic_task.py:211
    ...
    
    • 1
    • 2
    • 3
    • 4

    对应于 nova/scheduler/manager.py

    @periodic_task.periodic_task(spacing=CONF.scheduler.periodic_task_interval,
                                 run_immediately=True)
    def _run_periodic_tasks(self, context):
        self.driver.run_periodic_tasks(context)
    
    • 1
    • 2
    • 3
    • 4

    执行周期任务的有 nova-scheduler 和 nova-compute ,主要功能是计算节点 nova-compute 上报资源信息,nova-scheduler 读取数据库,更新资源信息缓存。

  • 相关阅读:
    Python3,9行代码,对比两个Excel数据差异,并把差异结果重新保存。
    orm连接mysql
    为什么你做自媒体赚不到钱?今天就教你几招
    2022 年辽宁省大学生程序设计竞赛 个人题解
    Mysql安装配置
    大数据之Java基础(6)
    拍摄的vlog视频画质模糊怎么办?视频画质高清修复
    基于大数据的工业设备故障诊断模型设计
    前端登录退出:处理Token问题(获取、缓存、失效处理)以及代码实现
    IDEA实现命令行传参快捷方法传参
  • 原文地址:https://blog.csdn.net/xili2532/article/details/126406972