
相关文章:

每种组件都会负责处理一些任务,比如WebRTC Portal负责和WebRTC客户端进行信令交互、Conference Agent负责会议房间的管理和控制、WebRTC Agent负责流的发布和订阅,每一个任务在服务端会对应于一个task。
每种Agent(比如WebRTC Agent)都会有一个主进程(代码里面叫worker进程,也叫agent进程)、多个工作进程(代码里面叫node进程)。
agent进程会提供node进程管理的RPC服务(getNode、recycleNode、queryNode),node进程会提供实际工作的RPC服务(比如WebRTC的publish操作等)。
getNode会返回node进程的rpcId(也就是nodeId),用于调用方直接向node发起RPC。
Cluster Manager负责对agent进行调度,注意不是对node进行调度。
一个agent和它的多个node运行在同一台机器上,同一台机器内部不需要调度。Cluster Manager会记录agent与task的关系,作为agent负载的体现,并据此实现调度逻辑。
基础模块架构图:

接下来结合具体代码展开分析上面介绍的进程结构的建立和交互过程,以及几个典型请求的处理流程。
// Authentication is required only for requests matching these path patterns.
var authPaths = ['/v1/rooms*', '/v1.1/rooms*', '/services*', '/cluster*'];

// Register the authenticator on the protected paths for each HTTP verb,
// in the order GET, POST, DELETE, PUT, PATCH.
['get', 'post', 'delete', 'put', 'patch'].forEach(function (verb) {
  app[verb](authPaths, serverAuthenticator.authenticate);
});

// Routes for the service collection.
app.post('/services', servicesResource.create);
app.get('/services', servicesResource.represent);

// Routes for a single service addressed by its :service parameter.
app.get('/services/:service', serviceResource.represent);
app.delete('/services/:service', serviceResource.deleteService);
...
source/portal/v11Client.js socket.on('publish', function(pubReq,
source/portal/portal.js that.publish = function(participantId, streamId, pubInfo) =>
source/portal/rpcRequest.js that.publish = function(controller, participantId, streamId, Options):向Conference Agent的node发起publish RPC
source/management_api/resource/v1/tokensResource.js generateToken = function(currentRoom, ... =>
source/management_api/resource/v1/tokensResource.js rpc.callRpc(cluster_name, 'schedule', ... =>
source/cluster_manager/scheduler.js that.schedule = function (task, preference, ...
source/portal/portal.js that.join = function(participantId, token) =>
source/portal/rpcRequest.js that.join = function(controller, roomId, participant) =>
source/agent/conference/conference.js that.join = function(roomId, participantInfo, callback) =>
source/agent/conference/conference.js initRoom = function(roomId, origin) => //保存房间信息,创建roomController、accessController
source/agent/conference/conference.js addParticipant = function(participantInfo, permission) => //保存用户信息
source/portal/v11Client.js socket.on('publish', function(pubReq,
source/portal/portal.js that.publish = function(participantId, streamId, pubInfo) =>
source/portal/rpcRequest.js that.publish = function(controller, participantId, streamId, Options) =>
source/agent/conference/conference.js that.publish = function(participantId, streamId, pubInfo, callback) =>
source/agent/conference/accessController.js that.initiate = function(participantId, sessionId, direction, origin, sessionOptions, ... =>
source/agent/conference/rpcRequest.js that.getWorkerNode = function(clusterManager, purpose, ... => //发起RPC,拿到WebRTC Agent的nodeId
source/agent/conference/rpcRequest.js that.initiate = function(accessNode, sessionId, ... => //向WebRTC Agent的node发起publish RPC
# WebRTC Agent的RPC服务定义在webrtc/index.js中
source/agent/conference/rpcRequest.js that.initiate = function(accessNode, sessionId, ... => //向WebRTC Agent的node发起publish RPC
source/agent/webrtc/index.js that.publish = function (operationId, connectionType, ... =>
source/agent/webrtc/index.js createWebRTCConnection = function (transportId, ... => //创建WrtcConnection(`webrtc/wrtcConnection.js`),它负责和C++ 代码进行交互
# WebRTC PC连接成功后,会向Conference Agent的node发起onSessionProgress RPC
source/agent/conference/conference.js that.onSessionProgress =>
source/agent/conference/accessController.js that.onSessionStatus =>
source/agent/conference/accessController.js onReady =>
source/agent/conference/conference.js onSessionEstablished =>
source/agent/conference/conference.js addStream =>
source/agent/conference/roomController.js that.publish
source/portal/rpcRequest.js that.onSessionSignaling =>
source/agent/conference/conference.js that.onSessionSignaling =>
source/agent/conference/rtcController.js onClientTransportSignaling =>
source/agent/conference/rpcRequest.js that.onTransportSignaling =>
source/agent/webrtc/index.js that.onTransportSignaling =>
source/agent/webrtc/wrtcConnection.js that.onSignalling
// 其他资料显示上述过程如下:
WebRTC Portal rpcRequest.js that.onSessionSignaling =>
conference.js that.onSessionSignaling =>
accessController.js that.onSessionSignaling:向WebRTC Agent的node发起onSessionSignaling RPC
source/portal/rpcRequest.js that.subscribe = function(controller, participantId, subscriptionId, Options) =>
source/agent/conference/conference.js that.subscribe = function(controller, participantId, ... =>
source/agent/conference/rtcController.js initiate(ownerId, sessionId, direction, origin, ... =>
source/agent/conference/rpcRequest.js that.getWorkerNode:发起RPC,拿到WebRTC Agent的nodeId
source/agent/conference/rpcRequest.js that.initiate = function(accessNode, sessionId, ... => //向WebRTC Agent的node发起publish RPC
# WebRTC PC连接成功后,会向Conference Agent的node发起onSessionProgress RPC
source/agent/conference/conference.js that.onSessionProgress =>
source/agent/conference/accessController.js that.onSessionStatus =>
source/agent/conference/accessController.js onReady =>
source/agent/conference/conference.js onSessionEstablished =>
source/agent/conference/conference.js addSubscription =>
source/agent/conference/roomController.js that.subscribe =>
source/agent/conference/roomController.js spreadStream:如果流的发布和订阅由不同的WebRTC Agent处理,就需要把流从发布Agent扩散到订阅Agent
source/agent/conference/roomController.js linkup(subscribe的内部函数):向WebRTC Agent的node发起linkup RPC
source/agent/conference/rpcRequest.js that.initiate =>
source/agent/webrtc/index.js that.publish =>
source/agent/webrtc/index.js createWebRTCConnection:创建WrtcConnection(`webrtc/wrtcConnection.js`),它负责和C++ 代码进行交互
source/agent/connections.js that.addConnection:保存创建的连接对象
source/portal/rpcRequest.js that.onSessionSignaling =>
source/agent/conference/conference.js that.onSessionSignaling =>
source/agent/conference/rtcController.js onClientTransportSignaling =>
source/agent/conference/rpcRequest.js that.onTransportSignaling =>
source/agent/webrtc/index.js that.onTransportSignaling =>
source/agent/webrtc/wrtcConnection.js that.onSignalling:把SDP传给C++ 层的代码进行处理
source/portal/portal.js that.subscribe = function(participantId, ... =>
source/portal/rpcRequest.js that.subscribe = function(controller, participantId, subscriptionId, Options) =>
source/agent/conference/conference.js that.subscribe = function(controller, participantId, ... =>
source/agent/conference/rtcController.js initiate(ownerId, sessionId, direction, origin, ... =>
source/agent/conference/rpcRequest.js that.initiate = function(accessNode, sessionId, ... => //向WebRTC Agent的node发起subscribe RPC
source/agent/webrtc/index.js that.subscribe = function (operationId, connectionType, ... =>
source/agent/webrtc/index.js createWebRTCConnection = function (transportId, ... => //创建WrtcConnection(`webrtc/wrtcConnection.js`),它负责和C++ 代码进行交互
source/agent/connections.js that.addConnection:保存创建的连接对象
source/agent/conference/roomController.js that.subscribe =>
source/agent/conference/roomController.js spreadStream:如果流的发布和订阅由不同的WebRTC Agent处理,就需要把流从发布Agent扩散到订阅Agent
source/agent/conference/roomController.js linkup(subscribe的内部函数):向WebRTC Agent的node发起linkup RPC
source/agent/webrtc/index.js that.linkup =>
source/agent/internalConnectionRouter.js linkup(dstId, from) =>
source/agent/connections.js that.linkupConnection =>
source/agent/webrtc/wrtcConnection.js sender.addDestination =>
source/agent/webrtc/wrtcConnection.js addDestination 调用C++ 接口关联发布端和订阅端
/**
 * Spread an existing stream to `target_node`, then report the outcome
 * through `on_ok` / `on_error`.
 *
 * A per-stream list (`streams[stream_id].spread`) records one entry per
 * target node with a status of 'connecting' or 'connected'. Only the first
 * caller for a given target starts the spread; callers arriving while it is
 * still 'connecting' are queued in the record's `waiting` list and notified
 * when the spread completes or fails.
 *
 * @param {string} stream_id - Key of the stream in `streams`.
 * @param {string} target_node - Node the stream should be spread to.
 * @param {string} target_node_type - Type of the target node. For 'vmixer',
 *     'amixer', 'vxcoder', 'axcoder' and 'aselect' a 'publish' RPC is sent
 *     to the target; for any other type the spread is marked connected
 *     immediately.
 * @param {Function} on_ok - Called with no arguments on success.
 * @param {Function} on_error - Called with a reason on failure.
 */
var spreadStream = function (stream_id, target_node, target_node_type, on_ok, on_error) {
    log.debug('spreadStream, stream_id:', stream_id,
        'target_node:', target_node, 'target_node_type:', target_node_type);

    if (!streams[stream_id] || !terminals[streams[stream_id].owner]) {
        return on_error('Cannot spread a non-existing stream');
    }

    const stream_owner = streams[stream_id].owner;
    const original_node = terminals[stream_owner].locality.node;
    // Video-only targets never receive audio, and audio-only targets never receive video.
    const audio = !!streams[stream_id].audio && target_node_type !== 'vmixer' && target_node_type !== 'vxcoder';
    const video = !!streams[stream_id].video && target_node_type !== 'amixer' && target_node_type !== 'axcoder';
    const data = !!streams[stream_id].data;
    const spread_id = stream_id + '@' + target_node;

    if (!audio && !video && !data) {
        return on_error('Cannot spread stream without audio, video or data.');
    }

    if (original_node === target_node) {
        log.debug('no need to spread');
        return on_ok();
    }

    // Index of this target's spread record, or -1 when the stream or the
    // record no longer exists (the stream may be removed while an RPC is pending).
    const find_spread_index = function () {
        if (!streams[stream_id]) {
            return -1;
        }
        return streams[stream_id].spread.findIndex((s) => (s.target === target_node));
    };

    // Returns true only when this call is responsible for starting the spread.
    const on_spread_start = function () {
        log.debug('spread start:', spread_id);
        const i = find_spread_index();
        if (i >= 0) {
            const record = streams[stream_id].spread[i];
            if (record.status === 'connected') {
                log.debug('spread already exists:', spread_id);
                on_ok();
            } else if (record.status === 'connecting') {
                log.debug('spread is connecting:', spread_id);
                // Another caller already started this spread: queue and wait
                // for its result. Starting a second spread here would send a
                // duplicate RPC and could invoke on_ok twice.
                record.waiting.push({onOK: on_ok, onError: on_error});
            } else {
                log.error('spread status is ambiguous:', spread_id);
                on_error('spread status is ambiguous');
            }
            return false;
        }
        streams[stream_id].spread.push({target: target_node, status: 'connecting', waiting: []});
        return true;
    };

    const on_spread_failed = function (reason) {
        log.error('spreadStream failed, stream_id:', stream_id, 'reason:', reason);
        const i = find_spread_index();
        if (i > -1) {
            streams[stream_id].spread[i].waiting.forEach((e) => {
                e.onError(reason);
            });
            streams[stream_id].spread.splice(i, 1);
        }
        on_error(reason);
    };

    const on_spread_ok = function () {
        log.debug('spread ok:', spread_id);
        const i = find_spread_index();
        if (i < 0) {
            on_error('spread record missing');
            return;
        }
        // Hold on to the record itself: by the next tick the index may have
        // shifted or the stream may have been removed.
        const record = streams[stream_id].spread[i];
        record.status = 'connected';
        process.nextTick(() => {
            const waiting = record.waiting;
            record.waiting = [];
            waiting.forEach((e) => {
                e.onOK();
            });
        });
        on_ok();
    };

    if (!on_spread_start()) {
        return;
    }

    if (['vmixer', 'amixer', 'vxcoder', 'axcoder', 'aselect'].includes(target_node_type)) {
        const locality = terminals[stream_owner].locality;
        if (!locality.ip || !locality.port) {
            log.error('No internal address for locality:', locality);
            on_spread_failed('No internal address for locality');
        } else {
            makeRPC(
                rpcClient,
                target_node,
                'publish',
                [
                    stream_id,
                    'internal',
                    {
                        controller: selfRpcId,
                        publisher: (terminals[stream_owner].owner || 'common'),
                        audio: (audio ? {codec: streams[stream_id].audio.format} : false),
                        video: (video ? {codec: streams[stream_id].video.format} : false),
                        data: data,
                        ip: locality.ip,
                        port: locality.port,
                    }
                ],
                function pubOk() { on_spread_ok(); },
                function pubError(e) { on_spread_failed(e); }
            );
        }
    } else {
        on_spread_ok();
    }
};
source/agent/audio/index.js that.createInternalConnection =>
source/agent/InternalConnectionFactory.js that.create:根据方向,创建InConnection或OutConnection
// Prepare the internal connection registered under connId and return the
// value of its getListeningPort().
//   connId      - key under which the connection is stored in preparedSet
//   direction   - 'in' creates an InConnection, anything else an OutConnection
//   internalOpt - carries protocol, minport, maxport and ticket
that.create = function (connId, direction, internalOpt) {
    // Read the connection arguments first; absent port bounds fall back to 0.
    const protocol = internalOpt.protocol;
    const portFloor = internalOpt.minport || 0;
    const portCeiling = internalOpt.maxport || 0;
    const ticket = internalOpt.ticket;

    const existing = preparedSet[connId];
    if (existing) {
        log.warn('Internal Connection already prepared:', connId);
        // FIXME: A correct work flow should never get here: a connection that
        // is in use should not be created again. The upper layer should
        // guarantee the right call sequence.
        return existing.connection.getListeningPort();
    }

    let connection;
    if (direction === 'in') {
        connection = InConnection(protocol, portFloor, portCeiling, ticket);
    } else {
        connection = OutConnection(protocol, portFloor, portCeiling, ticket);
    }

    // Remember the connection together with its direction for later lookups.
    preparedSet[connId] = {connection: connection, direction: direction};
    return connection.getListeningPort();
};
# 创建clusterWorker触发
source/common/clusterWorker.js joinCluster =>
source/common/clusterWorker.js join:向Cluster Manager发起join RPC
source/cluster_manager/clusterManager.js join =>
source/cluster_manager/clusterManager.js workerJoin =>
source/cluster_manager/scheduler.js that.add:保存worker进程的信息
source/management_api/resource/v1/tokensResource.js generateToken =>
source/management_api/requestHandler.js exports.schedulePortal =>
source/management_api/requestHandler.js scheduleAgent('portal', ... =>
source/cluster_manager/scheduler.js that.schedule:根据调度策略和负载信息,选择出一个worker,回调回去
source/portal/portal.js that.join
source/portal/rpcRequest.js that.getController =>
source/cluster_manager/scheduler.js that.schedule => //拿到Conference Agent的worker进程id
source/agent/index.js (rpcAPI)getNode: function(task, callback) =>
source/agent/nodeManager.js that.getNode => 根据房间号查找已启动的node,或启动新的node
source/agent/index.js (rpcAPI)getNode: function(task, callback) 把Conference Agent的nodeId回调回去
source/agent/conference/conference.js that.publish = function(participantId, streamId, pubInfo, callback) =>
source/agent/conference/accessController.js that.initiate = function(participantId, sessionId, direction, origin, sessionOptions, ... =>
source/agent/conference/rpcRequest.js that.getWorkerNode = function(clusterManager, purpose, ... => //发起RPC,拿到WebRTC Agent的nodeId
source/cluster_manager/scheduler.js that.schedule => //拿到WebRTC Agent的worker进程id
source/agent/conference/rpcRequest.js //回到that.getWorkerNode,向WebRTC Agent的worker进程发起getNode RPC
source/agent/index.js (rpcAPI)getNode: function(task, callback) =>
source/agent/nodeManager.js that.getNode => 根据房间号查找已启动的node,或启动新的node
source/agent/index.js (rpcAPI)getNode: function(task, callback) 把WebRTC Agent的nodeId回调回去
source/agent/nodeManager.js that.getNode =>
source/agent/nodeManager.js addTask = function(nodeId, task) =>
index.js创建nodeManager时提供的onTaskAdded回调 =>
source/common/clusterWorker.js that.addTask =>
source/common/clusterWorker.js pickUpTasks = function (taskList) =>
source/cluster_manager/clusterManager.js pickUpTasks = function (worker, tasks) =>
source/cluster_manager/scheduler.js that.pickUpTasks =>
source/cluster_manager/scheduler.js executeTask = function (worker, task):把task添加到worker的tasks数组中,表示worker开始执行这个task
source/agent/nodeManager.js that.recycleNode =>
source/agent/nodeManager.js removeTask = function(nodeId, task, ... =>
index.js创建nodeManager时提供的onTaskRemoved回调 =>
source/common/clusterWorker.js that.removeTask =>
source/common/clusterWorker.js layDownTask = function (worker, task) =>
source/cluster_manager/clusterManager.js layDownTask =>
source/cluster_manager/scheduler.js that.layDownTask =>
source/cluster_manager/scheduler.js cancelExecution:把task从worker的tasks数组中移除,表示worker结束执行这个task

参考:
《WebRTC Native开发实战》
OWT Server 整体架构分析 里的基础架构图