Preface
In the previous article in this series, 【Netflix Hollow系列】深入分析Hollow生产消费模型, we took a detailed look at Hollow's producer/consumer model. But how does Hollow actually handle the Blobs themselves and the data streams behind them?
This article answers that question with a detailed walkthrough, backed by source code, of Hollow's core low-level read and write machinery.
It covers four parts:
- how Hollow implements writes at the lowest level
- how Hollow implements reads at the lowest level
- Hollow's Blob data streams
- Hollow's underlying ByteBuffer implementation
The article quotes a good deal of key source code. The goal is to give you a detailed picture of how data flows through Hollow's lower layers and how production and consumption are actually implemented, and, beyond that, to spark ideas you can apply when designing and building your own systems.
Original article by @空歌白石.
MemoryMode
Before we get to the data flow, let's first look at how Hollow classifies its in-memory data model.
Hollow defines two memory modes, ON_HEAP and SHARED_MEMORY_LAZY, and reserves a third, SHARED_MEMORY_EAGER, which is not implemented yet. The enum values are:
- ON_HEAP: loaded eagerly into main memory, on the JVM heap.
- SHARED_MEMORY_LAZY: mapped into virtual memory and loaded lazily into main memory, off-heap.
- SHARED_MEMORY_EAGER: (future) mapped into virtual memory and loaded eagerly into main memory, off-heap.
One thing to note: not every MemoryMode supports a TypeFilter; currently only ON_HEAP does. What a TypeFilter is will be introduced in a later article.
public enum MemoryMode {
    ON_HEAP,
    SHARED_MEMORY_LAZY;
    // SHARED_MEMORY_EAGER

    /*
     * Returns whether this memory mode is supported on the consumer side
     */
    public boolean consumerSupported() {
        return this.equals(ON_HEAP) || this.equals(SHARED_MEMORY_LAZY);
    }

    /*
     * Returns whether this memory mode supports type filtering
     */
    public boolean supportsFiltering() {
        return this.equals(ON_HEAP);
    }
}
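Before moving on, here is a minimal sketch of how a consumer opts into a memory mode, assuming the standard HollowConsumer builder API with its withMemoryMode option and a hypothetical local blob-store path; consumerSupported() above is exactly the guard on what a consumer may select:

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever;
import com.netflix.hollow.core.memory.MemoryMode;
import java.nio.file.Paths;

public class MemoryModeExample {
    public static void main(String[] args) {
        HollowConsumer consumer = HollowConsumer
                .withBlobRetriever(new HollowFilesystemBlobRetriever(Paths.get("/tmp/hollow-publish"))) // hypothetical path
                .withMemoryMode(MemoryMode.SHARED_MEMORY_LAZY) // off-heap, lazily mapped
                .build();
        consumer.triggerRefresh(); // load the latest snapshot (plus deltas, if any)
    }
}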
Write
First, let's look at the concrete implementation of HollowProducer's core method, runCycle. runCycle is the logic the producer executes on each cycle to generate snapshot (full) and delta (incremental) output.
The core flow:
- prepare
- populate
- produce new state
- publish
- checksum
- validation
- announce
long runCycle(
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
Status.StageWithStateBuilder cycleStatus, long toVersion) {
// 1. Begin a new cycle
Artifacts artifacts = new Artifacts();
HollowWriteStateEngine writeEngine = getWriteEngine();
try {
// 1a. Prepare the write state
writeEngine.prepareForNextCycle();
// save timestamp in ms of when cycle starts
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis()));
// 2. Populate the state
populate(listeners, incrementalPopulator, populator, toVersion);
// 3. Produce a new state if there's work to do
if (writeEngine.hasChangedSinceLastCycle()) {
boolean schemaChangedFromPriorVersion = readStates.hasCurrent() &&
!writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine());
if (schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
} else {
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
}
// 3a. Publish, run checks & validation, then announce new state consumers
publish(listeners, toVersion, artifacts);
ReadStateHelper candidate = readStates.roundtrip(toVersion);
cycleStatus.readState(candidate.pending());
candidate = doIntegrityCheck ?
checkIntegrity(listeners, candidate, artifacts, schemaChangedFromPriorVersion) :
noIntegrityCheck(candidate, artifacts);
try {
validate(listeners, candidate.pending());
announce(listeners, candidate.pending());
readStates = candidate.commit();
cycleStatus.readState(readStates.current()).success();
} catch (Throwable th) {
if (artifacts.hasReverseDelta()) {
applyDelta(artifacts.reverseDelta, candidate.pending().getStateEngine());
readStates = candidate.rollback();
}
throw th;
}
lastSuccessfulCycle = toVersion;
} else {
// 3b. Nothing to do; reset the effects of Step 2
// Return the lastSuccessfulCycle to the caller thereby
// the callee can track that version against consumers
// without having to listen to events.
// Consistently report the version that would be used if
// data had been published for the events. This
// is for consistency in tracking
writeEngine.resetToLastPrepareForNextCycle();
cycleStatus.success();
listeners.fireNoDelta(toVersion);
log.info("Populate stage completed with no delta in output state; skipping publish, announce, etc.");
}
} catch (Throwable th) {
writeEngine.resetToLastPrepareForNextCycle();
cycleStatus.fail(th);
if (th instanceof RuntimeException) {
throw (RuntimeException) th;
}
throw new RuntimeException(th);
} finally {
artifacts.cleanup();
}
return lastSuccessfulCycle;
}
populate
Hollow's write path is driven mainly by the populate method, which consists of three stages: firePopulateStart, populate, and success.
void populate(
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
long toVersion) throws Exception {
assert incrementalPopulator != null ^ populator != null;
Status.StageBuilder populateStatus = listeners.firePopulateStart(toVersion);
try {
if (incrementalPopulator != null) {
// Incremental population is a sub-stage of the population stage
// This ensures good integration with existing population listeners if this sub-stage fails
// then the population stage will fail
populator = incrementalPopulate(listeners, incrementalPopulator, toVersion);
}
try (CloseableWriteState writeState = new CloseableWriteState(toVersion, objectMapper,
readStates.current())) {
populator.populate(writeState);
populateStatus.success();
}
} catch (Throwable th) {
populateStatus.fail(th);
throw th;
} finally {
listeners.firePopulateComplete(populateStatus);
}
}
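To connect this with the public API: the populator argument above is ultimately the callback you pass to runCycle. A minimal sketch, assuming a hypothetical Movie POJO and a local filesystem publisher/announcer:

HollowProducer producer = HollowProducer
        .withPublisher(new HollowFilesystemPublisher(Paths.get("/tmp/hollow-publish")))  // hypothetical path
        .withAnnouncer(new HollowFilesystemAnnouncer(Paths.get("/tmp/hollow-publish")))
        .build();

producer.runCycle(writeState -> {
    // each add() maps the POJO to a Hollow record via the HollowTypeMapper machinery below
    writeState.add(new Movie(1L, "The Matrix"));
});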
HollowTypeMapper
HollowTypeMapper is responsible for converting data of a specific Java type into bytes. It is an abstract base class (note the concrete protected methods below) with a dedicated implementation per data type; the dependency relationships are shown in the figure below.
Its core record-writing helpers are as follows:
protected void addTypeState(HollowWriteStateEngine stateEngine) {
if(stateEngine.getTypeState(getTypeName()) == null)
stateEngine.addTypeState(getTypeWriteState());
}
protected HollowWriteRecord writeRecord() {
HollowWriteRecord rec = writeRec.get();
if(rec == null) {
rec = newWriteRecord();
writeRec.set(rec);
}
rec.reset();
return rec;
}
protected ByteDataArray flatRecBuffer() {
ByteDataArray buf = flatRecBuffer.get();
if(buf == null) {
buf = new ByteDataArray();
flatRecBuffer.set(buf);
}
buf.reset();
return buf;
}
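Note the thread-local caching pattern in writeRecord() and flatRecBuffer(): each populating thread reuses its own write record and buffer instead of allocating one per record. The getters imply field declarations along these lines (a sketch; the exact declarations live in HollowTypeMapper):

private final ThreadLocal<HollowWriteRecord> writeRec = new ThreadLocal<>();
private final ThreadLocal<ByteDataArray> flatRecBuffer = new ThreadLocal<>();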
Let's walk through the Object implementation, HollowObjectTypeMapper, as an example:
@Override
protected HollowWriteRecord newWriteRecord() {
return new HollowObjectWriteRecord(schema);
}
In the constructor of HollowObjectWriteRecord, the per-field data buffers are initialized using SMALL_ARRAY_RECYCLER.
public HollowObjectWriteRecord(HollowObjectSchema schema) {
this.schema = schema;
this.fieldData = new ByteDataArray[schema.numFields()];
this.isNonNull = new boolean[schema.numFields()];
for (int i = 0; i < fieldData.length; i++) {
fieldData[i] = new ByteDataArray(WastefulRecycler.SMALL_ARRAY_RECYCLER);
}
}
When a record needs to be added, the add method is called to write the Object into the mapper.
/**
* Adds the specified POJO to the state engine.
* <p>
* Unless previously initialized with {@link #initializeTypeState(Class)},
* the first time an instance of a particular type is added
* its schema is derived and added to the data model.
*
* @param o the POJO to add
* @return the ordinal assigned to the newly added object
*/
public int add(Object o) {
HollowTypeMapper typeMapper = getTypeMapper(o.getClass(), null, null);
return typeMapper.write(o);
}
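A minimal usage sketch (Movie is again a hypothetical POJO; as the javadoc says, a type's schema is derived on the first add):

HollowWriteStateEngine writeEngine = new HollowWriteStateEngine();
HollowObjectMapper mapper = new HollowObjectMapper(writeEngine);
int ordinal = mapper.add(new Movie(1L, "The Matrix")); // returns the ordinal assigned to the new record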
HollowWriteRecord
HollowWriteRecord defines how each data record is written to the Blob, via two methods: writeDataTo and reset. The inheritance hierarchy is shown below:
package com.netflix.hollow.core.write;
import com.netflix.hollow.core.memory.ByteDataArray;
public interface HollowWriteRecord {
public void writeDataTo(ByteDataArray buf);
public void reset();
}
Each data type needs its own specialized implementation of HollowWriteRecord. We'll use the Object implementation, HollowObjectWriteRecord, to illustrate.
private final HollowObjectSchema schema;
private final ByteDataArray fieldData[];
private final boolean isNonNull[];
public HollowObjectWriteRecord(HollowObjectSchema schema) {
this.schema = schema;
this.fieldData = new ByteDataArray[schema.numFields()];
this.isNonNull = new boolean[schema.numFields()];
for (int i = 0; i < fieldData.length; i++) {
fieldData[i] = new ByteDataArray(WastefulRecycler.SMALL_ARRAY_RECYCLER);
}
}
From the concrete writeDataTo implementation, you can see that when writing data, Hollow serializes each field of the Object into the byte buffer in order, according to the field's concrete data type; NULL values receive special handling.
public void writeDataTo(ByteDataArray buf) {
for (int i = 0; i < fieldData.length; i++) {
writeField(buf, i);
}
}
public void writeDataTo(ByteDataArray buf, HollowObjectSchema translate) {
for(int i=0; i < translate.numFields(); i++) {
int fieldIndex = schema.getPosition(translate.getFieldName(i));
if(fieldIndex != -1) {
writeField(buf, fieldIndex);
} else {
writeNull(buf, translate.getFieldType(i));
}
}
}
private void writeField(ByteDataArray buf, int fieldIndex) {
if (isNonNull[fieldIndex]) {
if (getSchema().getFieldType(fieldIndex).isVariableLength())
VarInt.writeVInt(buf, (int)fieldData[fieldIndex].length());
fieldData[fieldIndex].copyTo(buf);
} else {
writeNull(buf, schema.getFieldType(fieldIndex));
}
}
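Here is a small end-to-end sketch of this write path, using an invented two-field schema; it shows that a variable-length field such as STRING is written with a VarInt length prefix, exactly as writeField above does:

HollowObjectSchema schema = new HollowObjectSchema("Movie", 2);
schema.addField("id", HollowObjectSchema.FieldType.LONG);
schema.addField("title", HollowObjectSchema.FieldType.STRING);

HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema);
rec.setLong("id", 1L);
rec.setString("title", "The Matrix");

ByteDataArray buf = new ByteDataArray();
rec.writeDataTo(buf); // fields are serialized in schema order; "title" is length-prefixed with a VarInt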
The data model can be understood with the help of the following diagram. Interested readers can also consult the earlier article 【Netflix Hollow系列】深入分析Hollow内存布局 for a more detailed picture.
Read
Next, we'll examine how the Consumer reads data. First, let's look at the dependency relationships among the core read-side classes.
HollowBlobReader
HollowBlobReader is responsible for populating snapshot data and consuming deltas, in other words for applying updates to the HollowReadStateEngine. Inside its methods, a different memory model is selected according to the configured MemoryMode.
private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob,
HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, snapshotBlob);
OptionalBlobPartInput optionalPartIn = snapshotBlob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners);
initializeAPI(apiInitCallback);
for (HollowConsumer.RefreshListener refreshListener : refreshListeners) {
if (refreshListener instanceof TransitionAwareRefreshListener)
((TransitionAwareRefreshListener)refreshListener).snapshotApplied(currentAPI, stateEngine, snapshotBlob.getToVersion());
}
} catch (Throwable t) {
failedTransitionTracker.markFailedTransition(snapshotBlob);
throw t;
}
}
private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
LOG.warning("Skipping delta transition in shared-memory mode");
return;
}
try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, blob);
OptionalBlobPartInput optionalPartIn = blob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners);
if(objLongevityConfig.enableLongLivedObjectSupport()) {
HollowDataAccess previousDataAccess = currentAPI.getDataAccess();
HollowHistoricalStateDataAccess priorState = new HollowHistoricalStateCreator(null).createBasedOnNewDelta(currentVersion, stateEngine);
HollowProxyDataAccess newDataAccess = new HollowProxyDataAccess();
newDataAccess.setDataAccess(stateEngine);
currentAPI = apiFactory.createAPI(newDataAccess, currentAPI);
if(previousDataAccess instanceof HollowProxyDataAccess)
((HollowProxyDataAccess)previousDataAccess).setDataAccess(priorState);
wireHistoricalStateChain(priorState);
} else {
if(currentAPI.getDataAccess() != stateEngine)
currentAPI = apiFactory.createAPI(stateEngine);
priorHistoricalDataAccess = null;
}
if(!staleReferenceDetector.isKnownAPIHandle(currentAPI))
staleReferenceDetector.newAPIHandle(currentAPI);
for(HollowConsumer.RefreshListener refreshListener : refreshListeners) {
if(!isSnapshotPlan)
refreshListener.deltaUpdateOccurred(currentAPI, stateEngine, blob.getToVersion());
if (refreshListener instanceof TransitionAwareRefreshListener)
((TransitionAwareRefreshListener)refreshListener).deltaApplied(currentAPI, stateEngine, blob.getToVersion());
}
} catch (Throwable t) {
failedTransitionTracker.markFailedTransition(blob);
throw t;
}
}
private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInput optionalPartIn, HollowConsumer.Blob transition, HollowConsumer.RefreshListener[] refreshListeners) throws IOException {
if (transition.isSnapshot()) {
if (filter == null) {
reader.readSnapshot(in, optionalPartIn);
}
else {
reader.readSnapshot(in, optionalPartIn, filter);
}
} else {
reader.applyDelta(in, optionalPartIn);
}
setVersion(transition.getToVersion());
for (HollowConsumer.RefreshListener refreshListener : refreshListeners)
refreshListener.blobLoaded(transition);
}
The method that reads snapshot (full) data:
public void readSnapshot(HollowBlobInput in, OptionalBlobPartInput optionalParts, TypeFilter filter) throws IOException {
validateMemoryMode(in.getMemoryMode());
Map<String, HollowBlobInput> optionalPartInputs = null;
if (optionalParts != null)
optionalPartInputs = optionalParts.getInputsByPartName(in.getMemoryMode());
HollowBlobHeader header = readHeader(in, false);
List<HollowBlobOptionalPartHeader> partHeaders = readPartHeaders(header, optionalPartInputs, in.getMemoryMode());
List<HollowSchema> allSchemas = combineSchemas(header, partHeaders);
filter = filter.resolve(allSchemas);
notifyBeginUpdate();
long startTime = System.currentTimeMillis();
int numStates = VarInt.readVInt(in);
Collection<String> typeNames = new TreeSet<>();
for (int i=0;i<numStates;i++) {
String typeName = readTypeStateSnapshot(in, filter);
typeNames.add(typeName);
}
if (optionalPartInputs != null) {
for (Map.Entry<String, HollowBlobInput> optionalPartEntry : optionalPartInputs.entrySet()) {
numStates = VarInt.readVInt(optionalPartEntry.getValue());
for (int i=0;i<numStates;i++) {
String typeName = readTypeStateSnapshot(optionalPartEntry.getValue(), filter);
typeNames.add(typeName);
}
}
}
stateEngine.wireTypeStatesToSchemas();
long endTime = System.currentTimeMillis();
log.info("SNAPSHOT COMPLETED IN " + (endTime - startTime) + "ms");
log.info("TYPES: " + typeNames);
notifyEndUpdate();
stateEngine.afterInitialization();
}
private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) throws IOException {
HollowSchema schema = HollowSchema.readFrom(in);
int numShards = readNumShards(in);
String typeName = schema.getName();
if (schema instanceof HollowObjectSchema) {
if (!filter.includes(typeName)) {
HollowObjectTypeReadState.discardSnapshot(in, (HollowObjectSchema)schema, numShards);
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards));
}
} else if (schema instanceof HollowListSchema) {
if (!filter.includes(typeName)) {
HollowListTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards));
}
} else if(schema instanceof HollowSetSchema) {
if (!filter.includes(typeName)) {
HollowSetTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards));
}
} else if(schema instanceof HollowMapSchema) {
if (!filter.includes(typeName)) {
HollowMapTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards));
}
}
return typeName;
}
private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException {
stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}
@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if (shards.length > 1)
maxOrdinal = VarInt.readVInt(in);
for (int i = 0; i < shards.length; i++) {
HollowObjectTypeDataElements snapshotData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData.readSnapshot(in, unfilteredSchema);
shards[i].setCurrentData(snapshotData);
}
if (shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;
SnapshotPopulatedOrdinalsReader.readOrdinals(in, stateListeners);
}
readFromInput reads data from the HollowBlobInput stream; the isDelta flag indicates whether the current stream is a delta, and the HollowObjectSchema defines the layout of the data in the HollowBlobInput.
void readFromInput(HollowBlobInput in, boolean isDelta, HollowObjectSchema unfilteredSchema) throws IOException {
maxOrdinal = VarInt.readVInt(in);
if (isDelta) {
encodedRemovals = GapEncodedVariableLengthIntegerReader.readEncodedDeltaOrdinals(in, memoryRecycler);
encodedAdditions = GapEncodedVariableLengthIntegerReader.readEncodedDeltaOrdinals(in, memoryRecycler);
}
readFieldStatistics(in, unfilteredSchema);
fixedLengthData = FixedLengthDataFactory.get(in, memoryMode, memoryRecycler);
removeExcludedFieldsFromFixedLengthData();
readVarLengthData(in, unfilteredSchema);
}
public static GapEncodedVariableLengthIntegerReader readEncodedDeltaOrdinals(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
SegmentedByteArray arr = new SegmentedByteArray(memoryRecycler);
long numBytesEncodedOrdinals = VarInt.readVLong(in);
arr.loadFrom(in, numBytesEncodedOrdinals);
return new GapEncodedVariableLengthIntegerReader(arr, (int)numBytesEncodedOrdinals);
}
@Override
public void loadFrom(HollowBlobInput is, long length) throws IOException {
int segmentSize = 1 << log2OfSegmentSize;
int segment = 0;
byte scratch[] = new byte[segmentSize];
while (length > 0) {
ensureCapacity(segment);
long bytesToCopy = Math.min(segmentSize, length);
long bytesCopied = 0;
while (bytesCopied < bytesToCopy) {
bytesCopied += is.read(scratch, (int)bytesCopied, (int)(bytesToCopy - bytesCopied));
}
orderedCopy(scratch, 0, segments[segment++], 0, (int)bytesCopied);
length -= bytesCopied;
}
}
private void orderedCopy(byte[] src, int srcPos, byte[] dest, int destPos, int length) {
int endSrcPos = srcPos + length;
destPos += Unsafe.ARRAY_BYTE_BASE_OFFSET;
while(srcPos < endSrcPos) {
unsafe.putByteVolatile(dest, destPos++, src[srcPos++]);
}
}
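loadFrom above copies the input into fixed-size segments of 2^log2OfSegmentSize bytes each. For intuition, addressing one byte in such a segmented array is a shift for the segment index plus a mask for the offset within it (an illustration of the scheme, not Hollow's exact code):

int log2OfSegmentSize = 14;                              // e.g. 16 KB segments (illustrative value)
int mask = (1 << log2OfSegmentSize) - 1;
long index = 100_000;                                    // global byte index
int whichSegment = (int) (index >>> log2OfSegmentSize);  // -> segments[6]
int offsetInSegment = (int) (index & mask);              // -> byte 1696 within that segment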
/**
* Reads up to {@code len} bytes of data from the HollowBlobInput by relaying the call to the underlying
* {@code DataInputStream} or {@code RandomAccessFile} into an array of bytes. This method blocks until at
* least one byte of input is available.
*
* @return the total number of bytes read into the buffer, or -1 if there is no more data
* @throws IOException if underlying {@code DataInputStream} or {@code RandomAccessFile}
* @throws UnsupportedOperationException if the input type wasn't one of {@code DataInputStream} or {@code RandomAccessFile}
*/
public int read(byte b[], int off, int len) throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).read(b, off, len);
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).read(b, off, len);
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
HollowTypeReadState
HollowTypeReadState defines two abstract methods, readSnapshot and applyDelta, which are responsible for reading snapshot data and consuming delta data, respectively.
public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;
public abstract void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException;
As the inheritance diagram shows, HollowObjectTypeReadState, HollowListTypeReadState, HollowSetTypeReadState, and HollowMapTypeReadState each implement HollowTypeReadState. Let's take HollowObjectTypeReadState as the example and see how a subclass implements snapshot and delta consumption.
@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);
for(int i=0;i<shards.length;i++) {
HollowObjectTypeDataElements snapshotData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData.readSnapshot(in, unfilteredSchema);
shards[i].setCurrentData(snapshotData);
}
if(shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;
SnapshotPopulatedOrdinalsReader.readOrdinals(in, stateListeners);
}
@Override
public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);
for(int i=0;i<shards.length;i++) {
HollowObjectTypeDataElements deltaData = new HollowObjectTypeDataElements((HollowObjectSchema)deltaSchema, memoryMode, memoryRecycler);
deltaData.readDelta(in);
if(stateEngine.isSkipTypeShardUpdateWithNoAdditions() && deltaData.encodedAdditions.isEmpty()) {
if(!deltaData.encodedRemovals.isEmpty())
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shards.length);
HollowObjectTypeDataElements currentData = shards[i].currentDataElements();
GapEncodedVariableLengthIntegerReader oldRemovals = currentData.encodedRemovals == null ? GapEncodedVariableLengthIntegerReader.EMPTY_READER : currentData.encodedRemovals;
if(oldRemovals.isEmpty()) {
currentData.encodedRemovals = deltaData.encodedRemovals;
oldRemovals.destroy();
} else {
if(!deltaData.encodedRemovals.isEmpty()) {
currentData.encodedRemovals = GapEncodedVariableLengthIntegerReader.combine(oldRemovals, deltaData.encodedRemovals, memoryRecycler);
oldRemovals.destroy();
}
deltaData.encodedRemovals.destroy();
}
deltaData.encodedAdditions.destroy();
} else {
HollowObjectTypeDataElements nextData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
HollowObjectTypeDataElements oldData = shards[i].currentDataElements();
nextData.applyDelta(oldData, deltaData);
shards[i].setCurrentData(nextData);
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shards.length);
oldData.destroy();
}
deltaData.destroy();
stateEngine.getMemoryRecycler().swap();
}
if(shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;
}
For snapshot data, let's continue into the implementation of SnapshotPopulatedOrdinalsReader.readOrdinals(in, stateListeners).
public static void readOrdinals(HollowBlobInput in, HollowTypeStateListener[] listeners) throws IOException {
int numLongs = in.readInt();
int currentOrdinal = 0;
for(int i=0;i<numLongs;i++) {
long l = in.readLong();
notifyPopulatedOrdinals(l, currentOrdinal, listeners);
currentOrdinal += 64;
}
}
private static void notifyPopulatedOrdinals(long l, int ordinal, HollowTypeStateListener[] listeners) {
if(l == 0)
return;
int stopOrdinal = ordinal + 64;
while(ordinal < stopOrdinal) {
long mask = 1L << ordinal;
if((l & mask) != 0) {
for(int i=0; i<listeners.length; i++) {
listeners[i].addedOrdinal(ordinal);
}
}
ordinal++;
}
}
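A quick worked example of the bit walk above: each long encodes 64 ordinals as a bitmap, and every set bit triggers an addedOrdinal callback (illustration only):

long l = 0b1011L; // bits 0, 1 and 3 set -> ordinals 0, 1 and 3 are populated
for (int ordinal = 0; ordinal < 64; ordinal++) {
    if ((l & (1L << ordinal)) != 0) {
        System.out.println("addedOrdinal(" + ordinal + ")"); // fires for 0, 1 and 3
    }
}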
HollowTypeStateListener
Finally, we can see that the actual data loading happens in the addedOrdinal method of HollowTypeStateListener. HollowTypeStateListener is an interface; the diagram below shows its implementations.
A HollowTypeStateListener is registered with a specific HollowTypeReadState in order to receive callbacks when deltas are applied to the HollowReadStateEngine.
Let's look at the concrete source in detail, using HollowObjectCacheProvider as an example.
@Override
public void addedOrdinal(int ordinal) {
// guard against being detached (or constructed without a HollowTypeReadState)
if (factory == null)
return;
for (int i = cachedItems.size(); i <= ordinal; ++i)
cachedItems.add(null);
cachedItems.set(ordinal, instantiateCachedObject(factory, typeReadState, typeAPI, ordinal));
}
private T instantiateCachedObject(HollowFactory<T> factory, HollowTypeDataAccess typeDataAccess, HollowTypeAPI typeAPI, int ordinal) {
try {
return factory.newCachedHollowObject(typeDataAccess, typeAPI, ordinal);
} catch(Throwable th) {
log.log(Level.SEVERE, "Cached object instantiation failed", th);
return null;
}
}
factory.newCachedHollowObject is what ties this back to the discussion in the MemoryMode section above.
/**
* A HollowFactory is responsible for returning objects in a generated Hollow Object API. The HollowFactory for individual
* types can be overridden to return hand-coded implementations of specific record types.
*/
public abstract class HollowFactory<T> {
public abstract T newHollowObject(HollowTypeDataAccess dataAccess, HollowTypeAPI typeAPI, int ordinal);
public T newCachedHollowObject(HollowTypeDataAccess dataAccess, HollowTypeAPI typeAPI, int ordinal) {
return newHollowObject(dataAccess, typeAPI, ordinal);
}
}
HollowFactory
The inheritance hierarchy of HollowFactory implementations is shown in the figure below.
Let's look at the String implementation as an example.
public class StringHollowFactory extends HollowFactory<HString> {
@Override
public HString newHollowObject(HollowTypeDataAccess dataAccess, HollowTypeAPI typeAPI, int ordinal) {
return new HString(((StringTypeAPI)typeAPI).getDelegateLookupImpl(), ordinal);
}
@Override
public HString newCachedHollowObject(HollowTypeDataAccess dataAccess, HollowTypeAPI typeAPI, int ordinal) {
return new HString(new StringDelegateCachedImpl((StringTypeAPI)typeAPI, ordinal), ordinal);
}
}
HollowBlobInput
To support the different memory modes, and to switch between a DataInputStream and a RandomAccessFile as the underlying resource for a Hollow Producer/Consumer Blob, Hollow wraps them in HollowBlobInput. You can think of this class as the core data stream for Hollow's data transfer.
Constructor
HollowBlobInput holds three fields: the MemoryMode, the input (stream or file), and a BlobByteBuffer.
private final MemoryMode memoryMode;
private Object input;
private BlobByteBuffer buffer;
private HollowBlobInput(MemoryMode memoryMode) {
this.memoryMode = memoryMode;
}
public MemoryMode getMemoryMode() {
return memoryMode;
}
Blob To HollowBlobInput
This subsection analyzes how Hollow converts a Blob into the HollowBlobInput data stream.
modeBasedSelector
The modeBasedSelector method reads data from the Blob file into an in-memory stream. It acts mainly as a selector, choosing between the randomAccess and serial ways of opening the data stream based on the MemoryMode.
/**
* Initialize the Hollow Blob Input object from the Hollow Consumer blob's Input Stream or Random Access File,
* depending on the configured memory mode. The returned HollowBlobInput object must be closed to free up resources.
*
* @param mode Configured memory mode
* @param blob Hollow Consumer blob
* @return the initialized Hollow Blob Input
* @throws IOException if the Hollow Blob Input couldn't be initialized
*/
public static HollowBlobInput modeBasedSelector(MemoryMode mode, HollowConsumer.Blob blob) throws IOException {
if (mode.equals(ON_HEAP)) {
return serial(blob.getInputStream());
} else if (mode.equals(SHARED_MEMORY_LAZY)) {
return randomAccess(blob.getFile());
} else {
throw new UnsupportedOperationException();
}
}
/**
* Initialize the Hollow Blob Input object from the Hollow Consumer blob's Input Stream or Random Access File,
* depending on the configured memory mode. The returned HollowBlobInput object must be closed to free up resources.
*
* @param mode Configured memory mode
* @param input Hollow Consumer blob
* @param partName the name of the optional part
* @return the initialized Hollow Blob Input
* @throws IOException if the Hollow Blob Input couldn't be initialized
*/
public static HollowBlobInput modeBasedSelector(MemoryMode mode, OptionalBlobPartInput input, String partName) throws IOException {
if (mode.equals(ON_HEAP)) {
return serial(input.getInputStream(partName));
} else if (mode.equals(SHARED_MEMORY_LAZY)) {
return randomAccess(input.getFile(partName));
} else {
throw new UnsupportedOperationException();
}
}
randomAccess
This path reads the Blob file through a RandomAccessFile; inside BlobByteBuffer.mmapBlob, mmap-based reading is implemented on top of the JDK's MappedByteBuffer class.
/**
* Initialize a random access Hollow Blob input object from a file. The returned HollowBlobInput object must be
* closed to free up resources.
*
* @param f file containing the Hollow blob
* @return a random access HollowBlobInput object
* @throws IOException if the mmap operation reported an IOException
*/
public static HollowBlobInput randomAccess(File f) throws IOException {
return randomAccess(f, MAX_SINGLE_BUFFER_CAPACITY);
}
/**
* Useful for testing with custom buffer capacity
*/
public static HollowBlobInput randomAccess(File f, int singleBufferCapacity) throws IOException {
HollowBlobInput hbi = new HollowBlobInput(SHARED_MEMORY_LAZY);
RandomAccessFile raf = new RandomAccessFile(f, "r");
hbi.input = raf;
FileChannel channel = ((RandomAccessFile) hbi.input).getChannel();
hbi.buffer = BlobByteBuffer.mmapBlob(channel, singleBufferCapacity);
return hbi;
}
serial
The serial implementation simply wraps the InputStream in a DataInputStream and assigns it to the input field of the HollowBlobInput.
/**
* Shorthand for calling {@link HollowBlobInput#serial(InputStream)} on a byte[]
*/
public static HollowBlobInput serial(byte[] bytes) {
InputStream is = new ByteArrayInputStream(bytes);
return serial(is);
}
/**
* Initialize a serial access Hollow Blob input object from an input stream. The returned HollowBlobInput object
* must be closed to free up resources.
*
* @param is input stream containing for Hollow blob data
* @return a serial access HollowBlobInput object
*/
public static HollowBlobInput serial(InputStream is) {
HollowBlobInput hbi = new HollowBlobInput(ON_HEAP);
hbi.input = new DataInputStream(is);
return hbi;
}
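A minimal usage sketch that ties serial to the read methods in the next subsection (the byte array stands in for real blob content; try-with-resources works because HollowBlobInput is closeable, see close below):

byte[] blobBytes = new byte[] {0, 0, 0, 42};   // stand-in for real blob bytes
try (HollowBlobInput in = HollowBlobInput.serial(blobBytes)) {
    int value = in.readInt();                  // delegates to DataInputStream.readInt(); reads 42 here
}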
read stream
Through its read-prefixed methods, HollowBlobInput loads data of various types from the different underlying streams, delegating to the low-level read capabilities of RandomAccessFile or DataInputStream.
/**
* Reads the next byte of data from the input stream by relaying the call to the underlying {@code DataInputStream} or
* {@code RandomAccessFile}. The byte is returned as an integer in the range 0 to 255.
*
* @return an integer in the range 0 to 255
* @throws IOException if underlying {@code DataInputStream} or {@code RandomAccessFile}
* @throws UnsupportedOperationException if the input type wasn't one of {@code DataInputStream} or {@code RandomAccessFile}
*/
public int read() throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).read();
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).read();
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
/**
* Reads up to {@code len} bytes of data from the HollowBlobInput by relaying the call to the underlying
* {@code DataInputStream} or {@code RandomAccessFile} into an array of bytes. This method blocks until at
* least one byte of input is available.
*
* @return the total number of bytes read into the buffer, or -1 if there is no more data
* @throws IOException if underlying {@code DataInputStream} or {@code RandomAccessFile}
* @throws UnsupportedOperationException if the input type wasn't one of {@code DataInputStream} or {@code RandomAccessFile}
*/
public int read(byte b[], int off, int len) throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).read(b, off, len);
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).read(b, off, len);
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
/**
* Sets the file-pointer to the desired offset measured from the beginning of the file by relaying the call to the
* underlying {@code RandomAccessFile}. Operation not supported if the Hollow Blob Input is an {@code DataInputStream}.
*
* @param pos the position in bytes from the beginning of the file at which to set the file pointer to.
* @exception IOException if originated in the underlying {@code RandomAccessFile} implementation
* @exception UnsupportedOperationException if called when Hollow Blob Input is not a {@code RandomAccessFile}
*/
public void seek(long pos) throws IOException {
if (input instanceof RandomAccessFile) {
((RandomAccessFile) input).seek(pos);
} else if (input instanceof DataInputStream) {
throw new UnsupportedOperationException("Can not seek on Hollow Blob Input of type DataInputStream");
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
/**
* Reads two bytes from the input (at the current file pointer) into a signed 16-bit short, and advances the offset
* in input.
*
* @return short value read from current offset in input
* @exception IOException if an I/O error occurs.
*/
public final short readShort() throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).readShort();
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).readShort();
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
/**
* Reads 4 bytes from the input (at the current file pointer) into a signed 32-bit int, and advances the offset
* in input.
*
* @return int value read from current offset in input
* @exception IOException if an I/O error occurs.
*/
public final int readInt() throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).readInt();
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).readInt();
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
/**
* Reads 8 bytes from the input (at the current file pointer) into a signed 64-bit long, and advances the offset
* in input.
*
* @return long value read from current offset in input
* @exception IOException if an I/O error occurs.
*/
public final long readLong() throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).readLong();
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).readLong();
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
/**
* Reads in a string from this file, encoded using <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
* format, and advances the offset in input.
* @return UTF-8 string read from current offset in input
* @exception IOException if an I/O error occurs.
*/
public final String readUTF() throws IOException {
if (input instanceof RandomAccessFile) {
return ((RandomAccessFile) input).readUTF();
} else if (input instanceof DataInputStream) {
return ((DataInputStream) input).readUTF();
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
close
close is responsible for closing the data stream.
/**
* Closes underlying InputStream/RandomAccessFile and releases any system resources associated with the Hollow Blob Input.
* @throws IOException
*/
@Override
public void close() throws IOException {
if (input instanceof RandomAccessFile) {
((RandomAccessFile) input).close();
} else if (input instanceof DataInputStream) {
((DataInputStream) input).close();
} else {
throw new UnsupportedOperationException("Unknown Hollow Blob Input type");
}
}
BlobByteBuffer
We saw the BlobByteBuffer field in HollowBlobInput; this subsection focuses on how BlobByteBuffer is implemented.
BlobByteBuffer is responsible for storing all in-memory data. It bridges large files such as Blobs and MappedByteBuffer, which is needed because a single MappedByteBuffer can only map a region whose size fits in an int (at most about 2 GB).
Note: JDK 14 will introduce an improved API for accessing foreign memory, intended to replace MappedByteBuffer.
BlobByteBuffer is not thread-safe, but it is safe to share the underlying byte buffers for parallel reads. The maximum supported blob size is roughly 2^61 bytes (up to Integer.MAX_VALUE buffers of at most 2^30 bytes each, per the mmapBlob code below); other limits in Hollow, or practical limits, will likely be hit long before this one.
fields
All of BlobByteBuffer's data is ultimately stored in an array of ByteBuffers named spine.
public static final int MAX_SINGLE_BUFFER_CAPACITY = 1 << 30; // largest, positive power-of-two int
private final ByteBuffer[] spine; // array of MappedByteBuffers
private final long capacity; // in bytes
private final int shift;
private final int mask;
private long position; // within index 0 to capacity-1 in the underlying ByteBuffer
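Given shift and mask, translating a global byte position into a spine lookup mirrors the segmented-array addressing we saw earlier (an illustration of the scheme, not the exact Hollow code):

long position = 3_000_000_000L;                  // global byte position within the blob
int spineIndex = (int) (position >>> shift);     // which MappedByteBuffer in the spine
int indexIntoBuffer = (int) (position & mask);   // offset within that buffer
byte b = spine[spineIndex].get(indexIntoBuffer); // absolute get on the mapped buffer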
mmapBlob
BlobByteBuffer's mmapBlob method implements reading data from a File via mmap.
/**
* mmap the entire contents of FileChannel into an array of {@code MappedByteBuffer}s, each of size singleBufferCapacity.
* @param channel FileChannel for file to be mmap-ed
* @param singleBufferCapacity Size of individual MappedByteBuffers in array of {@code MappedByteBuffer}s required
* to map the entire file channel. It must be a power of 2, and due to {@code MappedByteBuffer}
* constraints it is limited to the max integer that is a power of 2.
* @return BlobByteBuffer containing an array of {@code MappedByteBuffer}s that mmap-ed the entire file channel
* @throws IOException
*/
public static BlobByteBuffer mmapBlob(FileChannel channel, int singleBufferCapacity) throws IOException {
long size = channel.size();
if (size == 0) {
throw new IllegalStateException("File to be mmap-ed has no data");
}
if ((singleBufferCapacity & (singleBufferCapacity - 1)) != 0) { // should be a power of 2
throw new IllegalArgumentException("singleBufferCapacity must be a power of 2");
}
// divide into N buffers with an int capacity that is a power of 2
final int bufferCapacity = size > (long) singleBufferCapacity
? singleBufferCapacity
: Integer.highestOneBit((int) size);
long bufferCount = size % bufferCapacity == 0
? size / (long)bufferCapacity
: (size / (long)bufferCapacity) + 1;
if (bufferCount > Integer.MAX_VALUE)
throw new IllegalArgumentException("file too large; size=" + size);
int shift = 31 - Integer.numberOfLeadingZeros(bufferCapacity); // log2
int mask = (1 << shift) - 1;
ByteBuffer[] spine = new MappedByteBuffer[(int)bufferCount];
for (int i = 0; i < bufferCount; i++) {
long pos = (long)i * bufferCapacity;
int cap = i == (bufferCount - 1)
? (int)(size - pos)
: bufferCapacity;
ByteBuffer buffer = channel.map(READ_ONLY, pos, cap);
/*
* if (!((MappedByteBuffer) buffer).isLoaded()) // TODO(timt): make pre-fetching configurable
* ((MappedByteBuffer) buffer).load();
*/
spine[i] = buffer;
}
return new BlobByteBuffer(size, shift, mask, spine);
}
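A quick worked example: mmap-ing a 2.5 GB file with singleBufferCapacity = 2^30 gives bufferCapacity = 2^30, bufferCount = 3 (two full 1 GiB buffers plus a final ~0.5 GB one), shift = 30, and mask = 2^30 - 1.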
Summary
Through a detailed reading of the source code, this article analyzed how Hollow implements the reading and writing of data records, and finally explored Hollow's ByteBuffer handling. I'm sure ByteBuffer is no stranger to anyone; nearly every framework uses it, which says a lot about its importance.
Closing remarks
Reading source code in depth lets you understand a system's architecture far more deeply than staying at the level of merely using it.
Don't hesitate, don't waver; come read some source code.