
AutoMQ Source Code (Part 1): Read, Write, and Compaction


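Preface

AutoMQ stays compatible with the Kafka protocol and semantics while mapping Kafka's local Log/Segment model onto a Stream storage model backed by S3.

Starting from the Kafka data directory, this article follows the path Log → Segment → Stream → S3 and walks through AutoMQ's storage abstractions, its read and write paths, and its compaction mechanism.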

Notes:

1) This article is based on AutoMQ 1.6.6, where the WAL supports S3 only;

2) KIP-1150, Kafka Diskless Topics: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1150%3A+Diskless+Topics;

1. Log, Segment, and Stream

Kafka organizes its data directory as follows:

tree -a
.
├── .kafka_cleanshutdown
├── .lock
├── Test1-0
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.timeindex
│   ├── 00000000000000001872.index
│   ├── 00000000000000001872.log
│   ├── 00000000000000001872.timeindex
│   └── leader-epoch-checkpoint
├── cleaner-offset-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint

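Log: each partition has exactly one directory under the data directory, named {Topic}-{PartitionId} (for example Test1-0), which stores all data of that partition. A Log is made up of multiple Segments: here 0000… is the first segment and …1872 is the second. The file name is the segment's base offset, so segment 0000… holds offsets [0, 1872), 1872 messages in total.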
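To make the naming rule concrete, here is a minimal Java sketch. It is not Kafka code: the class `SegmentNaming` and its methods are hypothetical names used only for illustration. It formats a base offset as a 20-digit file name and finds the segment that contains a given offset with a floor lookup over the base offsets.

```java
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Kafka or AutoMQ.
final class SegmentNaming {
    // Segment files are named after the base offset, zero-padded to 20 digits.
    static String logFileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    // Returns the base offset of the segment containing targetOffset,
    // i.e. the greatest base offset that is <= targetOffset.
    static long segmentFor(TreeMap<Long, String> segments, long targetOffset) {
        Long base = segments.floorKey(targetOffset);
        if (base == null) {
            throw new IllegalArgumentException("offset below log start: " + targetOffset);
        }
        return base;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> segments = new TreeMap<>();
        for (long base : new long[] {0L, 1872L}) {
            segments.put(base, logFileName(base));
        }
        // Offset 1871 still belongs to the first segment, 1872 to the second.
        System.out.println(segments.get(segmentFor(segments, 1871L)));
        System.out.println(segments.get(segmentFor(segments, 1872L)));
    }
}
```

A sorted map keyed by base offset is a natural fit here because "which segment holds offset X" is exactly a floor query.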

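Segment: each Segment consists of four files:

a) log: message data file, append-only; it rolls when its size exceeds segment.bytes (1 GiB by default);

b) index: offset index file, read and written through mmap; size = segment.index.bytes (10 MiB by default); key = offset relative to the segment's base offset (4 bytes), value = position of the message in the log file (4 bytes);

c) timeindex: time index file, read and written through mmap; size = segment.index.bytes (10 MiB by default); key = timestamp (8 bytes), value = offset relative to the segment's base offset (4 bytes);

d) txnindex: transaction index file;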

leader-epoch-checkpoint: one per Log; records the start offset of each leader epoch of the partition.

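Node-level data files

a) meta.properties: binds the data directory to a broker and a cluster;

b) checkpoint files: record per-partition offset progress for different purposes;

  • recovery-point-offset-checkpoint: offset = flush progress of messages, written every log.flush.offset.checkpoint.interval.ms = 60 seconds;
  • replication-offset-checkpoint: offset = high watermark (HW), written every replica.high.watermark.checkpoint.interval.ms = 5 seconds;
  • log-start-offset-checkpoint: offset = Log Start Offset, written every log.flush.start.offset.checkpoint.interval.ms = 60 seconds;
  • cleaner-offset-checkpoint: offset = cleaning progress of partitions that belong to compacted topics;

c) .kafka_cleanshutdown: marker file written when the broker shuts down cleanly; if it is missing, the Kafka process exited abnormally.

d) .lock: file lock; only one process may access a data directory at a time.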

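On the broker side, AutoMQ concepts map onto Kafka concepts as follows.

Stream: one partition Log maps to four Streams: metadata, log data, time index, and transaction index.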

public class S3Stream implements Stream, StreamMetadataListener {
    // stream id
    private final long streamId;
    // leader epoch of the partition
    private final long epoch;
    // start offset
    private long startOffset;
    // next offset to write
    private final AtomicLong nextOffset;
    // offset up to which writes have succeeded
    final AtomicLong confirmOffset;
    // S3Storage, used to read and write the Stream
    private final Storage storage;
}

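The metadata stream, MetaStream, wraps a Stream and uses it as a key-value store. It holds, for example:

1) ElasticLogMeta: metadata of every segment;

2) ElasticPartitionProducerSnapshotsMeta: snapshots of transaction state;

3) ElasticPartitionMeta: the counterpart of the checkpoint files in the Kafka data directory, split per partition; it also stores the cleanedShutdown flag;

4) ElasticLeaderEpochCheckpointMeta: the counterpart of leader-epoch-checkpoint in the Log directory, i.e. the start offset of each leader epoch;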

public class MetaStream implements Stream {
    // ElasticLogMeta
    public static final String LOG_META_KEY = "LOG";
    // ElasticPartitionProducerSnapshotsMeta
    public static final String PRODUCER_SNAPSHOTS_META_KEY = "PRODUCER_SNAPSHOTS";
    // ElasticPartitionMeta
    public static final String PARTITION_META_KEY = "PARTITION";
    // ElasticLeaderEpochCheckpointMeta
    public static final String LEADER_EPOCH_CHECKPOINT_KEY = "LEADER_EPOCH_CHECKPOINT";
    // S3Stream that actually writes to S3
    private final Stream innerStream;
    // key -> value
    private final Map<String, MetadataValue> metaCache;
}

public class ElasticLogMeta {
    // log/tim/txn -> streamId of the three streams used by the segments
    private Map<String, Long> streamMap = new HashMap<>();
    // metadata of the n segments
    private List<ElasticStreamSegmentMeta> segmentMetas = new LinkedList<>();
}

public class ElasticStreamSegmentMeta {
    // base offset of the segment (Kafka reads it from the file name)
    private long baseOffset;
    private long createTimestamp;
    private long lastModifiedTimestamp;
    private String streamSuffix = "";
    private int logSize;
    // offset range of the log slice
    private SliceRange log = new SliceRange();
    // offset range of the time index slice
    private SliceRange time = new SliceRange();
    // offset range of the transaction index slice
    private SliceRange txn = new SliceRange();
    private long firstBatchTimestamp;
    private TimestampOffsetData timeIndexLastEntry = new TimestampOffsetData();
}

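StreamSlice: a slice of a Stream. One Stream contains multiple StreamSlices, and each StreamSlice covers one range of data.

From the Segment's point of view, each file in a Segment (the log file, for example) maps to one StreamSlice, and only the newest slice is writable (sealed=false).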

public class DefaultElasticStreamSlice implements ElasticStreamSlice {
    private final long startOffsetInStream;
    // data is written through the Stream
    private final Stream stream;
    private long endOffset = Offsets.NOOP_OFFSET;
    // whether the slice is sealed
    private boolean sealed = false;

    // write
    public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
        if (sealed) {
            return FutureUtil.failedFuture();
        }
        return stream.append(context, recordBatch).thenApply(AppendResultWrapper::new);
    }

    // read
    public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
        long fixedStartOffset = Utils.max(startOffset, 0);
        return stream.fetch(context, startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint)
                .thenApply(FetchResultWrapper::new);
    }
}
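The fetch method above translates slice-relative offsets into absolute offsets of the underlying Stream. The following stand-alone sketch isolates that arithmetic. `SliceOffsets` is a hypothetical class written for this article, not the AutoMQ implementation.

```java
// Hypothetical model of a slice's offset arithmetic; not the AutoMQ class.
final class SliceOffsets {
    private final long startOffsetInStream;

    SliceOffsets(long startOffsetInStream) {
        this.startOffsetInStream = startOffsetInStream;
    }

    // Slice-relative offset -> absolute offset in the stream.
    // Negative inputs are clamped to 0, mirroring Utils.max(startOffset, 0).
    long toStreamOffset(long offsetInSlice) {
        return startOffsetInStream + Math.max(offsetInSlice, 0L);
    }

    // Absolute [start, end) range in the stream for a slice-relative fetch.
    long[] fetchRange(long startInSlice, long endInSlice) {
        return new long[] {toStreamOffset(startInSlice), startOffsetInStream + endInSlice};
    }
}
```

For a slice that starts at stream offset 1000, a fetch of slice offsets [0, 20) reads stream offsets [1000, 1020).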

2. Stream Management

Stream metadata is stored in the Controller. In memory it looks like this:

public class StreamControlManager {
    // auto-incrementing streamId, used by createStream
    private final TimelineLong nextAssignedStreamId;
    // streamId -> Stream metadata
    private final TimelineHashMap<Long, StreamRuntimeMetadata> streamsMetadata;
}

public class StreamRuntimeMetadata {
    private final long streamId;
    // leaderEpoch
    private final TimelineLong currentEpoch;
    // auto-incrementing generator for rangeIndex
    private final TimelineInteger currentRangeIndex;
    // start offset
    private final TimelineLong startOffset;
    // end offset
    private final TimelineLong endOffset;
    // CLOSE or OPEN
    private final TimelineObject<StreamState> currentState;
    // tags
    private Map<String, String> tags;
    // rangeIndex -> range metadata
    private final TimelineHashMap<Integer, RangeMetadata> ranges;
    // objectId -> streamId + startOffset + endOffset
    private final TimelineHashMap<Long, S3StreamObject> streamObjects;
}

public class RangeMetadata implements Comparable<RangeMetadata> {
    private long streamId;
    // leaderEpoch that created this range
    private long epoch;
    private int rangeIndex;
    private long startOffset;
    private long endOffset;
    // id of the node that exclusively owns this range
    private int nodeId;
}

public class S3StreamObject {
    private final long objectId;
    private final long streamId;
    private final long startOffset;
    private final long endOffset;
}

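ElasticLog.apply: overrides Kafka's LocalLog. When a broker becomes the leader of a partition it has to create the Partition, which in turn creates and opens the MetaStream used to read and write the partition's metadata.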

def apply(...): ElasticLog = {
    val key = formatStreamKey(namespace, topicPartition, topicId)
    // [1] topicPartition -> streamId (MetaStream)
    val value = client.kvClient().getKV(KeyValue.Key.of(key)).get()
    val topicIdStr: String = topicId.map(u => u.toString).getOrElse(topicPartition.topic())
    var partitionMeta: ElasticPartitionMeta = null
    val metaNotExists = value.isNull
    var metaStream: MetaStream = null
    var logStreamManager: ElasticLogStreamManager = null
    val replicationFactor = 1
    val streamTags = new util.HashMap[String, String]()
    streamTags.put(StreamTags.Topic.KEY, topicIdStr)
    streamTags.put(StreamTags.Partition.KEY, StreamTags.Partition.encode(topicPartition.partition()))
    try {
        // [2] MetaStream (wraps an S3Stream): a stream used as a KV store
        metaStream = if (metaNotExists) {
            // no stream yet: allocate a streamId, persist it, and open the Stream
            val stream = createMetaStream(client, key, replicationFactor, leaderEpoch, streamTags, logIdent = logIdent)
            stream
        } else {
            // stream already exists: read the partition's MetaStream streamId from the KV store
            val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
            // ask the Controller to open the Stream; this node takes exclusive ownership, CLOSE -> OPEN
            val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).tags(streamTags).build())
                // S3Stream -> MetaStream(S3Stream)
                .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
                .get()
            stream
        }
        // MetaStream: replay the partition's metadata stream to rebuild in-memory metadata
        val metaMap = metaStream.replay().asScala
        // MetaStream: checkpoint data
        val partitionMetaOpt = metaMap.get(MetaStream.PARTITION_META_KEY).map(m => m.asInstanceOf[ElasticPartitionMeta])
        // MetaStream: transaction state snapshots
        val producerSnapshotsMeta = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]).getOrElse(new ElasticPartitionProducerSnapshotsMeta())
        // MetaStream: metadata of every segment
        val logMeta: ElasticLogMeta = metaMap.get(MetaStream.LOG_META_KEY).map(m => m.asInstanceOf[ElasticLogMeta]).getOrElse(new ElasticLogMeta())
        // streamName (log/tim/txn) -> S3Stream
        logStreamManager = new ElasticLogStreamManager(...)
        // streamName (log/tim/txn) -> DefaultElasticStreamSlice(S3Stream, startOffsetInStream, endOffset)
        val streamSliceManager = new ElasticStreamSliceManager(logStreamManager)
        // Map<Long, ElasticLogSegment> segments
        val logSegmentManager = new ElasticLogSegmentManager(...)
        val segments = new CachedLogSegments(topicPartition)
        // [3] overrides Kafka's LogLoader: load the Segments (StreamSlices)
        val offsets = new ElasticLogLoader(...).load()
        // MetaStream: leader epoch checkpoint of the partition
        val leaderEpochCheckpointMetaOpt = metaMap.get(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY).map(m => m.asInstanceOf[ElasticLeaderEpochCheckpointMeta])
        val elasticLog = new ElasticLog(...)
        if (partitionMeta.getCleanedShutdown) {
            partitionMeta.setCleanedShutdown(false)
            elasticLog.persistPartitionMeta()
        }
        elasticLog
    }
}

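ElasticLog.createMetaStream: a MetaStream is just an S3Stream with its own KV storage logic layered on top, so it is created and used the same way as an ordinary Stream.

1) createStream: asks the Controller for a unique streamId;

2) openStream: sends streamId/nodeId/leaderEpoch to the Controller. The Controller marks the Stream as open and creates a new StreamRange owned by the current nodeId, then returns the Stream's start offset and write offset (for the log Stream these are logStartOffset and logEndOffset, i.e. the LEO). The broker then builds the S3Stream;

3) putKVIfAbsent: for the MetaStream only, stores the partition-to-streamId mapping in the Controller so that the next partition leader can find the same MetaStream again. The other Streams (log and indexes) locate their streamId through ElasticLogMeta in the MetaStream and do not need this step;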

private[streamaspect] def createMetaStream(client: Client, key: String, replicaCount: Int, leaderEpoch: Long, streamTags: util.Map[String, String],
    logIdent: String): MetaStream = {
    val options = CreateStreamOptions.builder().replicaCount(replicaCount).epoch(leaderEpoch)
    streamTags.forEach((k, v) => options.tag(k, v))
    val metaStream = client.streamClient().createAndOpenStream(options.build())
        .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
        .get()
    val streamId = metaStream.streamId()
    val valueBuf = ByteBuffer.allocate(8)
    valueBuf.putLong(streamId)
    valueBuf.flip()
    client.kvClient().putKVIfAbsent(KeyValue.of(key, valueBuf)).get()
    metaStream
}

// S3StreamClient#createAndOpenStream
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
    return runInLock(() -> {
        checkState();
        TimerUtil timerUtil = new TimerUtil();
        // 1. request a streamId from the controller
        return FutureUtil.exec(() -> streamManager.createStream(options.tags()).thenCompose(streamId -> {
            // 2. ask the controller to open the stream, then create the S3Stream in memory
            return openStream0(streamId, options.epoch(), options.tags(), OpenStreamOptions.builder().epoch(options.epoch()).tags(options.tags()).build());
        }), LOGGER, "createAndOpenStream");
    });
}

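Next, consider StreamSlice, which lives at the Segment level. A Segment is created in several situations:

1) The Log has no Segment, i.e. ElasticLogMeta in the MetaStream is empty, so a new Segment is created;

2) The opposite of 1: n Segments are restored from ElasticLogMeta;

3) A write to the Log triggers a Segment roll;

ElasticLog.createAndSaveSegment: creates a Segment. baseOffset is the Segment's start offset; in Kafka the baseOffset is reflected in the file name.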

private def createAndSaveSegment(logSegmentManager: ElasticLogSegmentManager, suffix: String = "", logIdent: String)
        (baseOffset: Long, dir: File, config: LogConfig, streamSliceManager: ElasticStreamSliceManager, time: Time)
    : (ElasticLogSegment, CompletableFuture[Void]) = {
    val meta = new ElasticStreamSegmentMeta()
    meta.baseOffset(baseOffset)
    meta.streamSuffix(suffix)
    meta.createTimestamp(time.milliseconds())
    // create the Segment
    val segment: ElasticLogSegment = new ElasticLogSegment(dir, meta, streamSliceManager, config, time, logSegmentManager.logSegmentEventListener(), logIdent)
    var metaSaveCf: CompletableFuture[Void] = CompletableFuture.completedFuture(null)
    // persist the segment metadata to the MetaStream
    metaSaveCf = logSegmentManager.create(baseOffset, segment)
    (segment, metaSaveCf)
}

// ElasticLogSegmentManager
final Map<Long, ElasticLogSegment> segments = new HashMap<>();

public CompletableFuture<Void> create(long baseOffset, ElasticLogSegment segment) {
    // track the segment in memory
    segments.put(baseOffset, segment);
    // persist to the MetaStream
    return asyncPersistLogMeta();
}

ElasticLogSegment: one Segment = three StreamSlices (log, time index, transaction index) = one section in each of the three partition-level Streams.

public class ElasticLogSegment extends LogSegment implements Comparable<ElasticLogSegment> {
    // segment metadata
    private final ElasticStreamSegmentMeta meta;
    private final long baseOffset;
    // log StreamSlice
    private final ElasticLogFileRecords log;
    // time index StreamSlice
    private final ElasticTimeIndex timeIndex;
    // transaction index StreamSlice
    private final ElasticTransactionIndex txnIndex;

    public ElasticLogSegment(
        File dir,
        ElasticStreamSegmentMeta meta,
        ElasticStreamSliceManager sm,
        LogConfig logConfig,
        Time time,
        ElasticLogSegmentEventListener segmentEventListener,
        String logIdent
    ) throws IOException {
        super(null, null, null, null, meta.baseOffset(), logConfig.indexInterval, logConfig.segmentJitterMs, time);
        this.meta = meta;
        baseOffset = meta.baseOffset();
        String suffix = meta.streamSuffix();
        // StreamSlice of the log
        log = new ElasticLogFileRecords(sm.loadOrCreateSlice("log" + suffix, meta.log()), baseOffset, meta.logSize());
        // StreamSlice of the time index
        TimestampOffset lastTimeIndexEntry = meta.timeIndexLastEntry().toTimestampOffset();
        timeIndex = new ElasticTimeIndex(
            LogFileUtils.timeIndexFile(dir, baseOffset, suffix),
            baseOffset,
            logConfig.maxIndexSize,
            new DefaultStreamSliceSupplier(sm, "tim" + suffix, meta.time()),
            lastTimeIndexEntry,
            timeCache
        );
        // StreamSlice of the transaction index
        txnIndex = new ElasticTransactionIndex(
            baseOffset,
            LogFileUtils.transactionIndexFile(dir, baseOffset, suffix),
            new DefaultStreamSliceSupplier(sm, "txn" + suffix, meta.txn()),
            txnCache
        );
    }
    // ...
}

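ElasticStreamSliceManager.loadOrCreateSlice: loads or creates a StreamSlice.

For a new Segment (one that was not restored from the MetaStream), a new StreamSlice is created.

If the Stream does not exist, Stream creation is triggered as a cascade (obtain a streamId, then open the Stream). If the Stream exists, it is opened, which gives the current node a new Stream Range.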

private final Map<String, ElasticStreamSlice> lastSlices = new ConcurrentHashMap<>();
private final ElasticLogStreamManager streamManager;

public ElasticStreamSlice loadOrCreateSlice(String streamName, SliceRange sliceRange) throws IOException {
    if (sliceRange.start() == Offsets.NOOP_OFFSET) {
        return newSlice(streamName);
    }
    return new DefaultElasticStreamSlice(streamManager.getStream(streamName), sliceRange);
}

public ElasticStreamSlice newSlice(String streamName) throws IOException {
    // seal the previous slice (segment) of this stream so it accepts no more writes
    ElasticStreamSlice lastSlice = lastSlices.get(streamName);
    if (lastSlice != null) {
        lastSlice.seal();
    }
    // create a new slice with range [-1, -1]
    Stream stream = streamManager.getStream(streamName);
    ElasticStreamSlice streamSlice = new DefaultElasticStreamSlice(stream, SliceRange.of(Offsets.NOOP_OFFSET, Offsets.NOOP_OFFSET));
    lastSlices.put(streamName, streamSlice);
    return streamSlice;
}

// ElasticLogStreamManager
// key = log/tim/txn, value = S3Stream
private final Map<String, LazyStream> streamMap = new ConcurrentHashMap<>();

public LazyStream getStream(String name) throws IOException {
    if (streamMap.containsKey(name)) {
        return streamMap.get(name);
    }
    LazyStream lazyStream = new LazyStream(name, LazyStream.NOOP_STREAM_ID, streamClient, replicaCount, epoch, tags, snapshotRead);
    lazyStream.setListener(innerListener);
    // log and time index: create or open the S3Stream immediately
    boolean warmUp = "log".equals(name) || "tim".equals(name);
    if (warmUp) {
        lazyStream.warmUp();
    }
    streamMap.put(name, lazyStream);
    return lazyStream;
}

Subsequent writes then locate the last Segment, from there its three StreamSlices, and from there the three Streams.

3. Write

3-1. Writing the Write-Ahead Log

3-1-1. Write path: Log → Segment → Stream → S3Storage

Partition.appendRecordsToLeader: the write entry point in AutoMQ

def appendRecordsToLeader(...): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderLogIfLocal match {
      case Some(leaderLog) =>
        // write to the log
        val info = leaderLog.appendAsLeader(...)
        // ...
    }
  }
  // ...
}

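ElasticLog.append: the main write flow.

1) Find the last Segment and write the messages to it as an asynchronous task;

2) Update the LEO without waiting for the Segment write to finish;

3) Once the Segment write finishes (the cf completes), update confirmOffset;

4) After confirmOffset has been updated, call back Partition.handleLeaderConfirmOffsetMove, which sets the high watermark HW = confirmOffset and responds to pending producer requests with acks=-1;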

// lastOffset = offset of the last message in records
override private[log] def append(lastOffset: Long, records: MemoryRecords): Unit = {
    val activeSegment = segments.activeSegment
    // write to the segment
    activeSegment.append(lastOffset, records)
    val endOffset = lastOffset + 1
    // update the LEO without waiting for the WAL write to complete
    updateLogEndOffset(endOffset)
    val cf = activeSegment.asInstanceOf[ElasticLogSegment].asyncLogFlush()
    cf.whenComplete((_, _) => {
        APPEND_PERMIT_SEMAPHORE.release(permit)
    })
    cf.thenAccept(_ => {
        breakable {
            // the cf fires once the segment write is done: advance confirmOffset
            while (true) {
                val offset = _confirmOffset.get()
                if (offset.messageOffset < endOffset) {
                    _confirmOffset.compareAndSet(offset, new LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size))
                    notify = true
                } else {
                    break()
                }
            }
        }
        // invoke Partition.handleLeaderConfirmOffsetMove
        if (notify) {
            appendAckQueue.offer(endOffset)
            lastAppendAckFuture = appendAckThread.submit(new Runnable {
                override def run(): Unit = {
                    appendCallback(startNanos)
                }
            })
        }
    })
}

private def appendCallback(startNanos: Long): Unit = {
    // Partition.handleLeaderConfirmOffsetMove
    confirmOffsetChangeListener.foreach(_.apply())
}

// Partition.handleLeaderConfirmOffsetMove
private def handleLeaderConfirmOffsetMove(): Unit = {
  log match {
    case Some(leaderLog) =>
      // update HW = confirmOffset
      if (maybeIncrementLeaderHW(leaderLog)) {
        // respond to clients (acks=-1)
        tryCompleteDelayedRequests()
      }
  }
}

private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
    leaderLog match {
      case elasticLog: ElasticUnifiedLog =>
        val confirmOffset = elasticLog.confirmOffset()
        newHighWatermark = confirmOffset
        this.confirmOffset = confirmOffset.messageOffset
      case _ =>
    }
    // ...
}
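The relationship between the LEO and confirmOffset can be isolated in a few lines of Java. This is a simplified sketch under stated assumptions, not AutoMQ code: `ConfirmOffsetTracker` and its method names are hypothetical, and plain long values stand in for LogOffsetMetadata. It shows the same compare-and-set loop that only ever moves confirmOffset forward, even when write completions arrive out of order.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names are illustrative, not AutoMQ's.
final class ConfirmOffsetTracker {
    private final AtomicLong logEndOffset = new AtomicLong();
    private final AtomicLong confirmOffset = new AtomicLong();

    // The LEO moves as soon as the append is submitted.
    void onAppendSubmitted(long endOffset) {
        logEndOffset.accumulateAndGet(endOffset, Math::max);
    }

    // Called when the asynchronous write completes. Returns true when
    // confirmOffset actually advanced, i.e. when listeners should be notified.
    boolean onAppendConfirmed(long endOffset) {
        while (true) {
            long current = confirmOffset.get();
            if (current >= endOffset) {
                return false; // a later append was already confirmed
            }
            if (confirmOffset.compareAndSet(current, endOffset)) {
                return true;
            }
            // CAS lost a race with another completion: re-read and retry
        }
    }

    long logEndOffset() { return logEndOffset.get(); }

    long confirmOffset() { return confirmOffset.get(); }
}
```

Between submission and confirmation the two offsets differ, and in this model that gap is exactly the data an acks=-1 producer is still waiting on.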

ElasticLogSegment.append: the Segment write, which ends up writing through the StreamSlice (Stream).

AutoMQ has no offset index, only a time index, and the index interval is enlarged from 4 KB to 1 MB.

// StreamSlice
private final ElasticLogFileRecords log;
// StreamSlice
private final ElasticTimeIndex timeIndex;

public void append(long largestOffset, MemoryRecords records) throws IOException {
    if (records.sizeInBytes() > 0) {
        // append the messages
        long appendedBytes = log.append(records, largestOffset + 1);
        for (RecordBatch batch : records.batches()) {
            // ...
            // index.interval.bytes = 1 MB (4 KB in Kafka)
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
                bytesSinceLastIndexEntry = 0;
            }
        }
    }
}

// ElasticLogFileRecords
private final ElasticStreamSlice streamSlice;

public int append(MemoryRecords records, long lastOffset) throws IOException {
    int appendSize = records.sizeInBytes();
    int count = (int) (lastOffset - nextOffset());
    com.automq.stream.DefaultRecordBatch batch = new com.automq.stream.DefaultRecordBatch(count, 0, Collections.emptyMap(), records.buffer());
    AppendContext context = new AppendContext();
    CompletableFuture<?> cf;
    // write through the streamSlice
    cf = streamSlice.append(context, batch);
    // ...
}

// ElasticStreamSlice
private final Stream stream;

public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
    if (sealed) {
        return FutureUtil.failedFuture();
    }
    return stream.append(context, recordBatch).thenApply(AppendResultWrapper::new);
}

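S3Stream.append0: wraps the message batch in an additional StreamRecordBatch layer that adds streamId, epoch (the leaderEpoch), baseOffset (start offset of the batch), and count (number of messages), then calls S3Storage to write it.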

// S3Storage
private final Storage storage;

private CompletableFuture<AppendResult> append0(AppendContext context, RecordBatch recordBatch) {
    long offset = nextOffset.getAndAdd(recordBatch.count());
    StreamRecordBatch streamRecordBatch = StreamRecordBatch.of(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
    CompletableFuture<AppendResult> cf = storage.append(context, streamRecordBatch).thenApply(nil -> {
        updateConfirmOffset(offset + recordBatch.count());
        return new DefaultAppendResult(offset);
    });
    // ...
}

public static StreamRecordBatch of(long streamId, long epoch, long baseOffset, int count, ByteBuf payload,
    ByteBufSupplier alloc) {
    int totalLength = HEADER_SIZE + payload.readableBytes();
    ByteBuf buf = alloc.alloc(totalLength);
    buf.writeByte(MAGIC_V0);
    buf.writeLong(streamId);
    buf.writeLong(epoch);
    buf.writeLong(baseOffset);
    buf.writeInt(count);
    buf.writeInt(payload.readableBytes());
    buf.writeBytes(payload);
    payload.release();
    return new StreamRecordBatch(buf);
}
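The header written by StreamRecordBatch.of can be reproduced with the JDK alone. The sketch below uses java.nio.ByteBuffer instead of Netty, so it is an illustration of the layout rather than the real codec. `StreamRecordBatchCodec` is a hypothetical name, the MAGIC_V0 value is a placeholder (the article does not show the real constant), and the 33-byte header size is derived from the writes shown above: 1 + 8 + 8 + 8 + 4 + 4.

```java
import java.nio.ByteBuffer;

// Illustration of the header layout only; not the AutoMQ implementation.
final class StreamRecordBatchCodec {
    // Placeholder: the real magic value is not shown in the article.
    static final byte MAGIC_V0 = 0x22;
    // magic(1) + streamId(8) + epoch(8) + baseOffset(8) + count(4) + payloadLength(4)
    static final int HEADER_SIZE = 1 + 8 + 8 + 8 + 4 + 4;

    static ByteBuffer encode(long streamId, long epoch, long baseOffset, int count, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + payload.length);
        buf.put(MAGIC_V0);
        buf.putLong(streamId);
        buf.putLong(epoch);
        buf.putLong(baseOffset);
        buf.putInt(count);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Absolute reads at fixed header positions; they do not move the buffer position.
    static long streamId(ByteBuffer buf) { return buf.getLong(1); }

    static long baseOffset(ByteBuffer buf) { return buf.getLong(17); }

    static int count(ByteBuffer buf) { return buf.getInt(25); }

    static int payloadLength(ByteBuffer buf) { return buf.getInt(29); }
}
```

Because every field sits at a fixed position, a reader can pull the streamId and offset range out of a record without decoding the Kafka batch inside the payload.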

3-1-2. Batched writes and object layout

S3Storage.append: wraps the message in a WAL request and writes it to S3.

private final WriteAheadLog deltaWAL;

public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord) {
    final long startTime = System.nanoTime();
    CompletableFuture<Void> cf = new CompletableFuture<>();
    WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, null, cf, context);
    append0(context, writeRequest, false);
    return cf.whenComplete((nil, ex) -> {
    });
}

public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) {
    CompletableFuture<AppendResult> appendCf;
    try {
        try {
            StreamRecordBatch streamRecord = request.record;
            streamRecord.retain();
            // write to S3
            appendCf = deltaWAL.append(new TraceContext(context), streamRecord);
        } catch (OverCapacityException e) {
            // trigger a WAL upload, which writes the real data objects
            maybeForceUpload();
            return true;
        }
    } catch (Throwable e) {
        request.cf.completeExceptionally(e);
        return false;
    }
    appendCf.thenAccept(rst -> {
        // cache in LogCache (the L1 read cache)
        handleAppendCallback(request);
    });
    return false;
}

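DefaultWriter.append0: multiple Streams write through one shared WAL service, so data from several Streams may be merged into the same Bulk. A Bulk is flushed to a WAL object in S3 when it reaches a size limit (8 MB soft limit, 64 MB hard limit) or a delay limit (10 to 250 ms).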

private Bulk activeBulk = null;

public CompletableFuture<AppendResult> append0(
    StreamRecordBatch streamRecordBatch) throws OverCapacityException, WALFencedException {
    // more than maxUnflushedBytes (1 GB) is buffered and not yet written to the WAL:
    // throw, which triggers an upload of data objects to S3
    if (bufferedDataBytes.get() > config.maxUnflushedBytes()) {
        throw new OverCapacityException();
    }
    int dataSize = streamRecordBatch.encoded().readableBytes() + RecordHeader.RECORD_HEADER_SIZE;
    Record record = new Record(streamRecordBatch, new CompletableFuture<>());
    lock.writeLock().lock();
    try {
        if (activeBulk == null) {
            activeBulk = new Bulk(nextOffset.get());
        }
        // if adding the new batch would exceed 64 MB, the old bulk must be written
        // to the S3 WAL first and the new batch goes into a new bulk
        if (dataSize + activeBulk.size > DATA_FILE_ALIGN_SIZE) {
            uploadActiveBulk();
            this.activeBulk = new Bulk(nextOffset.get());
        }
        bufferedDataBytes.addAndGet(dataSize);
        // add the record to the in-memory bulk, which is batched into the S3 WAL
        activeBulk.add(record);
        // bulk including the new batch > 8 MB: write the bulk to the S3 WAL
        if (activeBulk.size > config.maxBytesInBatch()) {
            uploadActiveBulk();
        }
    } finally {
        lock.writeLock().unlock();
    }
    return record.future.whenComplete((v, throwable) -> {
        bufferedDataBytes.addAndGet(-dataSize);
    });
}

class Bulk {
    private int size;
    private final long baseOffset;
    private final List<Record> records = new ArrayList<>(1024);
    private final long startNanos;
    final CompletableFuture<ObjectStorage.WriteResult> uploadCf = new CompletableFuture<>();
    final CompletableFuture<Void> completeCf = new CompletableFuture<>();

    public Bulk(long baseOffset) {
        this.startNanos = time.nanoseconds();
        this.baseOffset = baseOffset;
        long forceUploadDelayNanos = Math.min(
            Math.max(
                // 10 ms
                minBulkUploadIntervalNanos,
                lastBulkForceUploadNanos + batchNanos - startNanos
            ),
            // 250 ms
            batchNanos);
        lastBulkForceUploadNanos = startNanos + forceUploadDelayNanos;
        SCHEDULE.schedule(() -> forceUploadBulk(this), forceUploadDelayNanos, TimeUnit.NANOSECONDS);
    }
}
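The flush rules are easier to check when pulled out as pure functions. The sketch below is a hypothetical restatement (`BulkPolicy`, `Action`, and the parameter names are invented for illustration); the 8 MB and 64 MB constants come from the description above, and the delay formula copies the Bulk constructor.

```java
// Hypothetical restatement of the Bulk flush rules; not AutoMQ code.
final class BulkPolicy {
    static final long SOFT_LIMIT_BYTES = 8L * 1024 * 1024;   // maxBytesInBatch
    static final long HARD_LIMIT_BYTES = 64L * 1024 * 1024;  // DATA_FILE_ALIGN_SIZE

    enum Action { APPEND, FLUSH_BEFORE_APPEND, FLUSH_AFTER_APPEND }

    // Decides what happens when a record of recordSize bytes meets a bulk of bulkSize bytes.
    static Action onAppend(long bulkSize, long recordSize) {
        if (bulkSize + recordSize > HARD_LIMIT_BYTES) {
            return Action.FLUSH_BEFORE_APPEND; // old bulk is uploaded, record starts a new bulk
        }
        if (bulkSize + recordSize > SOFT_LIMIT_BYTES) {
            return Action.FLUSH_AFTER_APPEND;  // record joins the bulk, then the bulk is uploaded
        }
        return Action.APPEND;                  // keep batching until the timer fires
    }

    // Delay before a new bulk is force-uploaded, clamped to [minInterval, batch].
    static long forceUploadDelayNanos(long startNanos, long lastForceUploadNanos,
                                      long minIntervalNanos, long batchNanos) {
        long sinceLast = lastForceUploadNanos + batchNanos - startNanos;
        return Math.min(Math.max(minIntervalNanos, sinceLast), batchNanos);
    }
}
```

The clamp means a bulk created right after a forced upload waits up to the full batch interval, while a bulk created long after the last one is uploaded after the minimum interval.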

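DefaultWriter.uploadBulk0: builds the WAL object by writing the StreamRecordBatch records of multiple Streams in order. If the producer uses acks=-1 (all), the callback runs only after the WAL write succeeds; it updates confirmOffset, and only then are the affected clients answered.

Note: with acks=1, the LEO is updated as soon as the asynchronous WAL task is submitted and the client gets a success response immediately; see ElasticLog.append and ReplicaManager.delayedProduceRequestRequired.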

private void uploadBulk0(Bulk bulk) {
    try {
        long startTime = time.nanoseconds();
        // records of n streams, sorted by <streamId, offset>
        List<Record> records = bulk.records;
        // ...
        CompositeByteBuf objectBuffer = ByteBufAlloc.compositeByteBuffer();
        // ...
        // align endOffset up to 64 MiB
        nextOffset = ObjectUtils.ceilAlignOffset(nextOffset);
        long endOffset = nextOffset;
        ObjectStorage.WriteOptions writeOptions = new ObjectStorage.WriteOptions().enableFastRetry(true);
        // object storage key
        String path = String.format(OBJECT_PATH_FORMAT, objectPrefix, firstOffset, endOffset);
        FutureUtil.propagate(objectStorage.write(writeOptions, path, objectBuffer), bulk.uploadCf);
        long finalLastRecordOffset = lastRecordOffset;
        bulk.uploadCf.whenCompleteAsync((rst, ex) -> {
            // this is where the client response is triggered
            callback();
        }, callbackExecutor);
    } catch (Throwable ex) {
        bulk.uploadCf.completeExceptionally(ex);
    }
}

WAL object:

1) key = md5({node.id})/_kafka_{cluster.id}/{node.id}/{start time}/wal/{firstOffset}-{endOffset}. For example, the WAL object 0-67108864 may contain only 8 MiB of data, but its name is aligned to 64 MiB;

2) value = WALObjectHeader + RecordHeader[0] + StreamRecordBatch[0] + … + RecordHeader[n] + StreamRecordBatch[n].
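The 64 MiB alignment in the object name is plain round-up arithmetic. Here is a minimal sketch, assuming ordinary ceiling alignment; `WalObjectNaming` is a hypothetical class, and whether the real ObjectUtils.ceilAlignOffset treats an already aligned offset the same way is not shown in the article.

```java
// Hypothetical sketch of the offset range in a WAL object name; not AutoMQ code.
final class WalObjectNaming {
    static final long ALIGN = 64L * 1024 * 1024; // 64 MiB

    // Rounds offset up to the next multiple of ALIGN (assumption: aligned inputs are unchanged).
    static long ceilAlign(long offset) {
        return (offset + ALIGN - 1) / ALIGN * ALIGN;
    }

    // Builds the "{firstOffset}-{endOffset}" suffix for a bulk holding dataBytes bytes.
    static String objectSuffix(long firstOffset, long dataBytes) {
        long endOffset = ceilAlign(firstOffset + dataBytes);
        return firstOffset + "-" + endOffset;
    }
}
```

With these assumptions, a bulk of 8 MiB starting at offset 0 is named 0-67108864, which matches the example above, and the next bulk starts at 67108864.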

3-1-3. WAL cache: LogCache

S3Storage.handleAppendCallback: once the WAL object has been written successfully, the data is cached in LogCache to speed up reads.

private final LogCache deltaWALCache;

private void handleAppendCallback(WalWriteRequest request) {
    request.record.retain();
    boolean full;
    // put the WAL data into the cache to speed up reads
    synchronized (deltaWALCache) {
        full = deltaWALCache.put(request.record);
        deltaWALCache.setLastRecordOffset(request.offset);
    }
    // if the cache is full, trigger the real data write; once it finishes,
    // the WAL objects and the in-memory cache can be cleaned up
    if (full) {
        uploadDeltaWAL();
    }
    // ...
}

LogCache: maintains a list of blocks. The last LogCacheBlock in the list is the writable cache block, the activeBlock. Each StreamRecordBatch is placed into the activeBlock as a whole.

public class LogCache {
    // cache blocks, ordered from old WAL data to new WAL data
    final List<LogCacheBlock> blocks = new ArrayList<>();
    // active block: the most recent WAL data is added here
    private LogCacheBlock activeBlock;
    // LogCache capacity in bytes
    // Math.max(heap / 2 / 3, (heap / 2 - 3L * 1024 * 1024 * 1024) / 3 * 2)
    // or s3.wal.cache.size
    private final long capacity;
    // capacity of each LogCacheBlock in bytes
    // Math.min(capacity / 3, 500L * 1024 * 1024)
    private final long cacheBlockMaxSize;
    // upper limit of streams per LogCacheBlock
    // s3.max.stream.num.per.stream.set.object=20000
    private final int maxCacheBlockStreamCount;
    // start offset (in bytes) in S3 of the last batch
    private RecordOffset lastRecordOffset;

    public boolean put(StreamRecordBatch recordBatch) {
        // try to free blocks that are already archived, keeping the cache
        // at 90% of capacity and the number of blocks below 65
        tryRealFree();
        size.addAndGet(recordBatch.occupiedSize());
        readLock.lock();
        boolean full;
        try {
            // add to the cache
            full = activeBlock.put(recordBatch);
        } finally {
            readLock.unlock();
        }
        return full;
    }
}

public static class LogCacheBlock {
    final Map<Long, StreamCache> map = new ConcurrentHashMap<>();
    private final AtomicLong size = new AtomicLong();

    // returns true when this block needs to be uploaded (WAL -> data)
    public boolean put(StreamRecordBatch recordBatch) {
        map.compute(recordBatch.getStreamId(), (id, cache) -> {
            if (cache == null) {
                cache = new StreamCache();
            }
            cache.add(recordBatch);
            return cache;
        });
        size.addAndGet(recordBatch.occupiedSize());
        return isFull();
    }

    public boolean isFull() {
        // cacheBlockMaxSize or maxCacheBlockStreamCount exceeded
        return size.get() >= maxSize || map.size() >= maxStreamCount;
    }
}

static class StreamCache {
    List<StreamRecordBatch> records;
    long startOffset = NOOP_OFFSET;
    long endOffset = NOOP_OFFSET;
    Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();

    synchronized void add(StreamRecordBatch recordBatch) {
        records.add(recordBatch);
        if (startOffset == NOOP_OFFSET) {
            startOffset = recordBatch.getBaseOffset();
        }
        endOffset = recordBatch.getLastOffset();
    }
}

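3-2. Writing data objects

An activeBlock (LogCacheBlock) holds the contents of multiple WAL objects in S3. Eventually this data has to be written into data objects so that the WAL can be cleaned up.

3-2-1. Triggers and main flow

Trigger path 1: the activeBlock is full, S3Storage.handleAppendCallback → S3Storage.uploadDeltaWAL.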

private void handleAppendCallback(WalWriteRequest request) {
    boolean full;
    synchronized (deltaWALCache) {
        full = deltaWALCache.put(request.record);
        deltaWALCache.setLastRecordOffset(request.offset);
    }
    if (full) {
        uploadDeltaWAL();
    }
}

Trigger path 2: every s3.wal.upload.interval.ms = 60000 ms an upload is triggered for the current activeBlock, S3Storage.maybeForceUpload → S3Storage.uploadDeltaWAL.

public S3Storage(...) {
    if (config.walUploadIntervalMs() > 0) {
        this.backgroundExecutor.scheduleWithFixedDelay(this::maybeForceUpload,
                config.walUploadIntervalMs(),
                config.walUploadIntervalMs(), TimeUnit.MILLISECONDS);
    }
}

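S3Storage.uploadDeltaWAL: the main flow for turning WAL data into data objects

1) Create a new activeBlock for subsequent WAL cache writes; the old activeBlock has to be uploaded to S3;

2) DeltaWALUploadTaskContext is the context and DefaultUploadWriteAheadLogTask is the upload task;

3) DefaultUploadWriteAheadLogTask uploads in three steps: prepare (get an objectId from the controller), upload (upload the data objects), commit (submit the metadata of the uploaded data objects to the controller);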

CompletableFuture<Void> uploadDeltaWAL(long streamId, boolean force) {
    CompletableFuture<Void> cf;
    synchronized (deltaWALCache) {
        // create a new activeBlock and return the old one
        Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
        if (blockOpt.isPresent()) {
            LogCache.LogCacheBlock logCacheBlock = blockOpt.get();
            DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock);
            context.objectManager = this.objectManager;
            context.force = force;
            cf = uploadDeltaWAL(context);
        } else {
            cf = CompletableFuture.completedFuture(null);
        }
    }
    return cf;
}

CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
    CompletableFuture<Void> cf = new CompletableFuture<>();
    context.cf = cf;
    backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
    return cf;
}

private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
    context.task = newUploadWriteAheadLogTask(context.cache.records(), objectManager, rate);
    // prepare -> upload -> commit
    prepareDeltaWALUpload(context);
}

protected UploadWriteAheadLogTask newUploadWriteAheadLogTask(Map<Long, List<StreamRecordBatch>> streamRecordsMap,
    ObjectManager objectManager, double rate) {
    return DefaultUploadWriteAheadLogTask.builder().config(config).streamRecordsMap(streamRecordsMap)
        .objectManager(objectManager).objectStorage(objectStorage).executor(uploadWALExecutor).rate(rate).build();
}

private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
    // 1. prepare: get an objectId (auto-incrementing id) from the controller
    context.task.prepare().thenAcceptAsync(nil -> {
        // 2. upload: convert WAL data to data objects and upload them to S3
        DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
        Objects.requireNonNull(peek).task.upload();
        // 3. commit: submit the converted data objects to the controller
        commitDeltaWALUpload(peek);
        // 4. process the next task
        DeltaWALUploadTaskContext next = walPrepareQueue.peek();
        if (next != null) {
            prepareDeltaWALUpload(next);
        }
    }, backgroundExecutor);
}

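3-2-2. prepare: get the objectId

DefaultUploadWriteAheadLogTask.prepare: step one, get an objectId (an auto-incrementing id) from the controller.

This step is optional. If the WAL data in this upload contains only one Stream, the objectId is obtained during the upload phase instead. See DefaultUploadWriteAheadLogTask.Builder.build, which sets forceSplit.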

public CompletableFuture<Long> prepare() {
    startTimestamp = System.currentTimeMillis();
    if (forceSplit) {
        prepareCf.complete(NOOP_OBJECT_ID);
    } else {
        objectManager
            // 1 means: request one id
            .prepareObject(1, TimeUnit.MINUTES.toMillis(60))
            .thenAcceptAsync(prepareCf::complete, executor)
            .exceptionally(ex -> {
                prepareCf.completeExceptionally(ex);
                return null;
            });
    }
    return prepareCf;
}

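3-2-3. upload: convert WAL data to data objects and write them to S3

DefaultUploadWriteAheadLogTask.upload0: step two, writes the data in the activeBlock to the S3 data bucket for real and builds the CommitStreamSetObjectRequest that step three submits to the Controller to record the metadata.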

// streamId -> StreamRecordBatch list from the activeBlock
private final Map<Long, List<StreamRecordBatch>> streamRecordsMap;

void upload0(long objectId) {
    uploadTimestamp = System.currentTimeMillis();
    List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
    Collections.sort(streamIds);
    CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
    ObjectWriter streamSetObject;
    if (forceSplit) {
        streamSetObject = ObjectWriter.noop(objectId);
    } else {
        streamSetObject = ObjectWriter.writer(objectId, objectStorage, objectBlockSize, objectPartSize);
    }
    List<CompletableFuture<Void>> streamObjectCfList = new LinkedList<>();
    List<CompletableFuture<Void>> streamSetWriteCfList = new LinkedList<>();
    for (Long streamId : streamIds) {
        List<StreamRecordBatch> streamRecords = streamRecordsMap.get(streamId);
        int streamSize = streamRecords.stream().mapToInt(StreamRecordBatch::size).sum();
        if (forceSplit || streamSize >= streamSplitSizeThreshold) {
            // forceSplit=true: the LogCacheBlock being uploaded contains only one stream
            // stream > 16 MB: the stream is uploaded directly as a StreamObject and needs its own objectId
            streamObjectCfList.add(writeStreamObject(streamRecords, streamSize).thenAccept(so -> {
                synchronized (request) {
                    request.addStreamObject(so);
                }
            }));
        } else {
            // small stream: each stream becomes one StreamRange stored in the shared object
            streamSetWriteCfList.add(acquireLimiter(streamSize).thenAccept(nil -> streamSetObject.write(streamId, streamRecords)));
            long startOffset = streamRecords.get(0).getBaseOffset();
            long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset();
            request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset, streamSize));
        }
    }
    request.setObjectId(objectId);
    request.setOrderId(objectId);
    CompletableFuture<Void> streamSetObjectCf = CompletableFuture.allOf(streamSetWriteCfList.toArray(new CompletableFuture[0]))
        // close triggers the upload of the shared object
        .thenCompose(nil -> streamSetObject.close().thenAccept(nil2 -> {
            request.setObjectSize(streamSetObject.size());
            request.setAttributes(ObjectAttributes.builder().bucket(streamSetObject.bucketId()).build().attributes());
        }));
    List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
    allCf.add(streamSetObjectCf);
    CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
        commitStreamSetObjectRequest = request;
        uploadCf.complete(request);
    }).exceptionally(ex -> {
        uploadCf.completeExceptionally(ex);
        return null;
    });
}

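If the activeBlock holds only one stream, or a single stream is large (16MB or more), that stream is written as its own S3 object, called a StreamObject. Each StreamObject must obtain a unique objectId from the Controller.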

public class StreamObject {
    private long objectId;
    private long objectSize;
    private long streamId;
    // log stream: message offsets; index streams: byte offsets
    private long startOffset;
    private long endOffset;
    private int attributes = ObjectAttributes.UNSET.attributes();
}

Small streams are merged into one S3 object, called a StreamSetObject, whose objectId was obtained in the prepare phase. Each stream occupies one ObjectStreamRange within it.

public class ObjectStreamRange {
    private long streamId;
    private long epoch; // -1
    // log stream: message offsets; index streams: byte offsets
    private long startOffset;
    private long endOffset;
    private int size;
}
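The routing rule between the two object kinds can be sketched as below. This is a minimal illustration rather than AutoMQ's code: the class `UploadRouting`, the enum `Target`, and the method names are hypothetical, and the 16MB constant is simply the value this article quotes for streamSplitSizeThreshold.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class UploadRouting {
    // Hypothetical labels for the two destinations described in the article.
    enum Target { STREAM_OBJECT, STREAM_SET_RANGE }

    // Assumed value: the 16MB threshold quoted in the article.
    static final long STREAM_SPLIT_SIZE_THRESHOLD = 16L * 1024 * 1024;

    // A stream goes to its own StreamObject when the upload is force-split
    // or when the stream reaches the size threshold; otherwise it becomes
    // one range inside the shared StreamSetObject.
    static Target route(boolean forceSplit, long streamSizeBytes) {
        return (forceSplit || streamSizeBytes >= STREAM_SPLIT_SIZE_THRESHOLD)
            ? Target.STREAM_OBJECT
            : Target.STREAM_SET_RANGE;
    }

    // Applies the rule to every stream of one upload (streamId -> size in bytes).
    static Map<Long, Target> routeAll(boolean forceSplit, Map<Long, Long> sizeByStream) {
        Map<Long, Target> out = new LinkedHashMap<>();
        sizeByStream.forEach((streamId, size) -> out.put(streamId, route(forceSplit, size)));
        return out;
    }
}
```

For example, `UploadRouting.route(false, 1024)` selects the shared object, while a 16MB stream or any stream in a force-split upload is routed to a standalone StreamObject.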

Step 3 then submits the CommitStreamSetObjectRequest to the Controller, which stores the metadata.

public class CommitStreamSetObjectRequest {
    // id of the StreamSetObject, the single S3 object that n ObjectStreamRanges are merged into
    private long objectId;
    // compaction-related; here it equals objectId
    private long orderId;
    // total byte size of the streamRanges in the StreamSetObject
    private long objectSize;
    // ranges of the n small streams in the StreamSetObject
    private List<ObjectStreamRange> streamRanges;
    // n large streams, each stored as its own large S3 object (StreamObject)
    private List<StreamObject> streamObjects;
    // compaction-related
    private List<Long> compactedObjectIds;
    private int attributes = ObjectAttributes.UNSET.attributes();
}

The S3 object key is {reverseHex(objectId)}/_kafka_{clusterId}/{objectId}, where reverseHex formats objectId as 8 hexadecimal digits and reverses the string. For example, clusterId=123 and objectId=42 give key=a2000000/_kafka_123/42.
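The key format above can be reproduced with a few lines of Java. This is a sketch under stated assumptions: `ObjectKeys`, `reverseHex`, and `key` are hypothetical names used only for illustration, and the zero-padded `%08x` formatting is inferred from the article's example rather than taken from the AutoMQ source.

```java
class ObjectKeys {
    // Formats the id as (at least) 8 lowercase hex digits, then reverses the string.
    // Assumption: ids wider than 32 bits would simply yield more than 8 digits here.
    static String reverseHex(long objectId) {
        String hex = String.format("%08x", objectId);
        return new StringBuilder(hex).reverse().toString();
    }

    // Builds {reverseHex(objectId)}/_kafka_{clusterId}/{objectId}.
    static String key(String clusterId, long objectId) {
        return reverseHex(objectId) + "/_kafka_" + clusterId + "/" + objectId;
    }
}
```

With clusterId=123 and objectId=42, `ObjectKeys.key("123", 42)` yields `a2000000/_kafka_123/42`, matching the example in the text. Because the low-order digits of consecutive ids end up at the front of the key, neighbouring objectIds land under different key prefixes.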

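ObjectWriter.DefaultObjectWriter.close: whether several small streams are merged into one object or each large stream gets its own object, the S3 data object layout is the same:

1) Data is written in the order Blocks → Indexes → Footer, and read in the reverse order;

2) Blocks hold the actual data. One stream may span multiple Blocks, but a Block belongs to exactly one stream. Within a stream, StreamRecordBatch sizes are accumulated in order, and a Block is cut once the running total reaches 512KB;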
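The block-cutting rule in 2) can be sketched as follows. This is a simplified illustration, not the ObjectWriter implementation: `BlockSplitter` and `split` are hypothetical names, batches are represented only by their byte sizes, and emitting a trailing partial block when the stream ends is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class BlockSplitter {
    // 512KB threshold quoted in the article.
    static final int BLOCK_SIZE_THRESHOLD = 512 * 1024;

    // Takes the sizes of one stream's StreamRecordBatches in order and
    // returns the byte size of each resulting block.
    static List<Integer> split(List<Integer> batchSizes) {
        List<Integer> blocks = new ArrayList<>();
        int current = 0;
        for (int size : batchSizes) {
            current += size;
            if (current >= BLOCK_SIZE_THRESHOLD) {
                // Running total reached the threshold: close this block.
                blocks.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            // Assumed: leftover batches form a final, smaller block.
            blocks.add(current);
        }
        return blocks;
    }
}
```

Three batches of 300KB, 300KB, and 100KB produce a 600KB block followed by a 100KB block, so a block can exceed 512KB by up to one batch.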

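3-2-4. commit: submitting metadata to the Controller

Assume 3 streams: 2 small streams go into a StreamSetObject and 1 large stream becomes a standalone StreamObject.

The Controller generates 5 records and updates its in-memory state:

1) S3ObjectRecord: 2 records, each mapping to an S3Object;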

public class S3ObjectControlManager {
    private final TimelineHashMap<Long/*objectId*/, S3Object> objectsMetadata;
}

public class S3Object implements Comparable<S3Object> {
    private final long objectId;
    private long objectSize = -1;
    private long timestamp;
    // COMMITTED
    private S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED;
    private int attributes;
}

2) S3StreamEndOffsetsRecord: 1 record, updating the endOffset of the 3 StreamRuntimeMetadata entries;

3) S3StreamSetObjectRecord: 1 record, mapping to an S3StreamSetObject;

4) S3StreamObjectRecord: 1 record, mapping to an S3StreamObject;

public class StreamControlManager {
    // streamId -> stream metadata
    private final TimelineHashMap<Long/*streamId*/, StreamRuntimeMetadata> streamsMetadata;
    // nodeId -> node metadata
    private final TimelineHashMap<Integer/*nodeId*/, NodeRuntimeMetadata> nodesMetadata;
}

public class StreamRuntimeMetadata {
    private final long streamId;
    // leaderEpoch
    private final TimelineLong currentEpoch;
    // rangeIndex generator
    private final TimelineInteger currentRangeIndex;
    // start offset
    private final TimelineLong startOffset;
    // end offset
    private final TimelineLong endOffset;
    // CLOSE or OPEN
    private final TimelineObject<StreamState> currentState;
    // tags
    private Map<String, String> tags;
    // rangeIndex -> segment metadata
    private final TimelineHashMap<Integer/*rangeIndex*/, RangeMetadata> ranges;
    // objectId -> streamId + startOffset + endOffset
    private final TimelineHashMap<Long/*objectId*/, S3StreamObject> streamObjects;

    public class S3StreamObject {
        private final long objectId;
        private final long streamId;
        private final long startOffset;
        private final long endOffset;
    }
}

public class NodeRuntimeMetadata {
    private final int nodeId;
    private final TimelineLong nodeEpoch;
    private final TimelineObject<Boolean> failoverMode;
    private final TimelineHashMap<Long/*objectId*/, S3StreamSetObject> streamSetObjects;

    public class S3StreamSetObject implements Comparable<S3StreamSetObject> {
        private final long objectId;
        private final int nodeId;
        private final long orderId;
        private final long dataTimeInMs;
    }
}

S3Storage.commitDeltaWALUpload: back on the Broker side

1) delete the WAL objects on S3; 2) try to release the LogCache that backs the WAL.

private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
    context.task.commit().thenAcceptAsync(nil -> {
        // already converted to data objects, so trim the WAL
        delayTrim.trim(context.cache.lastRecordOffset(), context.trimCf);
        // try to release the LogCache
        freeCache(context.cache);
    }, backgroundExecutor);
}

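LogCache.markFree: try to release the LogCache

1) tryRealFree: although the WAL data has just been converted to data objects on S3, the corresponding LogCacheBlock is not removed from memory immediately, so that reads stay fast. Memory is reclaimed only when the LogCache reaches 90% of its capacity or the blocks list grows beyond 64 entries;

2) tryMerge: in addition, because the WAL is ordered, adjacent LogCacheBlocks can be merged to speed up later cache reads;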

final List<LogCacheBlock> blocks = new ArrayList<>();
// LogCache capacity (bytes)
// Math.max(heap / 2 / 3, (heap / 2 - 3L * 1024 * 1024 * 1024) / 3 * 2)
private final long capacity;
// currently cached size (bytes)
private final AtomicLong size = new AtomicLong();
// maximum length of blocks: 64
private final long cacheBlockMaxSize;

public CompletableFuture<Void> markFree(LogCacheBlock block) {
    block.free = true;
    // if the LogCache exceeds 90% of capacity or holds more than 64 blocks, actually free the memory used by blocks
    tryRealFree();
    CompletableFuture<Void> cf = new CompletableFuture<>();
    LOG_CACHE_ASYNC_EXECUTOR.execute(() -> {
        try {
            // merge contiguous blocks to speed up later cache reads
            tryMerge();
            cf.complete(null);
        } catch (Throwable t) {
            cf.completeExceptionally(t);
        }
    });
    return cf;
}
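To make the capacity formula and the reclaim condition concrete, here is a small arithmetic sketch. It is an illustration only: `LogCacheSizing` and its methods are hypothetical, the formula is copied from the comment in the snippet above, and whether the real checks use `>=` or `>` at the boundaries is an assumption.

```java
class LogCacheSizing {
    static final long GIB = 1024L * 1024 * 1024;

    // Capacity formula as quoted above: max(mem/2/3, (mem/2 - 3GiB)/3*2),
    // where mem is the memory figure the comment refers to.
    static long capacity(long mem) {
        return Math.max(mem / 2 / 3, (mem / 2 - 3 * GIB) / 3 * 2);
    }

    // Reclaim condition described in the text: the cache has reached 90% of
    // its capacity, or the block list has grown beyond maxBlocks (64 in the article).
    static boolean shouldRealFree(long usedBytes, long capacityBytes, int blockCount, int maxBlocks) {
        return usedBytes >= capacityBytes * 0.9 || blockCount > maxBlocks;
    }
}
```

With 6GiB the first term wins (1GiB, since the second term is 0), and with 24GiB the second term wins (6GiB versus 4GiB), so larger processes give a proportionally bigger share to the LogCache.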

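4. Read

4-1. Main flow

ElasticReplicaManager.fetchMessages: entry point for reading messages

1) The first attempt is marked fast-read and reads only the in-memory LogCache (WAL); fastFetchExecutor serves hot reads with 4 threads;

2) If fast-read fails, the read goes LogCache → BlockCache → S3; slowFetchExecutor serves cold reads with 12 threads;

If every read went to S3, Fetch threads would be held up by S3 I/O. Fast-read uses its own small thread pool so that hot reads are never blocked by cold reads.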

override def fetchMessages(params: FetchParams,
    fetchInfos: Seq[(TopicIdPartition, FetchRequest.PartitionData)],
    quota: ReplicaQuota,
    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {
  fastFetchExecutor.submit(new Runnable {
    override def run(): Unit = {
      try {
        ReadHint.markReadAll()
        // 1. the first attempt is marked fast-read
        ReadHint.markFastRead()
        fetchMessages0(params, fetchInfos, quota, fastFetchLimiter, 0, responseCallback)
        ReadHint.clear()
      } catch {
        case e: Throwable =>
          val ex = FutureUtil.cause(e)
          // 2. if the first attempt throws FastReadFailFastException, fall back to slow-read
          val fastReadFailFast = ex.isInstanceOf[FastReadFailFastException]
          if (fastReadFailFast) {
            val timer = Time.SYSTEM.timer(params.maxWaitMs)
            slowFetchExecutor.submit(new Runnable {
              override def run(): Unit = {
                try {
                  ReadHint.markReadAll()
                  fetchMessages0(params, fetchInfos, quota, slowFetchLimiter, timer.remainingMs(), responseCallback)
                } catch {
                  case slowEx: Throwable =>
                    handleError(slowEx)
                }
              }
            })
          }
      }
    }
  })
}
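The fast-read then slow-read fallback can be reduced to the following Java sketch. It is a simplified model, not the ElasticReplicaManager code: `TwoTierFetch`, `FastReadFailFast`, and the `reader` function are hypothetical stand-ins, and the two executors are passed in by the caller instead of being the 4-thread and 12-thread pools mentioned above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Function;

class TwoTierFetch {
    // Hypothetical stand-in for FastReadFailFastException.
    static class FastReadFailFast extends RuntimeException { }

    // reader.apply(true)  = cache-only read, may throw FastReadFailFast;
    // reader.apply(false) = full read path (cache, then object storage).
    static <T> CompletableFuture<T> fetch(Function<Boolean, T> reader, Executor fastPool, Executor slowPool) {
        return CompletableFuture.supplyAsync(() -> reader.apply(true), fastPool)
            .<CompletableFuture<T>>handle((value, ex) -> {
                if (ex == null) {
                    return CompletableFuture.completedFuture(value); // hot read succeeded
                }
                Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                if (cause instanceof FastReadFailFast) {
                    // Cache miss on the fast path: retry on the separate slow pool.
                    return CompletableFuture.supplyAsync(() -> reader.apply(false), slowPool);
                }
                return CompletableFuture.failedFuture(cause); // any other error is propagated
            })
            .thenCompose(cf -> cf);
    }
}
```

The point of the design is isolation: a cold read only occupies a fast-pool thread long enough to discover the miss, and the slow S3 work is then carried out on the other pool.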

S3Storage.read0: startOffset = the client's fetchOffset; endOffset = high watermark = confirmOffset = WAL write progress.

// level-1 cache: WAL
private final LogCache deltaWALCache;
// level-2 cache: backed by S3
protected final S3BlockCache blockCache;

private CompletableFuture<ReadDataBlock> read0(FetchContext context,
        @SpanAttribute long streamId,
        @SpanAttribute long startOffset,
        @SpanAttribute long endOffset,
        @SpanAttribute int maxBytes) { // maxBytes: number of bytes to read
    LogCache firstCache = deltaWALCache;
    // the LogCache only returns the right-contiguous part of the range
    List<StreamRecordBatch> logCacheRecords = firstCache.get(context, streamId, startOffset, endOffset, maxBytes);
    if (!logCacheRecords.isEmpty() && logCacheRecords.get(0).getBaseOffset() <= startOffset) {
        // case 1: full LogCache hit, return directly
        return CompletableFuture.completedFuture(new ReadDataBlock(logCacheRecords, CacheAccessType.DELTA_WAL_CACHE_HIT));
    }
    if (context.readOptions().fastRead()) {
        releaseRecords(logCacheRecords);
        logCacheRecords.clear();
        // case 2: not a full LogCache hit and fastRead is set, fail immediately
        return CompletableFuture.failedFuture(FAST_READ_FAIL_FAST_EXCEPTION);
    }
    if (!logCacheRecords.isEmpty()) {
        endOffset = logCacheRecords.get(0).getBaseOffset();
    }
    long finalEndOffset = endOffset;
    // case 3: not a full LogCache hit and not fastRead, go to the S3BlockCache
    CompletableFuture<ReadDataBlock> cf = blockCache.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
        // records from the S3BlockCache
        List<StreamRecordBatch> rst = new ArrayList<>(blockCacheRst.getRecords());
        // merge the S3BlockCache records with the right-contiguous LogCache records
        int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
        int readIndex = -1;
        for (int i = 0; i < logCacheRecords.size()
                && remainingBytesSize > 0; i++) {
            readIndex = i;
            StreamRecordBatch record = logCacheRecords.get(i);
            rst.add(record);
            remainingBytesSize -= record.size();
        }
        if (readIndex < logCacheRecords.size()) {
            releaseRecords(logCacheRecords.subList(readIndex + 1, logCacheRecords.size()));
        }
        return new ReadDataBlock(rst, blockCacheRst.getCacheAccessType());
    });
    return FutureUtil.timeoutWithNewReturn(cf, 2, TimeUnit.MINUTES, () -> {
        cf.thenAccept(readDataBlock -> {
            releaseRecords(readDataBlock.getRecords());
        });
    });
}

4-2. Reading the LogCache

LogCache.get:

Iterate over the n LogCacheBlocks from head to tail (WAL write order, i.e. oldest to newest); each LogCacheBlock may hold n StreamRecordBatches for the given streamId.

public class LogCache {
    // cached blocks, including the activeBlock
    final List<LogCacheBlock> blocks = new ArrayList<>();
    private LogCacheBlock activeBlock;

    public List<StreamRecordBatch> get0(Long streamId, long startOffset, long endOffset, int maxBytes) {
        List<LogCacheBlock> blocks = this.blocks;
        for (LogCacheBlock archiveBlock : blocks) {
            List<StreamRecordBatch> records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
        }
    }

    public static class LogCacheBlock {
        final Map<Long, StreamCache> map = new ConcurrentHashMap<>();

        public List<StreamRecordBatch> get(Long streamId, long startOffset, long endOffset, int maxBytes) {
            StreamCache cache = map.get(streamId);
            if (cache == null) {
                return Collections.emptyList();
            }
            return cache.get(startOffset, endOffset, maxBytes);
        }
    }
}

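LogCache.StreamCache.get:

Within each LogCacheBlock, binary-search the StreamRecordBatch list with request.startOffset to find the first StreamRecordBatch, then read until request.endOffset is reached.

In addition, because consumption is usually sequential, offsetIndexMap caches the position in the StreamRecordBatch list where the previous read ended, which reduces the lookup for a sequential read to O(1).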

// LogCache.StreamCache.java
List<StreamRecordBatch> records;
long startOffset = NOOP_OFFSET;
long endOffset = NOOP_OFFSET;
Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();

synchronized List<StreamRecordBatch> get(long startOffset, long endOffset, int maxBytes) {
    // startOffset is outside the range of this StreamCache, return directly
    if (this.startOffset > startOffset || this.endOffset <= startOffset) {
        return Collections.emptyList();
    }
    // binary search, or a hit in offsetIndexMap
    int startIndex = searchStartIndex(startOffset);
    if (startIndex < 0) {
        // mismatched
        return Collections.emptyList();
    }
    int endIndex = -1;
    int remainingBytesSize = maxBytes;
    long rstEndOffset = NOOP_OFFSET;
    for (int i = startIndex; i < records.size(); i++) {
        StreamRecordBatch record = records.get(i);
        endIndex = i + 1;
        remainingBytesSize -= Math.min(remainingBytesSize, record.size());
        rstEndOffset = record.getLastOffset();
        if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
            break;
        }
    }
    // for sequential reads, cache the records index of the next offset in offsetIndexMap
    // so that searchStartIndex can locate it quickly without a binary search
    if (rstEndOffset != NOOP_OFFSET) {
        map(rstEndOffset, endIndex);
    }
    return new ArrayList<>(records.subList(startIndex, endIndex));
}

int searchStartIndex(long startOffset) {
    IndexAndCount indexAndCount = offsetIndexMap.get(startOffset);
    if (indexAndCount != null) {
        // sequential-read optimization: jump straight to the records index
        unmap(startOffset, indexAndCount);
        return indexAndCount.index;
    } else {
        // binary search over records
        StreamRecordBatchList search = new StreamRecordBatchList(records);
        return search.search(startOffset);
    }
}

final void map(long offset, int index) {
    offsetIndexMap.compute(offset, (k, v) -> {
        if (v == null) {
            return new IndexAndCount(index);
        } else {
            v.inc();
            return v;
        }
    });
}

final void unmap(long startOffset, IndexAndCount indexAndCount) {
    if (indexAndCount.dec() == 0) {
        offsetIndexMap.remove(startOffset);
    }
}

static class IndexAndCount {
    // index in records
    int index;
    // reference count
    int count;

    public IndexAndCount(int index) {
        this.index = index;
        this.count = 1;
    }
}

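4-3. Reading the BlockCache

The BlockCache is implemented by StreamReaders.

StreamReaders.read: read requests are dispatched by streamId to a fixed EventLoop thread; the number of EventLoop threads equals the number of cores.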

public class StreamReaders implements S3BlockCache {
    // session-level cache of data indexes
    private final Cache[] caches;
    // data cache
    private final DataBlockCache dataBlockCache;

    public StreamReaders(...) {
        // concurrency = number of cores
        EventLoop[] eventLoops = new EventLoop[concurrency];
        for (int i = 0; i < concurrency; i++) {
            eventLoops[i] = new EventLoop("stream-reader-" + i);
        }
        // session-level cache of data indexes
        this.caches = new Cache[concurrency];
        for (int i = 0; i < concurrency; i++) {
            caches[i] = new Cache(eventLoops[i]);
        }
        // global cache of Block data
        this.dataBlockCache = new DataBlockCache(size, eventLoops);
        // every minute, clean up StreamReaders.Cache entries not accessed for 1 minute
        Threads.COMMON_SCHEDULER.scheduleAtFixedRate(this::triggerExpiredStreamReaderCleanup,
            STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
            STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
            TimeUnit.MILLISECONDS);
    }

    class Cache {
        private final EventLoop eventLoop;
        private final Map<StreamReaderKey, StreamReader> streamReaders;
    }

    public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId, long startOffset, long endOffset,
        int maxBytes) {
        Cache cache = caches[Math.abs((int) (streamId % caches.length))];
        return cache.read(streamId, startOffset, endOffset, maxBytes);
    }
}

4-3-1. StreamReader: session-level data index cache

StreamReaders.Cache.read: a StreamReader is created per (streamId, read offset); subsequent sequential reads reuse the same StreamReader.

public class StreamReaders implements S3BlockCache {
    // session-level cache of data indexes
    private final Cache[] caches;
    // global data cache
    private final DataBlockCache dataBlockCache;

    public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId, long startOffset, long endOffset,
        int maxBytes) {
        Cache cache = caches[Math.abs((int) (streamId % caches.length))];
        return cache.read(streamId, startOffset, endOffset, maxBytes);
    }

    static class StreamReaderKey {
        final long streamId;
        final long startOffset;
    }

    class Cache {
        private final EventLoop eventLoop;
        private final Map<StreamReaderKey, StreamReader> streamReaders;

        public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
            long endOffset, int maxBytes) {
            CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
            eventLoop.execute(() -> {
                // remove StreamReaders that have not been used for 1 minute
                cleanupExpiredStreamReader();
                // for a sequential read, streamReaders still holds the StreamReader from last time
                StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
                StreamReader streamReader = streamReaders.remove(key);
                if (streamReader == null) {
                    // non-sequential read, expired, or first read: create a StreamReader
                    streamReader = new StreamReader(...);
                }
                StreamReader finalStreamReader = streamReader;
                // read
                CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
                    .whenComplete((rst, ex) -> {
                        if (ex != null) {
                            finalStreamReader.close();
                        } else {
                            // cache the StreamReader for the next sequential read
                            StreamReader oldStreamReader = streamReaders.put(new StreamReaderKey(streamId, finalStreamReader.nextReadOffset()), finalStreamReader);
                            if (oldStreamReader != null) {
                                oldStreamReader.close();
                            }
                        }
                    });
                FutureUtil.propagate(streamReadCf, cf);
            });
            return cf;
        }
    }
}

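A StreamReader caches the index and metadata of one session (streamId + read offset), i.e. blocksMap. With this cache, the next sequential access avoids querying the metadata and Block indexes again.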

public class StreamReader {
    // key = DataBlockIndex.startOffset, value = metadata + index
    final NavigableMap<Long, Block> blocksMap = new TreeMap<>();
    // global cache of Block data
    private final DataBlockCache dataBlockCache;
    // offset of the next read
    long nextReadOffset;
    // last access time
    private long lastAccessTimestamp;

    // cache entry
    class Block {
        // metadata
        final S3ObjectMetadata metadata;
        // index (see the object layout in 3-2-3)
        final DataBlockIndex index;
        // temporary field: the actual data fetched from the BlockCache
        DataBlock data;
    }
}

public class S3ObjectMetadata {
    private final long objectId;
    private final List<StreamOffsetRange> offsetRanges;
    private long objectSize;
}

public class StreamOffsetRange implements Comparable<StreamOffsetRange> {
    private final long streamId;
    private final long startOffset;
    private final long endOffset;
}

public final class DataBlockIndex {
    private final int blockId;
    private final long streamId;
    private final long startOffset;
    private final int endOffsetDelta;
    private final int recordCount;
    private final long startPosition;
    private final int size;
}

StreamReader.getBlocks0: Cache-Aside: read the cache → miss → read the index inside the S3 data object → write it back to the cache.

// index entry startOffset -> cache entry (index + S3 object metadata)
final NavigableMap<Long, Block> blocksMap = new TreeMap<>();
// endOffset of the last index in blocksMap
long loadedBlockIndexEndOffset = 0L;

private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset, int maxBytes) {
    Long floorKey = blocksMap.floorKey(startOffset);
    CompletableFuture<Boolean> loadMoreBlocksCf;
    int remainingSize = maxBytes;
    if (floorKey == null || startOffset >= loadedBlockIndexEndOffset) {
        // the requested range is beyond the cached range: cache miss, load data into the cache
        loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
    } else {
        // cache hit: iterate over the indexes
        boolean firstBlock = true;
        boolean fulfill = false;
        for (Map.Entry<Long, Block> entry : blocksMap.tailMap(floorKey).entrySet()) {
            Block block = entry.getValue();
            long objectId = block.metadata.objectId();
            DataBlockIndex index = block.index;
            if (!firstBlock || index.startOffset() == startOffset) {
                remainingSize -= index.size();
            }
            if (firstBlock) {
                firstBlock = false;
            }
            // fetch the data for this index into a Block and add it to the result
            block = block.newBlockWithData(ctx.readahead);
            ctx.blocks.add(block);
            if ((endOffset != -1L && index.endOffset() >= endOffset)
                    || remainingSize <= 0) {
                // the index's end offset passes the requested endOffset: done reading
                fulfill = true;
                break;
            }
        }
        if (fulfill) {
            // done reading, return
            ctx.cf.complete(ctx.blocks);
            return;
        } else {
            // not done yet, keep loading indexes into memory
            loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
        }
    }
    int finalRemainingSize = remainingSize;
    // call itself recursively until the fulfill condition is met
    loadMoreBlocksCf.thenAcceptAsync(moreBlocks -> {
        // ...
        getBlocks0(ctx, nextStartOffset, endOffset, finalRemainingSize);
    }, eventLoop);
}

public Block newBlockWithData(boolean readahead) {
    Block newBlock = new Block(metadata, index);
    ObjectReader objectReader = objectReaderFactory.get(metadata);
    DataBlockCache.GetOptions getOptions = DataBlockCache.GetOptions.builder().readahead(readahead).build();
    // global data cache: look up the data by index
    loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> {
        newBlock.data = newData;
    }).exceptionally(ex -> {
        exception = ex;
        newBlock.exception = ex;
        return null;
    }).whenComplete((nil, ex) -> objectReader.release());
    newBlock.loadCf = loadCf;
    return newBlock;
}

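StreamReader.loadMoreBlocksWithoutData0: on a blocksMap cache miss

1) find the S3 object metadata by streamId + offset range;

2) fetch the relevant DataBlockIndex entries by objectId + offset range; these correspond to the BlockIndexes region written into the data object above;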

// read offset -> BlockIndex + S3 object metadata
final NavigableMap<Long, Block> blocksMap = new TreeMap<>();

private CompletableFuture<Void> loadMoreBlocksWithoutData0(long endOffset) {
    if (inflightLoadIndexCf != null) {
        // a load is already in flight: wait for it to finish before handling this one
        return inflightLoadIndexCf.thenCompose(rst -> loadMoreBlocksWithoutData0(endOffset));
    }
    long currentBlocksEpoch = blocksEpoch;
    inflightLoadIndexCf = new CompletableFuture<>();
    long nextLoadingOffset = calWindowBlocksEndOffset();
    AtomicLong nextFindStartOffset = new AtomicLong(nextLoadingOffset);
    TimerUtil time = new TimerUtil();
    // 1. query the S3 objects of streamId within the offset range
    CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextLoadingOffset, endOffset, GET_OBJECT_STEP);
    // 2. iterate over the S3 objects and find the BlockIndex entries
    CompletableFuture<Void> findBlockIndexesCf = getObjectsCf.whenComplete((rst, ex) -> {
        StorageOperationStats.getInstance().getIndicesTimeGetObjectStats.record(time.elapsedAndResetAs(TimeUnit.NANOSECONDS));
    }).thenComposeAsync(objects -> {
        CompletableFuture<Void> prevCf = CompletableFuture.completedFuture(null);
        for (S3ObjectMetadata objectMetadata : objects) {
            ObjectReader objectReader = objectReaderFactory.get(objectMetadata);
            objectReader.basicObjectInfo();
            prevCf = prevCf.thenCompose(nil -> {
                return objectReader.find(streamId, nextFindStartOffset.get(), -1L, Integer.MAX_VALUE).thenAcceptAsync(findRst -> {
                    findRst.streamDataBlocks().forEach(streamDataBlock -> {
                        DataBlockIndex index = streamDataBlock.dataBlockIndex();
                        Block block = new Block(objectMetadata, index);
                        // put S3 object metadata + BlockIndex into the Block cache
                        if (!putBlock(block)) {
                            throw new BlockNotContinuousException();
                        }
                        nextFindStartOffset.set(streamDataBlock.getEndOffset());
                    });
                }, eventLoop);
            }).whenComplete((nil, ex) -> objectReader.release());
        }
        return prevCf;
    }, eventLoop);
}

private boolean putBlock(Block block) {
    // ... Block index continuity check
    lastBlock = block;
    blocksMap.put(block.index.startOffset(), block);
    loadedBlockIndexEndOffset = block.index.endOffset();
    return true;
}

S3StreamsMetadataImage.getObjects0: fetch S3 object metadata by streamId + offset range

1) StreamObject: 1 stream per object; streamId → S3StreamObject gives the result directly;

2) StreamSetObject: n streams per object; streamId → Range metadata (segments) → nodeId → Node metadata → objectId → S3 object index blocks → relevant offset ranges;

public final class S3StreamsMetadataImage {
    // streamId -> stream metadata
    private final TimelineHashMap<Long, S3StreamMetadataImage> streamMetadataMap;
    // nodeId -> node metadata
    private final TimelineHashMap<Integer, NodeS3StreamSetObjectMetadataImage> nodeMetadataMap;
}

public class S3StreamMetadataImage {
    // segments: offset range -> owning nodeId
    private final List<RangeMetadata> ranges;

    public class RangeMetadata implements Comparable<RangeMetadata> {
        private long startOffset;
        private long endOffset;
        private int nodeId;
    }

    // SO: one S3 object per stream, located directly
    final DeltaList<S3StreamObject> streamObjects;

    public class S3StreamObject {
        private final long objectId;
        private final long streamId;
        private final long startOffset;
        private final long endOffset;
    }
}

public class NodeS3StreamSetObjectMetadataImage {
    // SSO: one S3 object shared by n streams
    private final DeltaList<S3StreamSetObject> s3Objects;

    public class S3StreamSetObject implements Comparable<S3StreamSetObject> {
        private final long objectId;
    }
}

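ObjectReader.DefaultObjectReader.basicObjectInfo: range-reads the Footer and Index regions of the S3 object to obtain the DataBlockIndex entries. ObjectReader instances are cached per objectId, so for a StreamSetObject whose index getObjects0 has already read, S3 is not called again.

Unlike an S3StreamObject, an S3StreamSetObject cannot be resolved to S3 object metadata directly. It requires scanning nodeIds (NodeS3StreamSetObjectMetadataImage) → objectIds (S3StreamSetObject) → S3 object index blocks → the index blocks relevant to the streamId and offset.

To avoid iterating over every S3StreamSetObject in the Node metadata each time to find the index blocks for a streamId, three index caches are used.

The lookup entry point is S3StreamsMetadataImage.getStartSearchIndex0. It takes a nodeId and an offset and returns the index of that offset within the node's S3StreamSetObject list, so the scan only needs to start from that position.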

private CompletableFuture<Integer> getStartSearchIndex0(
        NodeS3StreamSetObjectMetadataImage node, long startOffset,
        GetObjectsContext ctx) {
    // ...
}

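NodeS3StreamSetObjectMetadataImage: the node metadata keeps an LRU cache, offsetIndexMap, for sequential reads. It maps streamId → offset → index in the S3StreamSetObject list. The read path stores the endOffset here so that the next sequential read can use it.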

public class NodeS3StreamSetObjectMetadataImage {
    private final DeltaList<S3StreamSetObject> s3Objects;
    private final StreamOffsetIndexMap offsetIndexMap;
    // streamId -> offset -> index in s3Objects
    // private final Map<Long, NavigableMap<Long, Integer>> streamOffsetIndexMap;
}

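LocalStreamRangeIndexCache: a sparse index storing (startOffset, endOffset, objectId). It is used only when nodeId is the current node; the objectId is located by binary search on offset.

When the cache is written: 1) an entry is added after the current node commits an S3 data object to the Controller, and the index is periodically persisted to S3 under key=hash(sparse-index-{nodeId})/_kafka_{clusterId}/node-{nodeId}; 2) after startup it is reloaded into memory;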

public class LocalStreamRangeIndexCache {
    // streamId -> (startOffset, endOffset, objectId)
    private final Map<Long, SparseRangeIndex> streamRangeIndexMap = new HashMap<>();
}

public class SparseRangeIndex {
    private List<RangeIndex> sortedRangeIndexList;
}

public class RangeIndex {
    private final long startOffset;
    private final long endOffset;
    private final long objectId;
}
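The binary search over such a sorted sparse index can be sketched as below. This is an illustration under assumptions, not the LocalStreamRangeIndexCache code: `SparseIndexLookup` and `findStartObjectId` are hypothetical names, and returning the last entry whose startOffset is at or before the requested offset (a floor search, so that gaps in a sparse index still yield a starting object) is an assumed semantics.

```java
import java.util.List;

class SparseIndexLookup {
    // Mirrors the (startOffset, endOffset, objectId) triple shown above.
    static final class RangeIndex {
        final long startOffset;
        final long endOffset;
        final long objectId;

        RangeIndex(long startOffset, long endOffset, long objectId) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.objectId = objectId;
        }
    }

    // Floor search: objectId of the last range with startOffset <= offset,
    // or -1 if the offset precedes every indexed range.
    static long findStartObjectId(List<RangeIndex> sortedByStartOffset, long offset) {
        int lo = 0;
        int hi = sortedByStartOffset.size() - 1;
        int floor = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (sortedByStartOffset.get(mid).startOffset <= offset) {
                floor = mid;      // candidate found; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return floor < 0 ? -1L : sortedByStartOffset.get(floor).objectId;
    }
}
```

The caller would then begin scanning the node's StreamSetObjects from the returned objectId instead of from the start of the list.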

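NodeRangeIndexCache: when nodeId is not the current node, the read path loads the other node's index from S3 (key=hash(sparse-index-{nodeId})/_kafka_{clusterId}/node-{nodeId}, where nodeId is the node matched by the request) and likewise locates the objectId by binary search on offset.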

public class NodeRangeIndexCache {
    private final ExpireLRUCache nodeRangeIndexMap = new ExpireLRUCache(MAX_CACHE_SIZE, DEFAULT_EXPIRE_TIME_MS);

    static class ExpireLRUCache extends AsyncLRUCache<Long, StreamRangeIndexCache> {
    }

    static class StreamRangeIndexCache implements AsyncMeasurable {
        // streamId -> (startOffset, endOffset, objectId)
        private final CompletableFuture<Map<Long, List<RangeIndex>>> streamRangeIndexMapCf;
        private CompletableFuture<Integer> sizeCf;
    }
}

4-3-2. DataBlockCache: cache of the actual data

StreamReader.Block.newBlockWithData: with the S3 metadata and Block index obtained above, fetch the actual Block data.

private final DataBlockCache dataBlockCache;

class Block {
    final S3ObjectMetadata metadata;
    final DataBlockIndex index;
    CompletableFuture<Void> loadCf;
    DataBlock data;

    public Block newBlockWithData(boolean readahead) {
        Block newBlock = new Block(metadata, index);
        ObjectReader objectReader = objectReaderFactory.get(metadata);
        DataBlockCache.GetOptions getOptions = DataBlockCache.GetOptions.builder().readahead(readahead).build();
        loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> {
            newBlock.data = newData;
        });
        newBlock.loadCf = loadCf;
        return newBlock;
    }
}

DataBlockCache.getBlock: the data for each Block index is cached in DataBlockCache.Cache.blocks; a cache miss triggers an S3 read. Because the DataBlockIndex is already known, a rangeRead can be issued directly.

public class DataBlockCache {
    final Cache[] caches;

    public CompletableFuture<DataBlock> getBlock(GetOptions options,
                                                 ObjectReader objectReader,
                                                 DataBlockIndex dataBlockIndex) {
        Cache cache = cache(dataBlockIndex.streamId());
        return cache.getBlock(options, objectReader, dataBlockIndex);
    }

    private Cache cache(long streamId) {
        return caches[(int) Math.abs(streamId % caches.length)];
    }

    class Cache {
        // (objectId, blockIndex) -> block data
        final Map<DataBlockGroupKey, DataBlock> blocks = new HashMap<>();
        final LRUCache<DataBlockGroupKey, DataBlock> lru = new LRUCache<>();

        private CompletableFuture<DataBlock> getBlock0(GetOptions options, ObjectReader objectReader,
            DataBlockIndex dataBlockIndex) {
            long objectId = objectReader.metadata().objectId();
            DataBlockGroupKey key = new DataBlockGroupKey(objectId, dataBlockIndex);
            DataBlock dataBlock = blocks.get(key);
            if (dataBlock == null) {
                // cache miss for the block of this index
                DataBlock newDataBlock = new DataBlock(objectId, dataBlockIndex, this, time);
                dataBlock = newDataBlock;
                blocks.put(key, newDataBlock);
                // read from S3
                read(options, objectReader, newDataBlock, eventLoop);
            }
            lru.touchIfExist(key);
        }
    }
}

public class DataBlock extends AbstractReferenceCounted {
    private final long objectId;
    private final DataBlockIndex dataBlockIndex;
    // cached data
    private ObjectReader.DataBlockGroup dataBlockGroup;
}

class DataBlockGroup implements AutoCloseable {
    // cached data
    private final ByteBuf buf;
    // number of StreamRecordBatches
    private final int recordCount;
}

// DefaultObjectReader#read
public CompletableFuture<DataBlockGroup> read(ReadOptions readOptions,
                                              DataBlockIndex block) {
    CompletableFuture<ByteBuf> rangeReadCf = objectStorage.rangeRead(
        new ObjectStorage.ReadOptions()
            .throttleStrategy(readOptions.throttleStrategy).bucket(metadata.bucket()),
        metadata.key(),
        block.startPosition(),
        block.endPosition()
    );
    return rangeReadCf.thenApply(buf -> {
        ByteBuf pooled = ByteBufAlloc.byteBuffer(buf.readableBytes(), BLOCK_CACHE);
        pooled.writeBytes(buf);
        buf.release();
        return new DataBlockGroup(pooled);
    });
}

AbstractObjectStorage.rangeRead: pending reads are merged once every 5ms.

private final List<AbstractObjectStorage.ReadTask> waitingReadTasks = new LinkedList<>();

protected AbstractObjectStorage(...) {
    if (!manualMergeRead) {
        scheduler.scheduleWithFixedDelay(this::tryMergeRead, 5, 5, TimeUnit.MILLISECONDS);
    }
}

public CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPath, long start, long end) {
    CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
    synchronized (waitingReadTasks) {
        waitingReadTasks.add(new AbstractObjectStorage.ReadTask(options, objectPath, start, end, cf));
    }
    // ...
}

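Merged reads: multiple small range reads on the same object path are combined into a single request, which reduces the number of S3 GETs. Once the whole range has been read back, it is sliced by each sub-task's [start, end) and handed to that task.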
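A minimal sketch of the merge-then-slice idea follows. It is not the AbstractObjectStorage implementation: `MergeReadSketch`, `ReadTask`, `MergedRead`, and the `maxGap` parameter (how far apart two ranges may be and still be merged) are all hypothetical, and the real code may apply further limits such as a maximum merged size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MergeReadSketch {
    // One pending range read on an object: bytes [start, end).
    static final class ReadTask {
        final String path;
        final long start;
        final long end;

        ReadTask(String path, long start, long end) {
            this.path = path;
            this.start = start;
            this.end = end;
        }
    }

    // One merged request covering several tasks on the same object.
    static final class MergedRead {
        final String path;
        final long start;
        long end;
        final List<ReadTask> tasks = new ArrayList<>();

        MergedRead(ReadTask first) {
            this.path = first.path;
            this.start = first.start;
            this.end = first.end;
            this.tasks.add(first);
        }
    }

    // Groups waiting tasks by object path, sorts them by start position, and
    // merges neighbours whose gap is at most maxGap bytes (assumed parameter).
    static List<MergedRead> merge(List<ReadTask> waiting, long maxGap) {
        Map<String, List<ReadTask>> byPath = new TreeMap<>();
        for (ReadTask t : waiting) {
            byPath.computeIfAbsent(t.path, k -> new ArrayList<>()).add(t);
        }
        List<MergedRead> merged = new ArrayList<>();
        for (List<ReadTask> tasks : byPath.values()) {
            tasks.sort(Comparator.comparingLong((ReadTask t) -> t.start));
            MergedRead current = null;
            for (ReadTask t : tasks) {
                if (current != null && t.start - current.end <= maxGap) {
                    current.end = Math.max(current.end, t.end);
                    current.tasks.add(t);
                } else {
                    current = new MergedRead(t);
                    merged.add(current);
                }
            }
        }
        return merged;
    }

    // Cuts one task's bytes out of the buffer returned for the merged request.
    static byte[] slice(byte[] mergedBytes, MergedRead m, ReadTask t) {
        return Arrays.copyOfRange(mergedBytes, (int) (t.start - m.start), (int) (t.end - m.start));
    }
}
```

Two adjacent reads [0, 10) and [10, 30) on the same object become one GET for [0, 30), and each caller receives its own slice of the result.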

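5. Compaction

5-1. Overview

AutoMQ persists log data to S3. Flushing the WAL produces two kinds of objects:

1) Stream Set Object (SSO): one S3 object containing data from multiple streams, produced by batch-uploading many streams; its metadata is attached to the uploading Node.

2) Stream Object (SO): one S3 object serving a single stream, produced when a large stream (16MB or more) is uploaded on its own.

Over time, too many SSOs and SOs accumulate, which causes metadata bloat and read amplification.

Compaction is a background asynchronous task that merges small objects, splits oversized objects, and cleans up expired data.

CompactionManager.start: for SSOs, CompactionManager runs a compaction every 5 minutes and commits the metadata to the Controller with CommitStreamSetObjectRequest.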

public void start() {
    scheduleNextCompaction((long) this.compactionInterval * 60 * 1000);
}

void scheduleNextCompaction(long delayMillis) {
    this.compactionScheduledExecutor.schedule(() -> {
        this.compact().join();
        scheduleNextCompaction(nextDelay);
    }, delayMillis, TimeUnit.MILLISECONDS);
}

S3StreamClient.startStreamObjectsCompactions: for SOs, S3StreamClient compacts every opened Stream once per minute and commits the metadata to the Controller with CommitStreamObjectRequest.

private void startStreamObjectsCompactions() {
    streamObjectCompactionScheduler.scheduleWithFixedDelay(() -> {
        // total number of S3 data objects in the cluster
        CompactionHint hint = new CompactionHint(objectManager.getObjectsCount().get());
        List<StreamWrapper> operationStreams = new ArrayList<>(openedStreams.values());
        operationStreams.forEach(s -> s.compact(hint));
    }, compactionJitterDelay, 1, TimeUnit.MINUTES);
}

5-2. StreamSetObject Compaction

Main flow of SSO compaction:

CompactionManager.start()
  → scheduleNextCompaction()                     // scheduled, every 5 minutes by default
    → compact()
      → objectManager.getServerObjects()         // read the metadata of all SSOs on this broker
      → updateStreamDataBlockMap()               // read the blockIndex of the S3 data objects
      → streamManager.getStreams()               // read the stream metadata
      → compact(streamMetadataList, objectMetadataList)
          ├── forceSplitObjects()       // force split
          └── compactObjects()          // compact

CompactionManager.compact: splits the SSOs into two groups

1) SSOs older than 120 minutes: forceSplitObjects, N SSOs are force-split → N SOs;

2) recent SSOs: compactObjects, N SSOs are compacted → 1 SSO + N SOs;

private void compact(List<StreamMetadata> streamMetadataList,
    List<S3ObjectMetadata> objectMetadataList) throws CompletionException {
    Map<Boolean, List<S3ObjectMetadata>> objectMetadataFilterMap = convertS3Objects(objectMetadataList);
    List<S3ObjectMetadata> objectsToForceSplit = objectMetadataFilterMap.get(true);
    List<S3ObjectMetadata> objectsToCompact = objectMetadataFilterMap.get(false);
    if (!objectsToForceSplit.isEmpty()) {
        forceSplitObjects(streamMetadataList, objectsToForceSplit);
    }
    compactObjects(streamMetadataList, objectsToCompact);
}

Map<Boolean, List<S3ObjectMetadata>> convertS3Objects(List<S3ObjectMetadata> streamSetObjectMetadata) {
    return new HashMap<>(streamSetObjectMetadata.stream()
        .collect(Collectors.partitioningBy(e ->
            (System.currentTimeMillis() - e.dataTimeInMs())
            >= TimeUnit.MINUTES.toMillis(this.forceSplitObjectPeriod))));
}

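forceSplitObjects: force split

1) CompactionManager.forceSplitObjects: iterates over the SSOs, splits each S3 object into SOs, and commits with CommitStreamSetObjectRequest.

2) CompactionManager.groupAndSplitStreamDataBlocks: processes in parallel as much as possible while keeping memory usage under 200MB, merging the blocks of the same stream with contiguous offsets in the SSOs and writing them to S3 as one SO.

compactObjects: compaction

1) CompactionAnalyzer.analyze: takes the SSO objectIds with their BlockIndex sets and produces N CompactionPlans. Each Plan is one round of S3 reads and writes, which keeps memory usage under 200MB.

Even when an SSO exceeds 200MB, it can be uploaded in parts through S3's multipart upload API, so memory usage still stays under 200MB.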

// streamDataBlockMap: key = objectId, value = [{objectId, DataBlockIndex(streamId)}]
public List<CompactionPlan> analyze(Map<Long, List<StreamDataBlock>> streamDataBlockMap,
    Set<Long> excludedObjectIds) {
    // keep only the objects whose streamIds appear in at least two objects
    streamDataBlockMap = filterBlocksToCompact(streamDataBlockMap);
    if (streamDataBlockMap.isEmpty()) {
        return Collections.emptyList();
    }
    // two kinds of CompactedObjectBuilder: SPLIT (split into SOs) and COMPACT (merge into one SSO)
    List<CompactedObjectBuilder> compactedObjectBuilders = groupObjectWithLimits(streamDataBlockMap, excludedObjectIds);
    // split into N Plans according to the 200MB memory limit
    return generatePlanWithCacheLimit(compactedObjectBuilders);
}
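How a memory budget turns one large compaction into several rounds can be illustrated with a simple greedy packer. This is only a sketch of the idea and makes no claim about how generatePlanWithCacheLimit actually works: `PlanPacker` and `pack` are hypothetical, each unit of work is represented by its byte size, and giving an oversized unit a plan of its own is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class PlanPacker {
    // Packs work items (byte sizes, in order) into consecutive plans so that
    // each plan stays within limitBytes. A single item larger than the limit
    // gets its own plan (assumed; such an item would rely on multipart upload).
    static List<List<Long>> pack(List<Long> sizes, long limitBytes) {
        List<List<Long>> plans = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : sizes) {
            if (!current.isEmpty() && currentBytes + size > limitBytes) {
                // Adding this item would exceed the budget: start a new round.
                plans.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            plans.add(current);
        }
        return plans;
    }
}
```

With a 200MB limit, items of 120MB, 60MB, 50MB, 250MB, and 10MB end up in four rounds: {120, 60}, {50}, {250}, {10}.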

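CompactionAnalyzer.groupObjectWithLimits:

  • SSO index entries are sorted by streamId-startOffset and processed in order;
  • The final result must satisfy two limits: at most 20,000 streams in the SSO and at most 10,000 SOs;
  • compactObjects algorithm: data of the same stream exceeding 8MB is split out into an SO, and everything else is merged into the SSO.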
public class CompactedObjectBuilder {
    private final List<StreamDataBlock> streamDataBlocks;
    // SPLIT: split out into an SO; COMPACT: merge into one SSO
    private CompactionType type;
}

// streamDataBlockMap: key = objectId, value = [{objectId, DataBlockIndex(streamId)}]
List<CompactedObjectBuilder> groupObjectWithLimits(Map<Long, List<StreamDataBlock>> streamDataBlockMap,
                                                   Set<Long> excludedObjectIds) {
    // Sort by streamId-startOffset
    List<StreamDataBlock> sortedStreamDataBlocks = CompactionUtils.sortStreamRangePositions(streamDataBlockMap);
    List<CompactedObjectBuilder> compactedObjectBuilders = new ArrayList<>();
    CompactionStats stats = null;
    int streamNumInStreamSet = -1;
    int streamObjectNum = -1;
    do {
        final Set<Long> objectsToRemove = new HashSet<>();
        if (stats != null) {
            // To satisfy the loop condition, exclude some objects from this compaction round
            if (streamObjectNum > maxStreamObjectNum) {
                addObjectsToRemove(CompactionType.SPLIT, compactedObjectBuilders, stats, objectsToRemove);
            } else {
                addObjectsToRemove(CompactionType.COMPACT, compactedObjectBuilders, stats, objectsToRemove);
            }
        }
        if (!objectsToRemove.isEmpty()) {
            excludedObjectIds.addAll(objectsToRemove);
        }
        sortedStreamDataBlocks.removeIf(e -> objectsToRemove.contains(e.getObjectId()));
        objectsToRemove.forEach(streamDataBlockMap::remove);
        // Compaction algorithm
        compactedObjectBuilders = compactObjects(sortedStreamDataBlocks);
        // Count the streams in the SSO and the number of SOs
        stats = CompactionStats.of(compactedObjectBuilders);
        streamNumInStreamSet = stats.getStreamRecord().streamNumInStreamSet();
        streamObjectNum = stats.getStreamRecord().streamObjectNum();
        // Repeat until the SSO holds <= 20,000 streams and there are <= 10,000 SOs
    } while (streamNumInStreamSet > maxStreamNumInStreamSet || streamObjectNum > maxStreamObjectNum);
    return compactedObjectBuilders;
}
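The SPLIT versus COMPACT decision can be sketched as follows. This is a deliberate simplification: it sums bytes per stream and compares the total against the 8MB threshold, whereas the real compactObjects works on sorted runs of blocks. `BlockSizeSketch` and `SplitOrCompactSketch` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical block descriptor carrying only a stream id and a byte size.
record BlockSizeSketch(long streamId, long sizeBytes) {}

class SplitOrCompactSketch {
    enum Type { SPLIT, COMPACT }

    // Streams whose total bytes exceed the threshold are split out into their
    // own SO; all remaining streams are merged into the shared SSO.
    static Map<Long, Type> classify(List<BlockSizeSketch> blocks, long splitThresholdBytes) {
        Map<Long, Long> bytesPerStream = new LinkedHashMap<>();
        for (BlockSizeSketch b : blocks) {
            bytesPerStream.merge(b.streamId(), b.sizeBytes(), Long::sum);
        }
        Map<Long, Type> result = new LinkedHashMap<>();
        bytesPerStream.forEach((streamId, bytes) ->
            result.put(streamId, bytes > splitThresholdBytes ? Type.SPLIT : Type.COMPACT));
        return result;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        List<BlockSizeSketch> blocks = List.of(
            new BlockSizeSketch(1, 6 * mb),
            new BlockSizeSketch(2, 1 * mb),
            new BlockSizeSketch(1, 5 * mb));
        System.out.println(classify(blocks, 8 * mb));
    }
}
```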

2) CompactionManager.executeCompactionPlans: iterates over the Plans, reading from and writing to S3, and builds the CommitStreamSetObjectRequest.

void executeCompactionPlans(CommitStreamSetObjectRequest request,
                            List<CompactionPlan> compactionPlans,
                            List<S3ObjectMetadata> s3ObjectMetadata) throws CompletionException {
    for (int i = 0; i < compactionPlans.size(); i++) {
        // S3 reads and writes ...
        // SO
        streamObjectCfList.stream().map(CompletableFuture::join)
            .forEach(request::addStreamObject);
    }
    // SSO
    objectStreamRanges.forEach(request::addStreamRange);
    request.setObjectId(uploader.getStreamSetObjectId());
    request.setOrderId(s3ObjectMetadata.get(0).objectId());
    request.setObjectSize(uploader.complete());
    request.setAttributes(ObjectAttributes.builder().bucket(uploader.bucketId()).build().attributes());
}

5-3. StreamObject Compaction

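S3StreamClient.StreamWrapper.compactV1: runs for every open Stream and comes in three types. Whatever the type, the steps are:

1) Fetch all SOs by streamId;

2) cleanupExpiredObject: clean up the SOs below startOffset (for example, when log.retention has deleted segments);

3) group0: group the objects by size, based on the S3 object metadata;

4) compact: CompactByPhysicalMerge or CompactByCompositeObject performs the compaction;

5) objectManager.compactStreamObject: sends a CommitStreamObjectRequest, and the Controller persists the metadata.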

private void compactV1(CompactionHint hint, long now) {
    if (now - lastMajorV1CompactionTimestamp > MAJOR_V1_COMPACTION_INTERVAL
        || hint.objectsCount >= MAJOR_V1_COMPACTION_MAX_OBJECT_THRESHOLD) {
        // 60 minutes or 400,000 objects
        compact(MAJOR_V1, hint);
        lastMajorV1CompactionTimestamp = System.currentTimeMillis();
    } else if (now - lastMinorV1CompactionTimestamp > MINOR_V1_COMPACTION_INTERVAL) {
        // 10 minutes
        compact(MINOR_V1, hint);
        lastMinorV1CompactionTimestamp = System.currentTimeMillis();
    } else {
        // 1 minute (the scheduling interval)
        compact(CLEANUP_V1, hint);
    }
}

// Main compaction flow
void compact0(CompactionType compactionType) {
    long streamId = stream.streamId();
    long startOffset = stream.startOffset();
    // Fetch all SOs of the stream
    List<S3ObjectMetadata> objects = objectManager.getStreamObjects(stream.streamId(), 0L, stream.confirmOffset(), Integer.MAX_VALUE).get();
    List<S3ObjectMetadata> expiredObjects = new ArrayList<>(objects.size());
    List<S3ObjectMetadata> livingObjects = new ArrayList<>(objects.size());
    for (S3ObjectMetadata object : objects) {
        if (object.endOffset() <= startOffset) {
            expiredObjects.add(object);
        } else {
            livingObjects.add(object);
        }
    }
    // Clean up SOs below startOffset: retention has already deleted that data
    cleanupExpiredObject(expiredObjects);
    // Group the S3 objects according to the compaction type
    List<List<S3ObjectMetadata>> objectGroups = group0(livingObjects,
        getMaxGroupSize(compactionType),
        getObjectFilter(compactionType, majorV1CompactionSkipSmallObject ? minorV1CompactionThreshold : 0));
    for (List<S3ObjectMetadata> objectGroup : objectGroups) {
        // Some groups do not need processing
        if (!checkObjectGroupCouldBeCompact(objectGroup, startOffset, compactionType)) {
            continue;
        }
        long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
        Optional<CompactStreamObjectRequest> requestOpt;
        if (MINOR_V1.equals(compactionType)) {
            // Minor compaction: in-memory merge into a Normal object
            requestOpt = new CompactByPhysicalMerge(...).compact();
        } else {
            // Major compaction: index-only merge into a Composite object
            requestOpt = new CompactByCompositeObject(streamId, stream.streamEpoch(), startOffset, objectGroup,
                objectId, objectStorage).compact();
        }
        CompactStreamObjectRequest request = requestOpt.get();
        objectManager.compactStreamObject(request).get();
    }
}
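The type selection in compactV1 is easy to exercise as a pure function. The sketch below mirrors the branch order of the excerpt above, using the values given in its comments (60 minutes, 10 minutes, 400,000 objects). `CompactionTypeSketch` and its constants are hypothetical names, not AutoMQ's fields.

```java
import java.util.concurrent.TimeUnit;

class CompactionTypeSketch {
    enum Type { MAJOR_V1, MINOR_V1, CLEANUP_V1 }

    // Values taken from the comments in the article's excerpt.
    static final long MAJOR_INTERVAL_MS = TimeUnit.MINUTES.toMillis(60);
    static final long MINOR_INTERVAL_MS = TimeUnit.MINUTES.toMillis(10);
    static final long MAX_OBJECT_THRESHOLD = 400_000;

    // Major wins when its interval has elapsed or the object count is high;
    // otherwise Minor when its interval has elapsed; otherwise Cleanup.
    static Type choose(long nowMs, long lastMajorMs, long lastMinorMs, long objectsCount) {
        if (nowMs - lastMajorMs > MAJOR_INTERVAL_MS || objectsCount >= MAX_OBJECT_THRESHOLD) {
            return Type.MAJOR_V1;
        }
        if (nowMs - lastMinorMs > MINOR_INTERVAL_MS) {
            return Type.MINOR_V1;
        }
        return Type.CLEANUP_V1;
    }

    public static void main(String[] args) {
        long now = TimeUnit.MINUTES.toMillis(500);
        long fiveMinAgo = now - TimeUnit.MINUTES.toMillis(5);
        long twentyMinAgo = now - TimeUnit.MINUTES.toMillis(20);
        System.out.println(choose(now, fiveMinAgo, fiveMinAgo, 10));
        System.out.println(choose(now, twentyMinAgo, twentyMinAgo, 10));
        System.out.println(choose(now, fiveMinAgo, fiveMinAgo, 400_000));
    }
}
```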

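5-3-1. Major Compaction

StreamObjectCompactor.CompactByCompositeObject.compact:

Major Compaction handles Normal objects of 4MB or more together with Composite objects, and the merged result does not exceed 10GB.

If the cluster holds more than 400,000 objects, Normal objects smaller than 4MB are processed as well. Assuming 3 Streams per partition, hence at least 3 SOs (3 objects), this corresponds to a cluster on the order of 100,000 partitions (400,000 / 3 ≈ 133,000).

A Normal object is one produced by the regular write path or by SSO Compaction. It has three regions, Blocks, Indexes, and Footer, and each Block contains n StreamRecordBatches (the actual data).

A Composite object is produced by Major Compaction. It is effectively a two-level index and contains no StreamRecordBatch data itself.

1) Blocks region: contains only n LinkBlocks, each pointing to a Normal object;

2) Indexes region: keeps the index data of the original Normal objects;

3) Footer region: its magic number distinguishes it from a Normal object.

An old Composite object is merged into the new Composite object, so a Composite object never points to another Composite object.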

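5-3-2. Minor Compaction

StreamObjectCompactor.CompactByPhysicalMerge.compact:

Minor Compaction handles objects smaller than 4MB, and the merged result also stays within 4MB. The objects are merged in memory into a larger object, after which the old objects are deleted.

5-3-3. Cleanup Compaction

Cleanup handles the case where Normal objects referenced by a Composite object have already been deleted on expiry, so the Composite object has to be compacted again. The compaction logic is the same as in Major Compaction.

StreamObjectCompactor.checkObjectGroupCouldBeCompact: the only difference is that Cleanup requires the first object in the group to be a Composite object, with an estimated 512MB or more of the data it points to already deleted.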

static boolean checkObjectGroupCouldBeCompact(List<S3ObjectMetadata> objectGroup, long startOffset,
                                              CompactionType compactionType) {
    // Minor and Major: when size-based grouping leaves a single object in the group, skip compaction
    if (objectGroup.size() == 1
        && SKIP_COMPACTION_TYPE_WHEN_ONE_OBJECT_IN_GROUP.contains(compactionType)) {
        return false;
    }
    if (CLEANUP_V1.equals(compactionType)) {
        // The first object must be a Composite object
        S3ObjectMetadata metadata = objectGroup.get(0);
        if (ObjectAttributes.from(metadata.attributes()).type() != Composite) {
            return false;
        }
        // Estimate dirtySize = deleted range / total range * object size, and require > 512MB
        double dirtySize = ((double) startOffset - metadata.startOffset())
            / (metadata.endOffset() - metadata.startOffset())
            * metadata.objectSize();
        return dirtySize > MAX_DIRTY_BYTES;
    }
    return true;
}
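A worked example makes the dirty-size estimate concrete. The sketch below reuses the formula from the excerpt with made-up numbers. `DirtySizeSketch` is a hypothetical name, and the 512MB constant is the value quoted in the article.

```java
class DirtySizeSketch {
    static final long MAX_DIRTY_BYTES = 512L * 1024 * 1024;

    // Linear estimate: the fraction of the object's offset range that lies
    // below the stream's startOffset, scaled by the object's total size.
    static double estimateDirtyBytes(long streamStartOffset, long objStartOffset,
                                     long objEndOffset, long objectSize) {
        return ((double) streamStartOffset - objStartOffset)
            / (objEndOffset - objStartOffset)
            * objectSize;
    }

    static boolean needsCleanup(long streamStartOffset, long objStartOffset,
                                long objEndOffset, long objectSize) {
        return estimateDirtyBytes(streamStartOffset, objStartOffset, objEndOffset, objectSize)
            > MAX_DIRTY_BYTES;
    }

    public static void main(String[] args) {
        long twoGiB = 2L * 1024 * 1024 * 1024;
        // Composite object covering offsets [0, 1000) with 2 GiB of data.
        // 30% trimmed: about 614 MiB dirty, above the 512MB threshold.
        System.out.println(needsCleanup(300, 0, 1000, twoGiB));
        // 20% trimmed: about 410 MiB dirty, below the threshold.
        System.out.println(needsCleanup(200, 0, 1000, twoGiB));
    }
}
```

The estimate assumes data is spread evenly across the offset range, which is why it is only an approximation of the bytes actually deleted.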

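Summary

In AutoMQ:

1) Stream: one partition Log maps to 4 Streams: metadata, data log, time index, and transaction index.

2) StreamSlice: each file in a Segment maps to one StreamSlice, and only the newest Slice is writable (sealed=false). For example, the newest Segment has 3 writable StreamSlices, for the data log, the time index, and the transaction index.

Write path

1) The Producer sends messages; the Broker writes the WAL (S3), advances the HW and confirmOffset, responds to the Producer, and caches the WAL data in LogCache for hot reads.

2) When LogCache is full, or every 60s, the WAL data is converted into 1 StreamSetObject (SSO) + n StreamObjects (SO) and uploaded to S3.

3) SSO Compaction runs periodically over the SSOs of the current node:

  • Old SSOs (older than 120 min): split into per-stream SOs;
  • New SSOs: streams larger than 8MB are split out into their own SOs, and the rest is compacted into one SSO.

4) SO Compaction runs periodically over the open Streams:

  • Minor: SOs smaller than 4MB are merged in memory into a larger SO;
  • Major: for SOs larger than 4MB, a Composite-format object (a two-level index) is generated that points to the original objects.

Read path

1) LogCache: the WAL cache holding hot data, including the most recently received messages;

2) BlockCache: an index cache plus a data cache, loaded from S3;

3) Cache miss: read requests are merged and served by S3 range reads.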
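The lookup order of the read path can be sketched as a small tiered cache. This is only an illustration of the ordering described above: `TieredReadSketch` and its fields are hypothetical, plain maps keyed by offset stand in for both caches, and the merging of read requests is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

class TieredReadSketch {
    final Map<Long, String> logCache = new HashMap<>();   // hot data still cached from the WAL
    final Map<Long, String> blockCache = new HashMap<>(); // data previously loaded from S3
    final LongFunction<String> s3Reader;                  // hypothetical stand-in for an S3 range read
    int s3Reads = 0;

    TieredReadSketch(LongFunction<String> s3Reader) {
        this.s3Reader = s3Reader;
    }

    // Checks LogCache first, then BlockCache, and falls back to S3 on a miss,
    // keeping the loaded value in BlockCache for later reads.
    String read(long offset) {
        String hot = logCache.get(offset);
        if (hot != null) {
            return hot;
        }
        String cached = blockCache.get(offset);
        if (cached != null) {
            return cached;
        }
        s3Reads++;
        String loaded = s3Reader.apply(offset);
        blockCache.put(offset, loaded);
        return loaded;
    }

    public static void main(String[] args) {
        TieredReadSketch reader = new TieredReadSketch(offset -> "s3-record-" + offset);
        reader.logCache.put(100L, "hot-record-100");
        System.out.println(reader.read(100L)); // served from LogCache
        System.out.println(reader.read(7L));   // cache miss, loaded from the S3 stand-in
        System.out.println(reader.read(7L));   // now served from BlockCache
        System.out.println("S3 reads: " + reader.s3Reads);
    }
}
```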
