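The TaskManager (also called a worker) executes the tasks of a dataflow, and buffers and exchanges the data streams.
There must always be at least one TaskManager. The smallest unit of resource scheduling in a TaskManager is the task slot; the number of task slots in a TaskManager determines how many tasks it can process concurrently. Note that multiple operators may execute in a single task slot.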
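To make the slot idea concrete, here is a toy model, not Flink code: the class SlotModel and its method runTasks are hypothetical. It treats each slot as one thread of a fixed-size pool, so at most numSlots tasks run at once, and each task runs a chain of operators back to back inside its slot, loosely mirroring operator chaining. Flink's real slot bookkeeping (for example TaskSlotTable) is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;

// Hypothetical toy model of task slots; NOT Flink's slot implementation.
class SlotModel {
    // Runs numTasks tasks on numSlots "slots" and returns the peak number of tasks running at once.
    static int runTasks(int numSlots, int numTasks, List<Function<Integer, Integer>> operatorChain) {
        ExecutorService slots = Executors.newFixedThreadPool(numSlots); // one thread per slot
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        List<CompletableFuture<Integer>> tasks = new ArrayList<>();
        try {
            for (int t = 0; t < numTasks; t++) {
                final int input = t;
                tasks.add(CompletableFuture.supplyAsync(() -> {
                    maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        int value = input;
                        // All chained operators of this task run in the same slot, one after another.
                        for (Function<Integer, Integer> op : operatorChain) {
                            value = op.apply(value);
                        }
                        LockSupport.parkNanos(10_000_000L); // simulate ~10 ms of work
                        return value;
                    } finally {
                        running.decrementAndGet();
                    }
                }, slots));
            }
            tasks.forEach(CompletableFuture::join); // wait for every task to finish
        } finally {
            slots.shutdown();
        }
        return maxRunning.get();
    }
}
```

With 2 slots and 6 tasks, the reported peak concurrency never exceeds 2, however many tasks are submitted.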
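From the taskmanager.sh script we know that the TaskManager's entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.
The entry point is main(). Its first few lines perform startup checks and logging; it then calls runTaskManagerProcessSecurely: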
public static void main(String[] args) throws Exception {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);
    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }
    runTaskManagerProcessSecurely(args);
}
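The else branch shows that EnvironmentInformation.getOpenFileHandlesLimit() returns -1 when the limit cannot be determined. The sketch below shows one way to obtain the same number with the JDK alone; it is an assumption-based illustration, not Flink's source. It reads getMaxFileDescriptorCount() from com.sun.management.UnixOperatingSystemMXBean through reflection and falls back to -1 when that bean is unavailable (for example on Windows). The class name FileHandleLimit is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;

// Hypothetical helper; queries the open-file-descriptor limit via the JDK's Unix OS MXBean.
class FileHandleLimit {
    static long getOpenFileHandlesLimit() {
        try {
            // Reflection keeps this compiling and running even where the bean does not exist.
            Class<?> unixBean = Class.forName("com.sun.management.UnixOperatingSystemMXBean");
            Object osBean = ManagementFactory.getOperatingSystemMXBean();
            if (!unixBean.isInstance(osBean)) {
                return -1L; // not a Unix-like platform
            }
            Method limit = unixBean.getMethod("getMaxFileDescriptorCount");
            return (Long) limit.invoke(osBean);
        } catch (Throwable t) {
            return -1L; // unknown: the same sentinel that main() checks for
        }
    }
}
```

A caller can then log "Maximum number of open file descriptors is ..." when the result is not -1, exactly as main() does.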
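Next, look at runTaskManagerProcessSecurely(String[] args). It simply loads Flink's configuration file, flink-conf.yaml, and exits with FAILURE_EXIT_CODE if the configuration cannot be parsed: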
public static void runTaskManagerProcessSecurely(String[] args) {
    Configuration configuration = null;
    try {
        configuration = loadConfiguration(args);
    } catch (FlinkParseException fpe) {
        LOG.error("Could not load the configuration.", fpe);
        System.exit(FAILURE_EXIT_CODE);
    }
    runTaskManagerProcessSecurely(checkNotNull(configuration));
}
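Flink's configuration file is a flat list of "key: value" lines. As a rough sketch of what loading the configuration involves (MiniConfigLoader is hypothetical, not a Flink class, and the real parser handles more cases), the code below reads such lines, skips "#" comments, and lets "-Dkey=value" command-line dynamic properties override values from the file. It takes the file content as a list of lines so that it stays self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified configuration loader (illustration only).
class MiniConfigLoader {
    static Map<String, String> load(List<String> confLines, String[] args) {
        Map<String, String> conf = new HashMap<>();
        // 1) Flat "key: value" lines; everything after '#' is treated as a comment.
        for (String raw : confLines) {
            String line = raw.split("#", 2)[0].trim();
            int sep = line.indexOf(": ");
            if (line.isEmpty() || sep < 0) {
                continue; // blank, comment-only, or malformed line
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        // 2) "-Dkey=value" dynamic properties from the command line override the file.
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith("-D") && eq > 2) {
                conf.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return conf;
    }
}
```

Splitting on "#" would also cut values that legitimately contain that character; a production parser has to be more careful.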
The parsed configuration is then passed to the overloaded runTaskManagerProcessSecurely(Configuration):
public static void runTaskManagerProcessSecurely(Configuration configuration) {
    // set up Flink's SecurityManager according to the configuration
    FlinkSecurityManager.setFromConfiguration(configuration);
    // create the plugin manager
    final PluginManager pluginManager =
            PluginUtils.createPluginManagerFromRootFolder(configuration);
    // initialize the file systems and the state changelog storage with the plugins
    FileSystem.initialize(configuration, pluginManager);
    StateChangelogStorageLoader.initialize(pluginManager);
    int exitCode;
    Throwable throwable = null;
    ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
    try {
        // we have seen this pattern many times: components are started through SecurityUtils
        // install the security modules
        SecurityUtils.install(new SecurityConfiguration(configuration));
        // start the TaskManager inside the installed security context
        exitCode =
                SecurityUtils.getInstalledContext()
                        .runSecured(() -> runTaskManager(configuration, pluginManager));
    } catch (Throwable t) {
        throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        exitCode = FAILURE_EXIT_CODE;
    }
    if (throwable != null) {
        LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
    } else {
        LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
    }
    System.exit(exitCode);
}
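The shape of this method is: run the real work inside a security context, turn any failure into FAILURE_EXIT_CODE after unwrapping UndeclaredThrowableException, log, and exit. A minimal sketch of that pattern follows. SecureLauncher, its SecurityContext interface and the NO_OP context are hypothetical stand-ins for Flink's SecurityUtils and security context, the exit code 1 is an assumption, and launch() returns the code instead of calling System.exit so it can be tried out directly.

```java
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.Callable;

// Hypothetical sketch of the "run secured, map failures to an exit code" pattern.
class SecureLauncher {
    static final int FAILURE_EXIT_CODE = 1; // assumed value

    // Hypothetical stand-in for a security context.
    interface SecurityContext {
        <T> T runSecured(Callable<T> action) throws Exception;
    }

    // Simply runs the action; a Kerberos-enabled context would wrap it (e.g. in a doAs call).
    static final SecurityContext NO_OP = new SecurityContext() {
        @Override
        public <T> T runSecured(Callable<T> action) throws Exception {
            return action.call();
        }
    };

    // Peel off UndeclaredThrowableException wrappers to reach the real cause.
    static Throwable strip(Throwable t) {
        Throwable current = t;
        while (current instanceof UndeclaredThrowableException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Returns the exit code instead of calling System.exit.
    static int launch(SecurityContext context, Callable<Integer> runTaskManager) {
        int exitCode;
        Throwable throwable = null;
        try {
            exitCode = context.runSecured(runTaskManager);
        } catch (Throwable t) {
            throwable = strip(t);
            exitCode = FAILURE_EXIT_CODE;
        }
        if (throwable != null) {
            System.err.println("Terminating with exit code " + exitCode + ": " + throwable);
        } else {
            System.out.println("Terminating with exit code " + exitCode + ".");
        }
        return exitCode;
    }
}
```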
As its name suggests, runTaskManager brings us closer to actually building the TaskManager. Stepping into runTaskManager:
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    final TaskManagerRunner taskManagerRunner;
    try {
        // create the TaskManagerRunner
        taskManagerRunner =
                new TaskManagerRunner(
                        configuration,
                        pluginManager,
                        TaskManagerRunner::createTaskExecutorService); // factory for the TaskExecutorService
        // start the TaskManagerRunner
        taskManagerRunner.start();
    } catch (Exception exception) {
        throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    }
    // block until the runner terminates, then return its exit code
    try {
        return taskManagerRunner.getTerminationFuture().get().getExitCode();
    } catch (Throwable t) {
        throw new FlinkException(
                "Unexpected failure during runtime of TaskManagerRunner.",
                ExceptionUtils.stripExecutionException(t));
    }
}
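This method does two things:
1. It constructs a TaskManagerRunner.
2. It starts the TaskManagerRunner, then blocks until the runner terminates and returns its exit code.
In fact, all the preparation work for starting the TaskManager is done inside this TaskManagerRunner.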
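The "construct, start, then block on the termination future" flow of runTaskManager can be sketched with a CompletableFuture. MiniRunner and its Result enum are hypothetical; they only mimic the shape of taskManagerRunner.getTerminationFuture().get().getExitCode(). The sketch uses join() instead of get() so that callers do not have to handle checked exceptions.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified runner illustrating the control flow of runTaskManager.
class MiniRunner {
    // Hypothetical result type carrying a process exit code.
    enum Result {
        SUCCESS(0),
        FAILURE(1);

        private final int exitCode;

        Result(int exitCode) {
            this.exitCode = exitCode;
        }

        int getExitCode() {
            return exitCode;
        }
    }

    private final CompletableFuture<Result> terminationFuture = new CompletableFuture<>();

    // Real code would start its services here; this sketch simulates a worker that
    // finishes on another thread and completes the termination future.
    void start() {
        new Thread(() -> terminationFuture.complete(Result.SUCCESS), "mini-runner").start();
    }

    // Lets an external party (e.g. a shutdown hook) terminate the runner.
    void shutDown(Result result) {
        terminationFuture.complete(result);
    }

    CompletableFuture<Result> getTerminationFuture() {
        return terminationFuture;
    }

    // Same shape as runTaskManager: construct, start, then block until termination.
    static int runTaskManager() {
        MiniRunner runner = new MiniRunner();
        runner.start();
        return runner.getTerminationFuture().join().getExitCode();
    }
}
```

The main thread does nothing but wait: all real activity happens in the runner's own threads, and the future is the single point where the final outcome is reported.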
The third constructor argument, TaskManagerRunner::createTaskExecutorService, is a method reference that acts as the factory for the TaskExecutorService. Its implementation is:
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        WorkingDirectory workingDirectory,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    // build the TaskExecutor
    final TaskExecutor taskExecutor =
            startTaskManager(
                    configuration,
                    resourceID,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    localCommunicationOnly,
                    externalResourceInfoProvider,
                    workingDirectory,
                    fatalErrorHandler);
    return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
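Passing TaskManagerRunner::createTaskExecutorService into the constructor is the factory pattern: the runner only stores the factory and calls it later, during start(), once the services it needs exist. Below is a reduced sketch with hypothetical types (FactoryDemo and a two-argument factory instead of the real eleven parameters); TaskExecutorServiceFactory and createTaskExecutor borrow the names used in startTaskManagerRunnerServices.

```java
// Hypothetical, simplified types illustrating a factory passed as a method reference.
class FactoryDemo {
    interface TaskExecutorService {
        void start();

        boolean isRunning();
    }

    @FunctionalInterface
    interface TaskExecutorServiceFactory {
        TaskExecutorService createTaskExecutor(String configuration, String resourceId);
    }

    static final class SimpleTaskExecutorService implements TaskExecutorService {
        private final String resourceId;
        private boolean running;

        SimpleTaskExecutorService(String resourceId) {
            this.resourceId = resourceId;
        }

        public void start() {
            running = true;
        }

        public boolean isRunning() {
            return running;
        }
    }

    static final class Runner {
        private final String configuration;
        private final TaskExecutorServiceFactory factory;
        private TaskExecutorService taskExecutorService;

        Runner(String configuration, TaskExecutorServiceFactory factory) {
            this.configuration = configuration;
            this.factory = factory; // only stored; nothing is created yet
        }

        void start() {
            // Base services would be started first; the factory is invoked only now.
            taskExecutorService = factory.createTaskExecutor(configuration, "tm-1");
            taskExecutorService.start();
        }

        TaskExecutorService service() {
            return taskExecutorService;
        }
    }

    // A static method whose signature matches the factory, like createTaskExecutorService.
    static TaskExecutorService createTaskExecutorService(String configuration, String resourceId) {
        return new SimpleTaskExecutorService(resourceId);
    }
}
```

Usage: new FactoryDemo.Runner("conf", FactoryDemo::createTaskExecutorService) creates nothing until start() is called. A side benefit of this design is that tests can pass a different factory without touching the runner.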
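Next, taskManagerRunner.start():
public void start() throws Exception {
    synchronized (lock) {
        // start the runner's base services and create the TaskExecutorService
        startTaskManagerRunnerServices();
        // then start the TaskExecutorService (and with it the TaskExecutor)
        taskExecutorService.start();
    }
}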
start() first calls startTaskManagerRunnerServices():
private void startTaskManagerRunnerServices() throws Exception {
    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);
        // TaskManager-internal thread pool, used for the I/O of the components inside this worker node
        this.executor =
                Executors.newScheduledThreadPool(
                        Hardware.getNumberCPUCores(), // pool size = number of CPU cores on this node
                        new ExecutorThreadFactory("taskmanager-future"));
        // high-availability services
        highAvailabilityServices =
                HighAvailabilityServicesUtils.createHighAvailabilityServices(
                        configuration,
                        executor,
                        AddressResolution.NO_ADDRESS_RESOLUTION,
                        rpcSystem,
                        this);
        // JMX service (introduced in Flink 1.12), exposes monitoring information
        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
        // start the RPC service, internally an Akka ActorSystem
        rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);
        // generate a ResourceID for this TaskManager
        this.resourceId =
                getTaskManagerResourceID(
                        configuration, rpcService.getAddress(), rpcService.getPort());
        this.workingDirectory =
                ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
                        configuration, resourceId);
        LOG.info("Using working directory: {}", workingDirectory);
        // heartbeat services: mainly the heartbeat interval and heartbeat timeout settings
        HeartbeatServices heartbeatServices =
                HeartbeatServices.fromConfiguration(configuration);
        // metric (performance monitoring) services
        metricRegistry =
                new MetricRegistryImpl(
                        MetricRegistryConfiguration.fromConfiguration(
                                configuration,
                                rpcSystem.getMaximumMessageSizeInBytes(configuration)),
                        ReporterSetup.fromConfiguration(configuration, pluginManager));
        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration,
                        rpcService.getAddress(),
                        configuration.getString(TaskManagerOptions.BIND_HOST),
                        rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());
        // the master node already started a BlobServer when it came up;
        // the worker starts a BlobCacheService, a service that caches files locally
        blobCacheService =
                BlobUtils.createBlobCacheService(
                        configuration,
                        Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                        highAvailabilityServices.createBlobStore(),
                        null);
        final ExternalResourceInfoProvider externalResourceInfoProvider =
                ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                        configuration, pluginManager);
        // create the TaskExecutorService, which wraps the TaskExecutor;
        // the TaskExecutor itself is also built inside this call
        taskExecutorService =
                taskExecutorServiceFactory.createTaskExecutor(
                        this.configuration,
                        this.resourceId.unwrap(),
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        false,
                        externalResourceInfoProvider,
                        workingDirectory.unwrap(),
                        this);
        handleUnexpectedTaskExecutorServiceTermination();
        MemoryLogger.startIfConfigured(
                LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
    }
}
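The first service created is a scheduled thread pool sized to the number of CPU cores, with threads named after "taskmanager-future". A JDK-only sketch of the same idea: NamedThreadFactory is a hypothetical stand-in for Flink's ExecutorThreadFactory, the "-thread-N" name suffix and the daemon flag are assumptions, and Runtime.availableProcessors() plays the role of Hardware.getNumberCPUCores().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread factory that names threads "<poolName>-thread-<n>".
class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, poolName + "-thread-" + counter.getAndIncrement());
        t.setDaemon(true); // assumption: daemon threads, so the pool never blocks JVM exit
        return t;
    }

    // Scheduled pool with one thread per CPU core visible to the JVM.
    static ScheduledExecutorService createTaskManagerExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newScheduledThreadPool(cores, new NamedThreadFactory("taskmanager-future"));
    }
}
```

Descriptive thread names pay off when reading thread dumps: every thread of this pool is immediately recognizable as belonging to the TaskManager's internal executor.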
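As with the JobManager startup, startTaskManagerRunnerServices builds and starts a series of base services. In total it does the following:
1. Initializes a TaskManager-internal thread pool that handles the I/O of the components inside the worker node; the pool size equals the number of CPU cores on the node.
2. Builds the high-availability services.
3. Initializes the JMX service, which exposes monitoring information.
4. Starts the RPC service, internally an Akka ActorSystem (see the earlier post in this series, "Flink 1.13 source code analysis prelude: the Akka communication model").
5. Generates a ResourceID for the TaskManager.
6. Initializes the heartbeat services, reading the heartbeat interval and the heartbeat timeout from the configuration.
7. Initializes the metric services.
8. Starts the BlobCacheService, a service that caches files.
9. Builds a TaskExecutorService that wraps the TaskExecutor.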
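The heartbeat setup only reads two numbers, the heartbeat interval and the heartbeat timeout. A hypothetical sketch of how such a pair is typically used: a peer is considered lost once no heartbeat has arrived for timeout milliseconds. HeartbeatMonitor is not a Flink class; the 10000 ms / 50000 ms values in the usage below are assumed to match the commonly documented defaults of heartbeat.interval and heartbeat.timeout, and times are passed in explicitly to keep the example deterministic.

```java
// Hypothetical heartbeat-timeout bookkeeping; Flink's heartbeat managers are more involved.
class HeartbeatMonitor {
    private final long intervalMs;
    private final long timeoutMs;
    private long lastHeartbeatMs;

    HeartbeatMonitor(long intervalMs, long timeoutMs, long startMs) {
        // Assumed sanity check: a timeout shorter than the interval would expire between heartbeats.
        if (intervalMs <= 0 || timeoutMs < intervalMs) {
            throw new IllegalArgumentException("need 0 < interval <= timeout");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
        this.lastHeartbeatMs = startMs;
    }

    long intervalMs() {
        return intervalMs; // how often the sender is expected to report
    }

    // Record a heartbeat received at time nowMs.
    void onHeartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    // The peer is considered lost once no heartbeat arrived for timeoutMs.
    boolean isTimedOut(long nowMs) {
        return nowMs - lastHeartbeatMs >= timeoutMs;
    }
}
```

Usage: with new HeartbeatMonitor(10_000, 50_000, 0), a peer that stays silent is declared lost at t = 50 000 ms, while a heartbeat at t = 40 000 ms pushes that deadline to t = 90 000 ms.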
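The BlobCacheService started above is created via BlobUtils.createBlobCacheService. Its constructor mainly does two things:
1. Initializes a permanent BLOB cache service.
2. Initializes a transient BLOB cache service.
Each of these two services starts a timer task when it is created; the task periodically deletes the expired resources belonging to jobs.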
public BlobCacheService(
        final Configuration blobClientConfig,
        final Reference<File> storageDir,
        final BlobView blobView,
        @Nullable final InetSocketAddress serverAddress)
        throws IOException {
    this(
            // permanent BLOB cache service
            new PermanentBlobCache(blobClientConfig, storageDir, blobView, serverAddress),
            // transient BLOB cache service
            new TransientBlobCache(blobClientConfig, storageDir, serverAddress));
}
Taking PermanentBlobCache as an example:
@VisibleForTesting
public PermanentBlobCache(
        final Configuration blobClientConfig,
        final Reference<File> storageDir,
        final BlobView blobView,
        @Nullable final InetSocketAddress serverAddress,
        BlobCacheSizeTracker blobCacheSizeTracker)
        throws IOException {
    super(
            blobClientConfig,
            storageDir,
            blobView,
            LoggerFactory.getLogger(PermanentBlobCache.class),
            serverAddress);
    // Initializing the clean up task
    this.cleanupTimer = new Timer(true);
    // the configured interval is in seconds; convert it to milliseconds
    this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    // schedule the periodic cleanup task
    this.cleanupTimer.schedule(
            new PermanentBlobCleanupTask(), // the task to run
            cleanupInterval, cleanupInterval);
    // PermanentBlobCache caches permanent BLOB files, with per-job reference counting and staged cleanup;
    // the size tracker records how much is cached so the cache size limit can be enforced
    this.blobCacheSizeTracker = blobCacheSizeTracker;
    // register jobs whose BLOBs are already present in the storage directory
    registerDetectedJobs();
}
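Each run of the scheduled PermanentBlobCleanupTask does the following:
1. Using the per-job reference counts, it looks up the resource files referenced by every job.
2. It iterates over these entries and checks whether each one has expired, i.e. no references remain and its keep-until time has passed.
3. If an entry has expired, it deletes that job's resource directory.
The transient BLOB cache does similar work through its own periodic cleanup task.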
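The cleanup described above combines per-job reference counting, a keep-until timestamp, and a daemon java.util.Timer. The following hypothetical model (RefCountedBlobCleanup, with job IDs as plain strings and an in-memory map instead of directories on disk) sketches that logic under these assumptions; it is an illustration, not PermanentBlobCache's actual code.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical model of reference-counted, time-delayed cleanup of per-job cached files.
class RefCountedBlobCleanup {
    static final class RefCount {
        int references;
        long keepUntil = -1; // set once references drops to 0
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMs;

    RefCountedBlobCleanup(long cleanupIntervalMs) {
        this.cleanupIntervalMs = cleanupIntervalMs;
    }

    synchronized void registerJob(String jobId) {
        jobRefCounters.computeIfAbsent(jobId, id -> new RefCount()).references++;
    }

    synchronized void releaseJob(String jobId, long nowMs) {
        RefCount ref = jobRefCounters.get(jobId);
        if (ref != null && --ref.references == 0) {
            ref.keepUntil = nowMs + cleanupIntervalMs; // keep files a while in case the job returns
        }
    }

    // One cleanup pass: drop jobs with no references whose keep-until time has passed.
    synchronized int cleanup(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
        while (it.hasNext()) {
            RefCount ref = it.next().getValue();
            if (ref.references == 0 && ref.keepUntil > 0 && nowMs >= ref.keepUntil) {
                it.remove(); // real code would delete the job's BLOB directory here
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean isCached(String jobId) {
        return jobRefCounters.containsKey(jobId);
    }

    // Run cleanup() every cleanupIntervalMs on a daemon Timer, like the constructor above.
    Timer schedule() {
        Timer timer = new Timer(true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                cleanup(System.currentTimeMillis());
            }
        }, cleanupIntervalMs, cleanupIntervalMs);
        return timer;
    }
}
```

For deterministic experiments, cleanup(now) can be called directly with explicit timestamps instead of waiting for the timer.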
Next comes the important part:
// org.apache.flink.runtime.taskexecutor.TaskManagerRunner#createTaskExecutorService
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        WorkingDirectory workingDirectory,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    final TaskExecutor taskExecutor =
            startTaskManager(
                    configuration,
                    resourceID,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    localCommunicationOnly,
                    externalResourceInfoProvider,
                    workingDirectory,
                    fatalErrorHandler);
    /*
     * Wrap the TaskExecutor:
     * the TaskExecutor is a field of TaskExecutorToServiceAdapter, and
     * the TaskExecutorToServiceAdapter is a field (taskExecutorService) of TaskManagerRunner
     */
    return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
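TaskExecutorToServiceAdapter.createFor is an instance of the adapter pattern: the runner talks to a small TaskExecutorService interface, and the adapter forwards those calls to the TaskExecutor it holds as a field. A minimal sketch with hypothetical, trimmed-down types (AdapterDemo, with TaskExecutor methods deliberately named differently to make the translation visible):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down types illustrating the adapter pattern.
class AdapterDemo {
    // What the runner needs from "a service": start it, close it, observe termination.
    interface TaskExecutorService {
        void start();

        CompletableFuture<Void> closeAsync();

        CompletableFuture<Void> getTerminationFuture();
    }

    // Stand-in for the component that does the real work, with its own method names.
    static final class TaskExecutor {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        boolean started;

        void startEndpoint() {
            started = true;
        }

        CompletableFuture<Void> shutdown() {
            terminationFuture.complete(null);
            return terminationFuture;
        }
    }

    // Adapter: holds the TaskExecutor as a field and translates the service calls onto it.
    static final class TaskExecutorToServiceAdapter implements TaskExecutorService {
        private final TaskExecutor taskExecutor;

        private TaskExecutorToServiceAdapter(TaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }

        static TaskExecutorToServiceAdapter createFor(TaskExecutor taskExecutor) {
            return new TaskExecutorToServiceAdapter(taskExecutor);
        }

        @Override
        public void start() {
            taskExecutor.startEndpoint();
        }

        @Override
        public CompletableFuture<Void> closeAsync() {
            return taskExecutor.shutdown();
        }

        @Override
        public CompletableFuture<Void> getTerminationFuture() {
            return taskExecutor.terminationFuture;
        }
    }
}
```

The runner only ever depends on the narrow service interface, so it does not need to know anything about the TaskExecutor's much larger API.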
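Here a TaskExecutor is actually initialized and then wrapped. Let's first look at how the TaskExecutor is initialized by stepping into startTaskManager.
Inside this method, a number of base services are again initialized: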