问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

图解Kafka的RecordBatch结构

发布网友 发布时间:2024-10-01 23:50

我来回答

1个回答

热心网友 时间:2024-10-19 21:30

?《Kafka运维管控平台LogiKM》? ??更强大的管控能力?? ??更高效的问题定位能力? ? ?更便捷的集群运维能力? ? ??更专业的资源治理??更友好的运维生态? ? ?

@[TOC]

阅读完本文你大概会获得以下知识

什么时候执行消息的压缩操作

RecordBatch结构图

RecordBatch

我们之前有讲过生产者的ProducerBatch, 这个RecordBatch跟ProducerBatch的区别是什么呢?

RecordBatch是在ProducerBatch里面的一个专门存放消息的对象, 除此之外ProducerBatch还有其他相关属性,例如还有重试、回调等等相关属性。

RecordBatch初始化

在创建一个需要创建一个新的ProducerBatch的时候,同时需要构建一个MemoryRecordsBuilder, 这个对象我们可以理解为消息构造器,所有的消息相关都存放到这个里面。

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,byte magic,CompressionType compressionType,TimestampType timestampType,long baseOffset,long logAppendTime,long producerId,short producerEpoch,int baseSequence,boolean isTransactional,boolean isControlBatch,int partitionLeaderEpoch,int writeLimit) {// 省略部分....this.magic = magic;this.timestampType = timestampType;this.compressionType = compressionType;this.baseOffset = baseOffset;this.logAppendTime = logAppendTime;this.numRecords = 0;this.uncompressedRecordsSizeInBytes = 0;this.actualCompressionRatio = 1;this.maxTimestamp = RecordBatch.NO_TIMESTAMP;this.producerId = producerId;this.producerEpoch = producerEpoch;this.baseSequence = baseSequence;this.isTransactional = isTransactional;this.isControlBatch = isControlBatch;this.partitionLeaderEpoch = partitionLeaderEpoch;this.writeLimit = writeLimit;this.initialPosition = bufferStream.position();this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);// Buffer一开始就需要预留61B的位置用于 存放消息投 RecordHeaderbufferStream.position(initialPosition + batchHeaderSizeInBytes);this.bufferStream = bufferStream;//选择合适的压缩器实现类this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));}

上面的源码可知重点:

bufferStream 一开始的时候就需要预留 61B 的位置给 消息头使用,也就是RecordHeader。batchHeaderSizeInBytes = 61

根据配置的压缩类型compression.type,选择对应的压缩输出流。例如假设使用lz4压缩类型,返回的输出流实体对象为KafkaLZ4BlockOutputStream , 这里面有写入消息的方法和压缩方法。

写入消息

创建了Batch之后,自然需要写入消息

源码位置:

private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {ensureOpenForRecordAppend();// 位移偏移量 ;offset 是当前lastOffset+1, 如果是最开始的时候,它是0; baseOffset 默认是0int offsetDelta = (int) (offset - baseOffset);long timestampDelta = timestamp - firstTimestamp;//将数据 写到appendStream中。int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);// 记录一下 写入了多少数据recordWritten(offset, timestamp, sizeInBytes);}

offsetDelta:表示该条消息的相对整个RecordBatch的位移偏移量, 计算逻辑是(offset ?- baseOffset); ? ? ? 使用偏移量可以节省字节数 offset 值等于当前RecordBatch的最后一个offset+1,计算逻辑是(offset = lastOffset == null ? baseOffset : lastOffset + 1;)baseOffset 值是RecordBatch的起始偏移量,一般值为0 ;

timestampDelta : 表示该条消息的相对整个RecordBatch的时间戳的偏移量,计算逻辑(timestamp - firstTimestamp) ,使用偏移量可以节省字节数 timestamp 值逻辑timestamp = record.timestamp() == null ? nowMs : record.timestamp() ,意思是这个值也是可以通过设置record属性来设置的。 ?firstTimestamp 值就是timestamp第一次的值。

得到了上面的基础值之后, 就将消息写入到Buffer中, 这里的写入涉及到变长字段Varints,一定程度节省空间。这里写入write()的时候,底层执行的是根据你选择的压缩类型决定使用哪个实现类,例如KafkaLZ4BlockOutputStream。 具体的Record的格式请看下面的 Record格式

注意: 这里写入消息的时候,第一条消息,是从第62位写入的,因为前面的61B已经被BatchHeader先预定了(初始化的时候)。

Record结构图

要了解消息的格式,我们先看看消息是怎么写入的

DefaultRecord#writeTo

public static int writeTo(DataOutputStream out,int offsetDelta,long timestampDelta,ByteBuffer key,ByteBuffer value,Header[] headers) throws IOException {int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);ByteUtils.writeVarint(sizeInBytes, out);byte attributes = 0; // there are no used record attributes at the momentout.write(attributes);ByteUtils.writeVarlong(timestampDelta, out);ByteUtils.writeVarint(offsetDelta, out);if (key == null) {ByteUtils.writeVarint(-1, out);} else {int keySize = key.remaining();ByteUtils.writeVarint(keySize, out);Utils.writeTo(out, key, keySize);}if (value == null) {ByteUtils.writeVarint(-1, out);} else {int valueSize = value.remaining();ByteUtils.writeVarint(valueSize, out);Utils.writeTo(out, value, valueSize);}if (headers == null)throw new IllegalArgumentException("Headers cannot be null");ByteUtils.writeVarint(headers.length, out);for (Header header : headers) {String headerKey = header.key();if (headerKey == null)throw new IllegalArgumentException("Invalid null header key found in headers");byte[] utf8Bytes = Utils.utf8(headerKey);ByteUtils.writeVarint(utf8Bytes.length, out);out.write(utf8Bytes);byte[] headerValue = header.value();if (headerValue == null) {ByteUtils.writeVarint(-1, out);} else {ByteUtils.writeVarint(headerValue.length, out);out.write(headerValue);}}return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;}

从源码可以得知消息格式为:

Record属性解释:

length:整个Record的消息总大小, 使用可变字段。

attributes:已经弃用,默认为0,固定占用了1B

timestampDelta: 时间戳的增量,使用可变字段。使用增量可以有效节约内存

offsetDelta: 位移的增量,使用可变字段, 使用增量可以有效节约内存

keyLength: key的长度,使用可变字段, 如果没有key,该值为-1。

key: ?key的信息,正常存储。如果key==null,则该值不存在。

valueLength:value的长度,使用可变字段, 如果没有key,改值为-1.

value: value的信息,正常存储,如果value==null,则该值也不存在。

headers:消息头,这个字段用于支持应用级别的扩展,可以携带很多信息,例如你带一个TraceId也不过分。

header counts : 消息头的数量,使用可变字段

Varints 是可变长自动,可以有效的节省空间

Header属性解释:

类似,就不再赘述了。

关闭ProducerBatch

当一个ProducerBatch即将发送出去的时候(ReadyBatch), 会先将Batch关闭掉batch.close()。

关闭输出流appendStream并压缩数据

在这个过程中,也会将appendStream关闭掉, 也就是用于存储消息体的输出流,那么在它调用 out.flush()的时候就会调用对应的实现类流,比如我们的压缩类型是lz4, 那么这里实现类就是 KafkaLZ4BlockOutputStream

?MemoryRecordsBuilder#closeForRecordAppends?KafkaLZ4BlockOutputStream#flush

public void flush() throws IOException {if (!finished) {writeBlock();}if (out != null) {out.flush();}}

什么时候执行压缩操作 其中的 writeBlock()就是在执行压缩操作, 所以你应该知道, 这个时候压缩了Records。并且只是Records。

填充RecordBatchHeader数据

上面我们已经给Records消息集压缩过了, 还记得我们在写入消息的时候是从 position 61 后面开始写的吗?

这个61B的空间是用来干嘛的呢?

?MemoryRecordsBuilder#writeDefaultBatchHeader

private int writeDefaultBatchHeader() {ensureOpenForRecordBatchWrite();ByteBuffer buffer = bufferStream.buffer();//当前buffer的位置int pos = buffer.position();//将位置移动到初始位置0buffer.position(initialPosition);// 大小int size = pos - initialPosition;//已压缩的大小int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;// 偏移量增量 int offsetDelta = (int) (lastOffset - baseOffset);final long maxTimestamp;if (timestampType == TimestampType.LOG_APPEND_TIME)maxTimestamp = logAppendTime;elsemaxTimestamp = this.maxTimestamp;//讲RecordBatch 消息头写入bufferDefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,partitionLeaderEpoch, numRecords);//重新定位buffer.position(pos);return writtenCompressed;}

真正写入数据的地方的

DefaultRecordBatch#writeHeader

static void writeHeader(ByteBuffer buffer,long baseOffset,int lastOffsetDelta,int sizeInBytes,byte magic,CompressionType compressionType,TimestampType timestampType,long firstTimestamp,long maxTimestamp,long producerId,short epoch,int sequence,boolean isTransactional,boolean isControlBatch,int partitionLeaderEpoch,int numRecords) {if (magic < RecordBatch.CURRENT_MAGIC_VALUE)throw new IllegalArgumentException("Invalid magic value " + magic);if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);int position = buffer.position();buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);buffer.put(position + MAGIC_OFFSET, magic);buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);long crc = Crc32C.compute(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET);buffer.putInt(position + CRC_OFFSET, (int) crc);buffer.position(position + RECORD_BATCH_OVERHEAD);}

可以看到CRC的计算,是在最后面的时候计算,然后填充到buffer里面的,但是这个并不意味着crc32是放在最后一个, CRC_OFFSET的位置是17的位置。

RecordBatchHeader结构图

RecordBatchHeader属性解释:

baseOffset: ?当然RecordBatch的起始位移,一般默认为0

length:计算从partition leader epoch 字段开始到整体末尾的长度,计算的逻辑是(sizeInBytes - LOG_OVERHEAD), 这个sizeInBytes就是整个RecordBatch的长度。LOG_OVERHEAD = 12

partition leader epoch: 分区的Leader纪元,也就是版本号

magic: 消息格式版本号, V2版本 该值为2

crc32: ?该RecordBatch的校验值, 计算该值是从attributes的位置开始计算的。

attributes:消息的属性,这里用了2个字节, 低3位表示压缩格式,第4位表示时间戳,第5位表示事务标识,第6位表示是否控制消息。如下图

last offset delta : RecordBatch中最后一个Record的offset与first offset的差值。

first timestamp: 第一条Record的时间戳。对于Record的时间戳的值 ,如果在构造待发送的ProducerRecord的时候设置了timestamp,那么就是这个设置的值,如果没有设置那就是当前时间戳的值。

max timestamp: RecordBatch中最大时间戳。

producer id : ?用于支持幂等和事务的属性。

producer epoch :用于支持幂等和事务的属性。

base sequence :用于支持幂等和事务的属性。

record count : 消息数量

RecordBatch整体结构图

在创建RecordBatch的时候,会先预留61B的位置给BatchHeader, 实现方式就是让buffer的位置移动到61位buffer.possition(61)

消息写入的时候并不会压缩,只有等到即将发送这个Batch的时候,会关闭Batch,从而进行压缩(如果配置了压缩策略的话), 压缩的知识Records, 不包含 RecordBatchHeader

填充RecordBatchHeader

原文:https://juejin.cn/post/7121939963981398023
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
什么是Medical Literature Analysis and Retrieval System (NLM)的缩 ... 医学工作者的因特网内容简介 2002年的一篇文章英文摘要被《医学文献联机数据库》(MEDLINE)收录,是否... 医学器具medline啥意思 民国市与县的关系是怎样的,市与县分别又是什么? 历代兵制中国民国兵制 冰箱运行时有水流声怎么回事? ud牛郎眼影真假判断 爆闪/没有平替❌ 牛郎眼影真假鉴别方法 鉴别❗️衰败城市牛郎眼影真假,粉质肉眼差距 ...查询里的排名是单独职位排名还是总体报考人数排名? 麒麟9000s是什么时候生产的? 王者荣耀积分夺宝幸运值满分详解_王者荣耀积分夺宝幸运值多少满2022 ...想问下杭州中西医结合医院治疗的声带息肉怎么样呢? TCL L32E4500A-3D可以使用多大的USB硬盘,硬盘适用windowsXP等可以使用... l32e4500a怎么安装电视猫 ...换台时声音小,过一会儿音量逐渐正常是什么原因?型号:L32E4500A... l32e4500a怎么复位 女孩子把男孩子当闺蜜是什么意思? 女生对男生说希望做单纯的友谊关系是什么意思 如何正确进行无土栽培种植 为什么青青生态农业研究所能够成为葡萄种植业的标杆呢? 齐白石,其实是个逗逼老爷子! ...我在云南旅游买了一对翡翠玉手镯,是A吗?价格值吗?我感觉买贵了,能... 篮球怎么造句 打的好怎么造句 2017年准生证如何办理 这种一张图弄成多图是怎么做到的啊? 云系统重装好还是下载镜像重装好? 水果成熟的季节各类水果成熟季节是什么时候 劳动仲裁笔迹鉴定检材的提供是劳动者吗 劳动仲裁笔迹鉴定程序主要是什么 为何qq空间只能看一个qq的访客,关联号怎么不可以 酷派7269手机能不能用移动4g卡 轮毂修复有必要吗 3g手机酷派大神f1用4g卡怎么开起数据用不起啊 轮毂变形修复后安全吗? 我的手机是3G酷派,我买了个4G卡,想图片这样,我的3G闲时流量还能用... 酷派5217可以用电信4G卡吗?我知道3G手机不能用4G网络,但它能用3G和正 ... 酷派ivvi三网全通手机可以用电信4G卡吗 为什么酷派8720l 4g手机卡插在3g手机里后再插回4g手机没反应 xterm中文设置 解决Linux终端NotDisplaying问题Linux终端不显示 利润年末结转什么科目 年末结转净利润的会计分录如何做 年末结转本年净利润的会计分录_百度知 ... ...病了现在吃的药是齐多夫定片.拉米夫定片.依非韦伦片.复方新诺明请问... 小米移动电源2冲不进电怎么办? 小米移动电源好久没用怎么充不进电 蒜头鼻怎么办?教小弟一个简单的方法~ 没有保险开解除劳动合同证明书范文是怎么样的?