• [Netflix Hollow Series] An In-Depth Analysis of Hollow's Producer-Consumer Model


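    Preface

    Earlier articles in this series introduced Hollow's data model and analyzed its memory layout in depth. This article examines Hollow's producer-consumer model. Producer and Consumer are the core concepts of the whole project, and HollowProducer and HollowConsumer are the classes that represent them.

    The aim is to give you a thorough understanding of that model. Many Hollow-specific terms appear along the way; see [Netflix Hollow Series] Hollow Terminology for reference.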

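    Producers and Consumers

    In the world of processes, a producer is a process that generates data and a consumer is a process that consumes it. In multi-process development, if the two are coupled directly and the producer is fast while the consumer is slow, the producer has to wait for the consumer to finish before it can produce more data. By the same logic, if the consumer outpaces the producer, the consumer has to wait for the producer.

    The producer-consumer pattern was introduced to solve this problem by placing a buffer between the two sides. This section first looks at two typical producer-consumer models: the one in operating systems and the one in Kafka.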

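    The Producer-Consumer Model in Operating Systems

    In an operating system, the producer-consumer problem describes the mutual dependency that arises when several processes share one buffer. Producer processes put data into the buffer, and consumer processes take data out of it. When the buffer is full, producers must wait for consumers to remove data; when the buffer is empty, consumers must wait for producers to add data.

    This kind of cooperation and dependency between processes is very common throughout a computer system. A typical case is a disk driver exchanging data with an upper-layer application through a shared buffer. The disk driver is the producer: it places the data it reads from disk into the shared buffer. The upper-layer application is the consumer: it takes data out of the shared buffer and hands it to a higher-level application or to the user.

    How do producers and consumers coordinate? With semaphores. The operating system uses semaphores to solve both mutual exclusion and synchronization.

    • Mutual exclusion: the bounded buffer is a critical resource, so access to it is guarded by a semaphore mutex (initial value 1).
    • Synchronization: two semaphores, empty and full, with initial values n and 0, count the free and filled slots in the buffer and also serve as the wait conditions.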
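
    The semaphore scheme above can be sketched in Java with java.util.concurrent.Semaphore. This is only an illustration of the idea, not operating-system code; the class name BoundedBuffer and its method names are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

/** Bounded buffer guarded by the three classic semaphores: mutex, empty and full. */
class BoundedBuffer<T> {
    private final Queue<T> slots = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1); // mutual exclusion, initial value 1
    private final Semaphore empty;                    // free slots, initial value n
    private final Semaphore full = new Semaphore(0);  // filled slots, initial value 0

    BoundedBuffer(int capacity) {
        this.empty = new Semaphore(capacity);
    }

    /** Producer side: blocks while the buffer is full. */
    void put(T item) throws InterruptedException {
        empty.acquire();          // P(empty): wait for a free slot
        mutex.acquire();          // P(mutex): enter the critical section
        try {
            slots.add(item);
        } finally {
            mutex.release();      // V(mutex): leave the critical section
        }
        full.release();           // V(full): one more item is available
    }

    /** Consumer side: blocks while the buffer is empty. */
    T take() throws InterruptedException {
        full.acquire();           // P(full): wait for an item
        mutex.acquire();          // P(mutex): enter the critical section
        try {
            return slots.remove();
        } finally {
            mutex.release();      // V(mutex): leave the critical section
            empty.release();      // V(empty): one more slot is free
        }
    }
}
```

    Acquiring empty (or full) before mutex matters: taking mutex first and then blocking on a full or empty buffer would deadlock the other side.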

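    The Producer-Consumer Model in Kafka

    Kafka is a popular message middleware, originally developed at LinkedIn and later open-sourced. Producers produce messages of all kinds and consumers consume (process and analyze) them; between the two sits a bridge that connects them, the messaging system. At a micro level, the same need can be understood as the question of how different systems pass messages to one another.

    Messaging middleware is usually described in terms of two delivery models. (Kafka itself always has consumers pull from the broker and uses consumer groups to cover both cases; the descriptions below are the generic models.)

    Point-to-Point

    A delivery model based on pulling or polling. Its defining trait is that each message sent to the queue is processed by exactly one consumer. After the producer puts a message on the queue, the consumer actively pulls it for processing. The advantage is that the consumer controls how often it pulls. The drawback is that the consumer cannot tell whether the queue has messages waiting, so it needs an extra thread to keep checking.

    Publish-Subscribe

    A delivery model based on pushing, which allows many different subscribers. After the producer puts a message on the queue, the queue pushes it to every consumer that subscribed to that kind of message. Because consumers receive pushes passively, they do not need to know whether the queue has pending messages.

    However, consumers run on machines of differing capacity, so their processing speeds differ, and the queue has no way of knowing how fast each consumer is. The push rate therefore becomes a problem. Suppose three consumers can process 3M/s, 2M/s and 1M/s. If the queue pushes at 2M/s, the third consumer cannot keep up; if it pushes at 1M/s, the first two consumers are badly underused.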

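    Ways to Implement the Producer-Consumer Model

    There are two common ways to implement the producer-consumer model:

    • wait/notify: this roughly corresponds to the pull-style point-to-point model described above (note that it requires the synchronized lock).
    • Blocking queue: this roughly corresponds to the publish-subscribe model described above.

    These two styles are also referred to as pull and push; different systems may use different names or concepts for them.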
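
    As a rough illustration of the wait/notify approach, the sketch below builds a small bounded buffer on an object monitor. The class WaitNotifyBuffer is hypothetical and written only for this article; both methods are synchronized because wait() and notifyAll() may only be called while holding the monitor.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Bounded buffer built on wait/notify; every method holds the object's monitor. */
class WaitNotifyBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    WaitNotifyBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();               // buffer full: release the monitor and sleep
        }
        items.add(item);
        notifyAll();              // wake consumers waiting for data
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();               // buffer empty: wait for a producer
        }
        T item = items.remove();
        notifyAll();              // wake producers waiting for space
        return item;
    }
}
```

    For the blocking-queue approach, the JDK's java.util.concurrent.ArrayBlockingQueue already offers put and take with the same blocking behavior, so hand-written monitors are rarely needed in practice.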

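    Producers and Consumers in Hollow

    HollowProducer and HollowConsumer represent Hollow's producer and consumer. They can be used directly to carry out production and consumption.

    HollowWriteStateEngine and HollowReadStateEngine are the underlying data-handling classes of HollowProducer and HollowConsumer respectively. If the Producer and Consumer that Hollow provides do not meet your needs, you can build your own on top of HollowWriteStateEngine and HollowReadStateEngine.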

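    How Producer and Consumer Interact

    The Producer and the Consumer are linked first of all through an Announcer on one side and an AnnouncementWatcher on the other. But how exactly do the two interact?

    The Producer issues notifications through the Announcer, but Hollow does not prescribe how the notification is delivered. The Producer can push a message to the Consumer, or the Consumer can obtain the notification by polling.

    The Consumer receives notifications through the AnnouncementWatcher. How it receives them depends on the delivery mechanism the Producer side offers, and can be either pull or push.

    For example, the HollowFilesystemAnnouncementWatcher that ships with Hollow defines a Runnable named Watch that repeatedly scans the directory where the Producer stores its files to decide whether a new Blob has been produced. We can also write our own watcher, such as an S3AnnouncementWatcher or a MySQLAnnouncementWatcher, by implementing HollowConsumer.AnnouncementWatcher (in particular its subscribeToUpdates method) and defining our own notification mechanism.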

    Push

    When the Producer and Consumer communicate by push, a message queue is the usual choice. Popular message middleware includes:

    • RabbitMQ
    • Kafka
    • RocketMQ
    • QMQ

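    Pull

    When the Producer and Consumer communicate by pull, the Consumer polls a specific interface exposed by the Producer. Two pieces are involved: a polling mechanism and an RPC mechanism.

    Polling

    The Consumer's polling can be driven by a timer. Typical options include:

    • The JDK's ScheduledExecutorService and Timer
    • Scheduler middleware: Quartz

    The Consumer uses the timer to poll an RPC interface provided by the Producer.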
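
    A polling loop of this kind might look like the following sketch built on ScheduledExecutorService. Everything here is hypothetical: VersionPoller is not a Hollow class, the LongSupplier stands in for whatever RPC call returns the Producer's latest version, and the LongConsumer stands in for whatever the Consumer does when a new version shows up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Polls a version source at a fixed delay and reports each newly seen version. */
class VersionPoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier latestVersion;   // hypothetical: e.g. an RPC call to the producer
    private final LongConsumer onNewVersion;    // hypothetical: e.g. trigger a consumer refresh
    private long lastSeen = Long.MIN_VALUE;     // not thread-safe: call pollOnce from one thread at a time

    VersionPoller(LongSupplier latestVersion, LongConsumer onNewVersion) {
        this.latestVersion = latestVersion;
        this.onNewVersion = onNewVersion;
    }

    /** Starts polling immediately, then again periodMillis after each poll finishes. */
    void start(long periodMillis) {
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** One polling step; package-private so it can be exercised without the timer. */
    void pollOnce() {
        long current = latestVersion.getAsLong();
        if (current > lastSeen) {
            lastSeen = current;
            onNewVersion.accept(current);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

    One caveat of scheduleWithFixedDelay is that an exception thrown by the task suppresses all later runs, so a real poller would catch and log failures inside pollOnce.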

    RPC

    There are many ways to do RPC. Typical choices are direct HTTP or TCP access, or a middleware framework such as Dubbo.

    HollowProducer

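    HollowProducer is responsible for producing data. Hollow defines the Producer's responsibilities in HollowProducer; besides the core Publisher, it defines a number of additional mechanisms:

    • Publisher (BlobPublisher)
    • Announcer
    • Validator
    • Listener
    • Blob staging directory
    • Blob compressor
    • Blob stager, which opens blobs by version
    • Custom Executor
    • Number of full snapshot versions to retain
    • Cleanup mechanism
    • Metrics collector

    The Producer generates full and incremental data on a regular cycle. Before each cycle ends it creates a data set (which can be called a Blob), sends that data set to some storage medium (local disk, S3, a database, Redis and so on), and finally publishes an announcement telling Consumers that the data is ready to be fetched.

    A Producer can be initialized as follows. Everything other than the Publisher is optional.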

    HollowProducer producer = HollowProducer
        .withPublisher(publisher)                       // required: a BlobPublisher
        .withAnnouncer(announcer)                       // optional: an Announcer
        .withValidators(validators)                     // optional: one or more Validator
        .withListeners(listeners)                       // optional: one or more HollowProducerListeners
        .withBlobStagingDir(dir)                        // optional: a java.io.File
        .withBlobCompressor(compressor)                 // optional: a BlobCompressor
        .withBlobStager(stager)                         // optional: a BlobStager
        .withSnapshotPublishExecutor(e)                 // optional: a java.util.concurrent.Executor
        .withNumStatesBetweenSnapshots(n)               // optional: an int
        .withTargetMaxTypeShardSize(size)               // optional: a long
        .withBlobStorageCleaner(blobStorageCleaner)     // optional: a BlobStorageCleaner
        .withMetricsCollector(hollowMetricsCollector)   // optional: a HollowMetricsCollector<HollowProducerMetrics>
        .build();

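    Next, let's look at HollowProducer's properties in detail.

    Properties

    We start with the HollowProducer constructor. Most of its properties were already mentioned above. Pay special attention to HollowWriteStateEngine: it is the core class underlying HollowProducer and is responsible for writing data. It is not covered in detail here; a later article will analyze it, together with HollowReadStateEngine, the class underlying the HollowConsumer discussed below.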

    private AbstractHollowProducer(
            HollowProducer.BlobStager blobStager,
            HollowProducer.Publisher publisher,
            HollowProducer.Announcer announcer,
            List<? extends HollowProducerEventListener> eventListeners,
            HollowProducer.VersionMinter versionMinter,
            Executor snapshotPublishExecutor,
            int numStatesBetweenSnapshots,
            long targetMaxTypeShardSize,
            boolean focusHoleFillInFewestShards,
            HollowMetricsCollector<HollowProducerMetrics> metricsCollector,
            HollowProducer.BlobStorageCleaner blobStorageCleaner,
            SingleProducerEnforcer singleProducerEnforcer,
            HollowObjectHashCodeFinder hashCodeFinder,
            boolean doIntegrityCheck) {
        this.publisher = publisher;
        this.announcer = announcer;
        this.versionMinter = versionMinter;
        this.blobStager = blobStager;
        this.singleProducerEnforcer = singleProducerEnforcer;
        this.snapshotPublishExecutor = snapshotPublishExecutor;
        this.numStatesBetweenSnapshots = numStatesBetweenSnapshots;
        this.hashCodeFinder = hashCodeFinder;
        this.doIntegrityCheck = doIntegrityCheck;

        HollowWriteStateEngine writeEngine = hashCodeFinder == null
                ? new HollowWriteStateEngine()
                : new HollowWriteStateEngine(hashCodeFinder);
        writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
        writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);

        this.objectMapper = new HollowObjectMapper(writeEngine);
        if (hashCodeFinder != null) {
            objectMapper.doNotUseDefaultHashKeys();
        }
        this.readStates = ReadStateHelper.newDeltaChain();
        this.blobStorageCleaner = blobStorageCleaner;
        this.listeners = new ProducerListenerSupport(eventListeners.stream().distinct().collect(toList()));
        this.metrics = new HollowProducerMetrics();
        this.metricsCollector = metricsCollector;
    }

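    HollowProducer has many getters and setters. This article focuses on the two overloaded initializeDataModel methods. When the service starts, HollowProducer can initialize its data model either from the given HollowSchema instances or from concrete classes.

    Doing so ensures that a restore can correctly compare the producer's current data model with the data model of the restored data state, and can manage any differences between them (for example, by not restoring state for types that exist in the restored data model but not in the producer's current data model). After initialization, a data model initialization event is emitted to all registered DataModelInitializationListener instances.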

    @Override
    public void initializeDataModel(Class<?>... classes) {
        super.initializeDataModel(classes);
    }

    @Override
    public void initializeDataModel(HollowSchema... schemas) {
        super.initializeDataModel(schemas);
    }

    Populator

    Populator represents the task that populates a new data state within a HollowProducer cycle.

    @FunctionalInterface
    public interface Populator {
        void populate(HollowProducer.WriteState newState) throws Exception;
    }

    WriteState

    WriteState is responsible for writing data into the state engine.

    /**
     * Representation of new write state.
     */
    public interface WriteState {
        int add(Object o) throws IllegalStateException;
        HollowObjectMapper getObjectMapper() throws IllegalStateException;
        HollowWriteStateEngine getStateEngine() throws IllegalStateException;
        ReadState getPriorState() throws IllegalStateException;
        long getVersion() throws IllegalStateException;
    }

    ReadState

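    ReadState represents the read state computed from the population of a new write state. It gives HollowProducer an interface for reading the current data state.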

    public interface ReadState {
        long getVersion();
        HollowReadStateEngine getStateEngine();
    }

    BlobStager

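    BlobStager provides methods for opening (staging) snapshot, delta, reverse delta and header blobs. The parameters are the concrete version numbers involved.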

    public interface BlobStager {
        HollowProducer.Blob openSnapshot(long version);
        HollowProducer.HeaderBlob openHeader(long version);
        HollowProducer.Blob openDelta(long fromVersion, long toVersion);
        HollowProducer.Blob openReverseDelta(long fromVersion, long toVersion);
    }

    BlobCompressor

    public interface BlobCompressor {
        BlobCompressor NO_COMPRESSION = new BlobCompressor() {
            @Override
            public OutputStream compress(OutputStream os) {
                return os;
            }

            @Override
            public InputStream decompress(InputStream is) {
                return is;
            }
        };

        /**
         * This method provides an opportunity to wrap the OutputStream used to write the blob (e.g. with a GZIPOutputStream).
         *
         * @param is the uncompressed output stream
         * @return the compressed output stream
         */
        OutputStream compress(OutputStream is);

        /**
         * This method provides an opportunity to wrap the InputStream used to write the blob (e.g. with a GZIPInputStream).
         *
         * @param is the compressed input stream
         * @return the uncompressed input stream
         */
        InputStream decompress(InputStream is);
    }
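
    The Javadoc above mentions GZIP as one way to wrap the streams. The sketch below shows what such a compressor could look like. To keep it compilable without Hollow on the classpath, it implements a stand-in interface, StreamCompressor, which is hypothetical and merely copies the two method signatures of BlobCompressor; GzipStreamCompressor and roundTrip are likewise names invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical stand-in with the same two methods as HollowProducer.BlobCompressor. */
interface StreamCompressor {
    OutputStream compress(OutputStream os);
    InputStream decompress(InputStream is);
}

/** Wraps blob streams with GZIP; the interface declares no checked exceptions, so IOException is rethrown unchecked. */
class GzipStreamCompressor implements StreamCompressor {
    @Override
    public OutputStream compress(OutputStream os) {
        try {
            return new GZIPOutputStream(os);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public InputStream decompress(InputStream is) {
        try {
            return new GZIPInputStream(is);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compresses a string into memory and reads it back through decompress. */
    static String roundTrip(String text) throws IOException {
        StreamCompressor gzip = new GzipStreamCompressor();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = gzip.compress(sink)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = gzip.decompress(new ByteArrayInputStream(sink.toByteArray()))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```

    Whatever compressor the producer uses, the consumer side has to apply the matching decompression when it reads the blob.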

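    BlobPublisher (Publisher)

    BlobPublisher packages the data set and publishes it to storage. The default Publisher implementation that Hollow provides is HollowFilesystemPublisher. For example, a BlobPublisher could write a full snapshot of city data to a file and upload that file to S3.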

    public interface Publisher {
        /**
         * Deprecated - Should use {@link Publisher#publish(PublishArtifact)} instead.<p>
         * Publish the blob specified to this publisher's blobstore.
         * <p>
         * It is guaranteed that {@code blob} was created by calling one of
         * {@link BlobStager#openSnapshot(long)}, {@link BlobStager#openDelta(long, long)}, or
         * {@link BlobStager#openReverseDelta(long, long)} on this publisher.
         *
         * @param blob the blob to publish
         */
        @Deprecated
        default void publish(HollowProducer.Blob blob) {
            publish((PublishArtifact)blob);
        }

        /**
         * Publish the blob specified to this publisher's blobstore.
         * <p>
         * It is guaranteed that {@code blob} was created by calling one of
         * {@link BlobStager#openSnapshot(long)}, {@link BlobStager#openDelta(long, long)},
         * {@link BlobStager#openReverseDelta(long, long)}, or
         * {@link BlobStager#openHeader(long)} on this publisher.
         *
         * @param publishArtifact the blob to publish
         */
        default void publish(HollowProducer.PublishArtifact publishArtifact) {
            if (publishArtifact instanceof HollowProducer.Blob) {
                publish((HollowProducer.Blob)publishArtifact);
            }
        }
    }

    PublishArtifact

    public interface PublishArtifact {
        void cleanup();
        void write(HollowBlobWriter blobWriter) throws IOException;
        InputStream newInputStream() throws IOException;

        @Deprecated
        default File getFile() {
            throw new UnsupportedOperationException("File is not available");
        }

        default Path getPath() {
            throw new UnsupportedOperationException("Path is not available");
        }
    }

    public static abstract class HeaderBlob implements PublishArtifact {
        protected final long version;

        protected HeaderBlob(long version) {
            this.version = version;
        }

        public long getVersion() {
            return version;
        }
    }

    public static abstract class Blob implements PublishArtifact {
        protected final long fromVersion;
        protected final long toVersion;
        protected final Blob.Type type;
        protected final ProducerOptionalBlobPartConfig optionalPartConfig;

        protected Blob(long fromVersion, long toVersion, Blob.Type type) {
            this(fromVersion, toVersion, type, null);
        }

        protected Blob(long fromVersion, long toVersion, Blob.Type type, ProducerOptionalBlobPartConfig optionalPartConfig) {
            this.fromVersion = fromVersion;
            this.toVersion = toVersion;
            this.type = type;
            this.optionalPartConfig = optionalPartConfig;
        }

        public InputStream newOptionalPartInputStream(String partName) throws IOException {
            throw new UnsupportedOperationException("The provided HollowProducer.Blob implementation does not support optional blob parts");
        }

        public Path getOptionalPartPath(String partName) {
            throw new UnsupportedOperationException("The provided HollowProducer.Blob implementation does not support optional blob parts");
        }

        public Type getType() {
            return this.type;
        }

        public long getFromVersion() {
            return this.fromVersion;
        }

        public long getToVersion() {
            return this.toVersion;
        }

        public ProducerOptionalBlobPartConfig getOptionalPartConfig() {
            return optionalPartConfig;
        }

        /**
         * Hollow blob types are {@code SNAPSHOT}, {@code DELTA} and {@code REVERSE_DELTA}.
         */
        public enum Type {
            SNAPSHOT("snapshot"),
            DELTA("delta"),
            REVERSE_DELTA("reversedelta");

            public final String prefix;

            Type(String prefix) {
                this.prefix = prefix;
            }
        }
    }

    Announcer

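    The Announcer is responsible for notification. Through it, consumers learn which data set version is current (whether that version is reached via a snapshot, a delta or a reverse delta).

    Announcer is defined as an interface, so using it means implementing its announce method. The default Announcer implementation that Hollow provides is HollowFilesystemAnnouncer.

    For example, an Announcer could be built on QMQ: once the data set has been built, the message is sent to consumers through QMQ. As another example, the Announcer could be implemented as an RPC endpoint that consumers call.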

    public interface Announcer {
        void announce(long stateVersion);

        default void announce(long stateVersion, Map<String, String> metadata) {
            announce(stateVersion);
        }
    }

    HollowWriteStateEngine cycles back and forth between two phases:

    1. Adding records (add HollowRecord)
    2. Writing the state (write State)

    Version

    By default, versions in Hollow are managed by the Producer's VersionMinter. A version is a timestamp followed by a repeating 3-digit sequence number.

    public interface VersionMinter {
        /**
         * Create a new state version.
         * <p>
         * State versions should be ascending -- later states have greater versions.<p>
         *
         * @return a new state version
         */
        long mint();
    }
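
    To make the "timestamp plus repeating 3-digit sequence number" scheme concrete, here is a minimal sketch of a minter in that style. It is not Hollow's own code: the class name TimestampCounterMinter, the yyyyMMddHHmmss timestamp format, the UTC time zone and the injectable clock are all assumptions made for the illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Mints versions of the form yyyyMMddHHmmss followed by a repeating 3-digit counter. */
class TimestampCounterMinter {
    // Assumption: second-resolution timestamp in UTC.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC);

    private final Supplier<Instant> clock;                    // injectable so the output is predictable in tests
    private final AtomicInteger counter = new AtomicInteger();

    TimestampCounterMinter(Supplier<Instant> clock) {
        this.clock = clock;
    }

    long mint() {
        // floorMod keeps the sequence in 0..999 even after the int counter overflows
        int sequence = Math.floorMod(counter.incrementAndGet(), 1000);
        String version = FORMAT.format(clock.get()) + String.format("%03d", sequence);
        return Long.parseLong(version);                       // 17 digits, fits in a long
    }
}
```

    With a production clock this would be constructed as new TimestampCounterMinter(Instant::now). Note that versions only stay ascending as long as the counter does not wrap from 999 back to 000 within the same second.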

    BlobStorageCleaner

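    BlobStorageCleaner is responsible for cleaning up Blob files. The cleanup rules are defined by subclassing this abstract class and implementing its methods.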

    public static abstract class BlobStorageCleaner {
        public void clean(Blob.Type blobType) {
            switch (blobType) {
                case SNAPSHOT:
                    cleanSnapshots();
                    break;
                case DELTA:
                    cleanDeltas();
                    break;
                case REVERSE_DELTA:
                    cleanReverseDeltas();
                    break;
            }
        }

        /**
         * This method provides an opportunity to remove old snapshots.
         */
        public abstract void cleanSnapshots();

        /**
         * This method provides an opportunity to remove old deltas.
         */
        public abstract void cleanDeltas();

        /**
         * This method provides an opportunity to remove old reverse deltas.
         */
        public abstract void cleanReverseDeltas();
    }

    Incremental

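    Incremental is the core class for handling incremental data in Hollow. It is a producer whose Hollow data is populated from given data changes (additions, modifications or removals), which is termed incremental production. A plain HollowProducer, by contrast, is populated with all of the data, and the changes are determined afterwards.

    In other respects, HollowProducer and HollowProducer.Incremental generally behave the same way when publishing and announcing data states. If special behavior is needed, it can be added by extending the Incremental class.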

    public static class Incremental extends AbstractHollowProducer {
        protected Incremental(Builder<?> b) {
            super(b);
        }

        @Override
        public HollowProducer.ReadState restore(long versionDesired, HollowConsumer.BlobRetriever blobRetriever) {
            return super.hardRestore(versionDesired, blobRetriever);
        }

        public long runIncrementalCycle(Incremental.IncrementalPopulator task) {
            return runCycle(task, null);
        }

        @FunctionalInterface
        public interface IncrementalPopulator {
            void populate(IncrementalWriteState state) throws Exception;
        }

        public interface IncrementalWriteState {
            void addOrModify(Object o);
            void addIfAbsent(Object o);
            void delete(Object o);
            void delete(RecordPrimaryKey key);
        }
    }
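
    The intent of the three mutation kinds in IncrementalWriteState can be illustrated with a toy model in which records are keyed by a primary key and each cycle applies changes on top of the previous state. The class below is purely hypothetical, uses a plain map instead of Hollow's state engine, and only mirrors the method names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of incremental production: mutations are applied on top of the previous state. */
class IncrementalStateSketch {
    // Records keyed by a primary key; a stand-in for the producer's prior data state.
    private final Map<Integer, String> recordsByKey = new LinkedHashMap<>();

    /** Inserts the record, or replaces the existing record with the same key. */
    void addOrModify(int key, String value) {
        recordsByKey.put(key, value);
    }

    /** Inserts the record only when no record with the same key exists yet. */
    void addIfAbsent(int key, String value) {
        recordsByKey.putIfAbsent(key, value);
    }

    /** Removes the record with the given key, if any. */
    void delete(int key) {
        recordsByKey.remove(key);
    }

    /** Returns a copy of the current state. */
    Map<Integer, String> snapshot() {
        return new LinkedHashMap<>(recordsByKey);
    }
}
```

    Records that a cycle does not touch simply carry over, which is the practical difference from a full cycle where every record has to be supplied again.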

    That covers essentially all of HollowProducer's functionality. Next comes the implementation of HollowConsumer.

    HollowConsumer

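    HollowConsumer is responsible for consuming data. Hollow defines the Consumer's responsibilities in HollowConsumer; besides the core Retriever, it defines a number of additional mechanisms:

    • Blob retriever (BlobRetriever)
    • Announcement watcher, i.e. the notification listener (AnnouncementWatcher)
    • Local disk directory, the path where received Blob files are stored
    • RefreshListener
    • Generated client. This is a key concept: put simply, HollowConsumer exposes a HollowAPI through which Hollow's encoded or compressed data can be accessed, so there is no need to build your own Map-like cache structure.
    • FilterConfig, the filter
    • DoubleSnapshotConfig, the double-snapshot setting
    • Custom Executor
    • Metrics collector

    The Consumer has to stay in contact with the Producer so that its own data remains in sync with the Producer's. During initialization the Consumer fetches a full snapshot from the Producer. After initialization completes, it keeps consuming deltas to keep its data fresh.

    A HollowConsumer can be initialized as follows. Everything other than the BlobRetriever is optional.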

    HollowConsumer consumer = HollowConsumer
        .withBlobRetriever(blobRetriever)               // required: a BlobRetriever
        .withLocalBlobStore(localDiskDir)               // optional: a local disk location
        .withAnnouncementWatcher(announcementWatcher)   // optional: a AnnouncementWatcher
        .withRefreshListener(refreshListener)           // optional: a RefreshListener
        .withGeneratedAPIClass(MyGeneratedAPI.class)    // optional: a generated client API class
        .withFilterConfig(filterConfig)                 // optional: a HollowFilterConfig
        .withDoubleSnapshotConfig(doubleSnapshotCfg)    // optional: a DoubleSnapshotConfig
        .withObjectLongevityConfig(objectLongevityCfg)  // optional: an ObjectLongevityConfig
        .withObjectLongevityDetector(detector)          // optional: an ObjectLongevityDetector
        .withRefreshExecutor(refreshExecutor)           // optional: an Executor
        .withMetricsCollector(hollowMetricsCollector)   // optional: a HollowMetricsCollector<HollowConsumerMetrics>
        .build();

    Properties

    The HollowConsumer constructor is shown below.

    protected <B extends Builder<B>> HollowConsumer(B builder) {
        // duplicated with HollowConsumer(...) constructor above. We cannot chain constructor calls because that
        // constructor subscribes to the announcement watcher and we have more setup to do first
        this.metrics = new HollowConsumerMetrics();
        this.updater = new HollowClientUpdater(builder.blobRetriever,
                builder.refreshListeners,
                builder.apiFactory,
                builder.doubleSnapshotConfig,
                builder.hashCodeFinder,
                builder.memoryMode,
                builder.objectLongevityConfig,
                builder.objectLongevityDetector,
                metrics,
                builder.metricsCollector);
        updater.setFilter(builder.typeFilter);
        if(builder.skipTypeShardUpdateWithNoAdditions)
            updater.setSkipShardUpdateWithNoAdditions(true);
        this.announcementWatcher = builder.announcementWatcher;
        this.refreshExecutor = builder.refreshExecutor;
        this.refreshLock = new ReentrantReadWriteLock();
        this.memoryMode = builder.memoryMode;
        if (announcementWatcher != null)
            announcementWatcher.subscribeToUpdates(this);
    }

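    HollowConsumer also has many getters and setters. This article focuses on getInitialLoad, which triggers an asynchronous refresh on the Consumer side and returns a future for the initial load; it is essential to how a consumer starts consuming data.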

    /**
     * Returns a {@code CompletableFuture} that completes after the initial data load succeeds. Also triggers the initial
     * load asynchronously, to avoid waiting on a polling interval for the initial load.
     * <p>
     * Callers can use methods like {@link CompletableFuture#join()} or {@link CompletableFuture#get(long, TimeUnit)}
     * to block until the initial load is complete.
     * <p>
     * A failure during the initial load <em>will not</em> cause the future to complete exceptionally; this allows
     * for a subsequent data version to eventually succeed.
     * <p>
     * In a consumer without published or announced versions – or one that always fails the initial load – the future
     * will remain incomplete indefinitely.
     *
     * @return a future which, when completed, has a value set to the data version that was initially loaded
     */
    public CompletableFuture<Long> getInitialLoad() {
        try {
            triggerAsyncRefresh();
        } catch (RejectedExecutionException | NullPointerException e) {
            LOG.log(Level.INFO, "Refresh triggered by getInitialLoad() failed; future attempts might succeed", e);
        }
        return updater.getInitialLoad();
    }
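
    Since the Javadoc warns that the future may stay incomplete indefinitely, a caller will usually want to wait with a timeout rather than call join() unconditionally. The helper below sketches that pattern against a plain CompletableFuture<Long>; the class InitialLoadWait and the method awaitInitialLoad are hypothetical names, and in real code the future would come from consumer.getInitialLoad().

```java
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Helper for waiting on an initial-load future without blocking forever. */
class InitialLoadWait {
    /**
     * Waits up to timeoutMillis for the initially loaded version.
     * Returns an empty OptionalLong when the load has not finished in time.
     */
    static OptionalLong awaitInitialLoad(CompletableFuture<Long> initialLoad, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        try {
            return OptionalLong.of(initialLoad.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // The future is left untouched, so the caller can wait on it again later.
            return OptionalLong.empty();
        }
    }
}
```

    What to do on an empty result (retry, start without data, or fail startup) is an application decision that this sketch leaves open.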

    Refresh Trigger

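    The refresh triggers cause a refresh to the latest version reported by the HollowConsumer.AnnouncementWatcher.

    • If the consumer is already on the latest version, the call is a no-op.
    • If no HollowConsumer.AnnouncementWatcher is present, the call triggers a refresh to the latest version available in the Blob store.

    Note that triggerRefresh and triggerRefreshTo are blocking operations, whereas triggerAsyncRefresh and triggerAsyncRefreshWithDelay hand the refresh to the refreshExecutor and return immediately.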

    public void triggerRefresh() {
        refreshLock.writeLock().lock();
        try {
            updater.updateTo(announcementWatcher == null ? Long.MAX_VALUE : announcementWatcher.getLatestVersion());
        } catch (Error | RuntimeException e) {
            throw e;
        } catch (Throwable t) {
            throw new RuntimeException(t);
        } finally {
            refreshLock.writeLock().unlock();
        }
    }

    public void triggerAsyncRefresh() {
        triggerAsyncRefreshWithDelay(0);
    }

    public void triggerAsyncRefreshWithDelay(int delayMillis) {
        final long targetBeginTime = System.currentTimeMillis() + delayMillis;
        refreshExecutor.execute(() -> {
            try {
                long delay = targetBeginTime - System.currentTimeMillis();
                if (delay > 0)
                    Thread.sleep(delay);
            } catch (InterruptedException e) {
                // Interrupting, such as shutting down the executor pool,
                // cancels the trigger
                LOG.log(Level.INFO, "Async refresh interrupted before trigger, refresh cancelled", e);
                return;
            }
            try {
                triggerRefresh();
            } catch (Error | RuntimeException e) {
                // Ensure exceptions are propagated to the executor
                LOG.log(Level.SEVERE, "Async refresh failed", e);
                throw e;
            }
        });
    }

    public void triggerRefreshTo(long version) {
        if (announcementWatcher != null)
            throw new UnsupportedOperationException("Cannot trigger refresh to specified version when a HollowConsumer.AnnouncementWatcher is present");
        try {
            updater.updateTo(version);
        } catch (Error | RuntimeException e) {
            throw e;
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    BlobRetriever

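    BlobRetriever is the counterpart of BlobPublisher. It retrieves data from the storage medium, so it must be able to read whatever the BlobPublisher writes. The parameters of its methods are concrete data set versions (either a snapshot version or a delta version). The default BlobRetriever implementation that Hollow provides is HollowFilesystemBlobRetriever.

    For example, after a file has been uploaded to S3, the BlobRetriever is what downloads the file from S3 and loads it into memory.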

    /**
     * An interface which defines the necessary interactions of Hollow with a blob data store.
     * <p>
     * Implementations will define how to retrieve blob data from a data store.
     */
    public interface BlobRetriever {
        /**
         * Returns the snapshot for the state with the greatest version identifier which is equal to or less than the desired version
         * @param desiredVersion the desired version
         * @return the blob of the snapshot
         */
        HollowConsumer.Blob retrieveSnapshotBlob(long desiredVersion);

        /**
         * Returns a delta transition which can be applied to the specified version identifier
         * @param currentVersion the current version
         * @return the blob of the delta
         */
        HollowConsumer.Blob retrieveDeltaBlob(long currentVersion);

        /**
         * Returns a reverse delta transition which can be applied to the specified version identifier
         * @param currentVersion the current version
         * @return the blob of the reverse delta
         */
        HollowConsumer.Blob retrieveReverseDeltaBlob(long currentVersion);

        default Set<String> configuredOptionalBlobParts() {
            return null;
        }

        default HollowConsumer.HeaderBlob retrieveHeaderBlob(long currentVersion) {
            throw new UnsupportedOperationException();
        }
    }

    Blob

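    Blob stands for Binary Large Object. In Hollow it refers to a data set after compression and encoding. In HollowConsumer it specifically means a Blob file that carries version numbers. Why the versions? On the consuming side, data without a version is meaningless, because we would not know which existing data version the received data should be applied on top of.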

    protected interface VersionedBlob {
        InputStream getInputStream() throws IOException;

        default File getFile() throws IOException {
            throw new UnsupportedOperationException();
        }
    }

    public static abstract class HeaderBlob implements VersionedBlob {
        private final long version;

        protected HeaderBlob(long version) {
            this.version = version;
        }

        public long getVersion() {
            return this.version;
        }
    }

    public static abstract class Blob implements VersionedBlob {
        protected final long fromVersion;
        protected final long toVersion;
        private final BlobType blobType;

        public Blob(long toVersion) {
            this(HollowConstants.VERSION_NONE, toVersion);
        }

        public Blob(long fromVersion, long toVersion) {
            this.fromVersion = fromVersion;
            this.toVersion = toVersion;
            if (this.isSnapshot())
                this.blobType = BlobType.SNAPSHOT;
            else if (this.isReverseDelta())
                this.blobType = BlobType.REVERSE_DELTA;
            else
                this.blobType = BlobType.DELTA;
        }

        public abstract InputStream getInputStream() throws IOException;

        public OptionalBlobPartInput getOptionalBlobPartInputs() throws IOException {
            return null;
        }

        public enum BlobType {
            SNAPSHOT("snapshot"),
            DELTA("delta"),
            REVERSE_DELTA("reversedelta");

            private final String type;

            BlobType(String type) {
                this.type = type;
            }

            public String getType() {
                return this.type;
            }
        }

        public boolean isSnapshot() {
            return fromVersion == HollowConstants.VERSION_NONE;
        }

        public boolean isReverseDelta() {
            return toVersion < fromVersion;
        }

        public boolean isDelta() {
            return !isSnapshot() && !isReverseDelta();
        }

        public long getFromVersion() {
            return fromVersion;
        }

        public long getToVersion() {
            return toVersion;
        }

        public BlobType getBlobType() {
            return blobType;
        }
    }

    AnnouncementWatcher

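    AnnouncementWatcher is the counterpart of the Announcer introduced in the Producer section. The Announcer sends announcements and the AnnouncementWatcher watches for them; once an announcement arrives, the BlobRetriever can be invoked to load the data into memory. The default AnnouncementWatcher implementation that Hollow provides is HollowFilesystemAnnouncementWatcher.

    For example, an AnnouncementWatcher could be implemented by a QMQ consumer.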

    /**
     * Implementations of this class are responsible for two things:
     * <p>
     * 1) Tracking the latest announced data state version.
     * 2) Keeping the client up to date by calling triggerAsyncRefresh() on self when the latest version changes.
     * <p>
     * If an AnnouncementWatcher is provided to a HollowConsumer, then calling HollowConsumer#triggerRefreshTo() is unsupported.
     */
    public interface AnnouncementWatcher {
        long NO_ANNOUNCEMENT_AVAILABLE = HollowConstants.VERSION_NONE;

        /**
         * @return the latest announced version.
         */
        long getLatestVersion();

        /**
         * Implementations of this method should subscribe a HollowConsumer to updates to announced versions.
         * <p>
         * When announcements are received via a push mechanism, or polling reveals a new version, a call should be placed to one
         * of the flavors of {@link HollowConsumer#triggerRefresh()} on the provided HollowConsumer.
         *
         * @param consumer the hollow consumer
         */
        void subscribeToUpdates(HollowConsumer consumer);
    }
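
    The relationship between Announcer and AnnouncementWatcher can be sketched with an in-memory channel that plays both roles. This is only a push-style illustration under stated assumptions: InMemoryAnnouncementChannel is a hypothetical class, it does not implement the Hollow interfaces, Long.MIN_VALUE is used as a placeholder "no announcement" sentinel, and the LongConsumer callback stands in for the refresh a real watcher would trigger on the consumer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** In-memory stand-in for an announcement channel: one side announces, subscribers are pushed to. */
class InMemoryAnnouncementChannel {
    // Assumption: placeholder sentinel meaning that nothing has been announced yet.
    static final long NO_ANNOUNCEMENT_AVAILABLE = Long.MIN_VALUE;

    private final AtomicLong latest = new AtomicLong(NO_ANNOUNCEMENT_AVAILABLE);
    private final List<LongConsumer> subscribers = new CopyOnWriteArrayList<>();

    /** Producer side, playing the role of Announcer.announce(long). */
    void announce(long version) {
        latest.set(version);
        for (LongConsumer subscriber : subscribers) {
            subscriber.accept(version);   // push: a real watcher would trigger a refresh here
        }
    }

    /** Consumer side, playing the role of AnnouncementWatcher.getLatestVersion(). */
    long getLatestVersion() {
        return latest.get();
    }

    /** Consumer side, playing the role of AnnouncementWatcher.subscribeToUpdates(...). */
    void subscribeToUpdates(LongConsumer onAnnouncement) {
        subscribers.add(onAnnouncement);
    }
}
```

    Replacing the direct callback with a message queue gives the push variant described earlier, while dropping the callback and calling getLatestVersion() on a timer gives the pull variant.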

    DoubleSnapshotConfig

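    DoubleSnapshotConfig controls whether double snapshots are allowed, and sets the maximum number of deltas before a double snapshot is used instead.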

    public interface DoubleSnapshotConfig {
        boolean allowDoubleSnapshot();
        int maxDeltasBeforeDoubleSnapshot();

        DoubleSnapshotConfig DEFAULT_CONFIG = new DoubleSnapshotConfig() {
            @Override
            public int maxDeltasBeforeDoubleSnapshot() {
                return 32;
            }

            @Override
            public boolean allowDoubleSnapshot() {
                return true;
            }
        };
    }

    Longevity

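    This is the configuration for how Hollow handles objects backed by pooled memory. The section is easier to follow when read together with the previous article, [Netflix Hollow Series] An In-Depth Analysis of Hollow's Memory Layout.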

    public interface ObjectLongevityConfig {
        /**
         * @return whether or not long-lived object support is enabled.
         * <p>
         * Because Hollow reuses pooled memory, if references to Hollow records are held too long, the underlying data may
         * be overwritten. When long-lived object support is enabled, Hollow records referenced via a {@link HollowAPI} will,
         * after an update, be backed by a reserved copy of the data at the time the reference was created. This guarantees
         * that even if a reference is held for a long time, it will continue to return the same data when interrogated.
         * <p>
         * These reserved copies are backed by the {@link HollowHistory} data structure.
         */
        boolean enableLongLivedObjectSupport();

        boolean enableExpiredUsageStackTraces();

        /**
         * @return if long-lived object support is enabled, the number of milliseconds before the {@link StaleHollowReferenceDetector}
         * will begin flagging usage of stale objects.
         */
        long gracePeriodMillis();

        /**
         * @return if long-lived object support is enabled, the number of milliseconds, after the grace period, during which
         * data is still available in stale references, but usage will be flagged by the {@link StaleHollowReferenceDetector}.
         * <p>
         * After the grace period + usage detection period have expired, the data from stale references will become inaccessible if
         * dropDataAutomatically() is enabled.
         */
        long usageDetectionPeriodMillis();

        /**
         * @return whether or not to drop data behind stale references after the grace period + usage detection period has elapsed, assuming
         * that no usage was detected during the usage detection period.
         */
        boolean dropDataAutomatically();

        /**
         * @return whether data is dropped even if flagged during the usage detection period.
         */
        boolean forceDropData();

        ObjectLongevityConfig DEFAULT_CONFIG = new ObjectLongevityConfig() {
            @Override
            public boolean enableLongLivedObjectSupport() {
                return false;
            }

            @Override
            public boolean dropDataAutomatically() {
                return false;
            }

            @Override
            public boolean forceDropData() {
                return false;
            }

            @Override
            public boolean enableExpiredUsageStackTraces() {
                return false;
            }

            @Override
            public long usageDetectionPeriodMillis() {
                return 60 * 60 * 1000;
            }

            @Override
            public long gracePeriodMillis() {
                return 60 * 60 * 1000;
            }
        };
    }

    /**
     * Listens for stale Hollow object usage
     */
    public interface ObjectLongevityDetector {
        /**
         * Stale reference detection hint. This will be called every ~30 seconds.
         * <p>
         * If a nonzero value is reported, then stale references to Hollow objects may be cached somewhere in your codebase.
         * <p>
         * This signal can be noisy, and a nonzero value indicates that some reference to stale data exists somewhere.
         *
         * @param count the count of stale references
         */
        void staleReferenceExistenceDetected(int count);

        /**
         * Stale reference USAGE detection. This will be called every ~30 seconds.
         * <p>
         * If a nonzero value is reported, then stale references to Hollow objects are being accessed from somewhere in your codebase.
         * <p>
         * This signal is noiseless, and a nonzero value indicates that some reference to stale data is USED somewhere.
         *
         * @param count the count of stale references
         */
        void staleReferenceUsageDetected(int count);

        ObjectLongevityDetector DEFAULT_DETECTOR = new ObjectLongevityDetector() {
            @Override
            public void staleReferenceUsageDetected(int count) {
            }

            @Override
            public void staleReferenceExistenceDetected(int count) {
            }
        };
    }

    RefreshListener

    RefreshListener defines what to do when the various events occur before, during and after an update of the local in-memory copy of the Hollow data.

    public interface RefreshListener {
        void refreshStarted(long currentVersion, long requestedVersion);
        void snapshotUpdateOccurred(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
        void deltaUpdateOccurred(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
        void blobLoaded(HollowConsumer.Blob transition);
        void refreshSuccessful(long beforeVersion, long afterVersion, long requestedVersion);
        void refreshFailed(long beforeVersion, long afterVersion, long requestedVersion, Throwable failureCause);
    }


    TransitionAwareRefreshListener enriches and extends RefreshListener.

    public interface TransitionAwareRefreshListener extends RefreshListener {
        void snapshotApplied(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
        void deltaApplied(HollowAPI api, HollowReadStateEngine stateEngine, long version) throws Exception;
        default void transitionsPlanned(long beforeVersion, long desiredVersion, boolean isSnapshotPlan, List<HollowConsumer.Blob.BlobType> transitionSequence) {}
    }


    RefreshRegistrationListener is a listener for listeners: it is notified when a refresh listener is added to or removed from a consumer.

    public interface RefreshRegistrationListener {
        void onBeforeAddition(HollowConsumer c);
        void onAfterRemoval(HollowConsumer c);
    }

    Consumer API

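    Reading Hollow's Blob data is done through a HollowAPI.

    HollowAPI comes in two flavors, HollowObjectAPI and HollowTypeAPI. When a Blob holds a large volume of data, using HollowTypeAPI directly is recommended, because it performs much better than HollowObjectAPI.

    When analyzing how HollowConsumer is set up above, we noted the generated client. Hollow provides a code generation utility for the API, HollowAPIGenerator.java, which makes it easy to generate the API classes a data client needs. The API is generated entirely from the Schema defined in the Producer. The generated API offers convenient access to features such as query, index and hash.

    Hollow also provides a generic data access class, GenericHollowObject.java. With it there is no need to generate API classes for a specific data model, but it is more cumbersome to use, because the IDE cannot offer dot-completion for the concrete classes and methods.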

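    Summary

    This article started from the basic producer-consumer model and then described Hollow's producer-consumer model in detail, analyzing HollowProducer and HollowConsumer with reference to the Hollow source code.

    As shown above, HollowProducer and HollowConsumer manage production and consumption through files. Is that the only way? No. We can use HollowWriteStateEngine and HollowReadStateEngine directly and operate on the data through InputStream and OutputStream.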

     

  • Original article: https://blog.csdn.net/m0_71777195/article/details/125520478