A Log is an ordered sequence of LogSegment objects that together form one logical log. To locate a LogSegment quickly, Log manages its LogSegments with a skip list.
Appends to a Log are sequential writes, so only the last LogSegment accepts new data; all earlier LogSegments are effectively read-only. The last LogSegment is obtained via activeSegment. Below are some of Log's basic fields.

```scala
class Log(val dir: File,                          // directory holding the log and index files of each LogSegment
          @volatile var config: LogConfig,        // configuration of this Log
          @volatile var recoveryPoint: Long = 0L, // starting offset of the recovery operation: messages before
                                                  // recoveryPoint are already persisted to disk, later ones may be
                                                  // lost on a crash, so only messages from recoveryPoint onward
                                                  // need to be recovered
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  import kafka.log.Log._

  /* A lock that guards all modifications to the log */
  private val lock = new Object

  /* last time it was flushed */
  private val lastflushedTime = new AtomicLong(time.milliseconds)

  def initFileSize() : Int = {
    if (config.preallocate)
      config.segmentSize
    else
      0
  }

  /* the actual segments of the log */
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
    new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  loadSegments()

  /* Used to assign offsets to new messages; it is also the replica's LEO. Its messageOffset field records the
   * offset that will be assigned to the next message appended to the Log, segmentBaseOffset records the
   * activeSegment's baseOffset, and relativePositionInSegment records the activeSegment's current size. */
  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(),
                                                           activeSegment.baseOffset,
                                                           activeSegment.size.toInt)
}
```
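To make the skip-list lookup concrete, here is a minimal, self-contained sketch of how the segment that may contain a given offset can be located. `SegmentStub` and `segmentFor` are hypothetical names for illustration only; the sketch assumes nothing beyond a map keyed by each segment's baseOffset, like the `segments` field above.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookup {
  // Minimal stand-in for LogSegment: only the baseOffset matters for the lookup.
  case class SegmentStub(baseOffset: Long)

  // Skip list keyed by baseOffset, mirroring Log.segments.
  val segments = new ConcurrentSkipListMap[java.lang.Long, SegmentStub]()
  Seq(0L, 100L, 250L).foreach(o => segments.put(o, SegmentStub(o)))

  // Hypothetical helper: the segment whose baseOffset is the greatest key <= offset
  // is the one that may contain the message with that offset.
  def segmentFor(offset: Long): Option[SegmentStub] =
    Option(segments.floorEntry(offset)).map(_.getValue)

  def main(args: Array[String]): Unit = {
    println(segmentFor(120L)) // Some(SegmentStub(100)) -- offset 120 lives in the segment starting at 100
    println(segmentFor(260L)) // Some(SegmentStub(250))
  }
}
```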
Appending messages to the Log is handled by append(); the ProducerRequest has already been parsed into a ByteBufferMessageSet by the time it gets here.
```scala
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
  // validate message sizes and CRCs and build a LogAppendInfo describing the set
  val appendInfo = analyzeAndValidateMessageSet(messages)

  // if we have any valid messages, append them to the log
  if (appendInfo.shallowCount == 0)
    return appendInfo

  // trim any bytes that did not pass validation
  var validMessages = trimInvalidBytes(messages, appendInfo)

  try {
    // insert the valid messages into the Log
    lock synchronized {
      // decide whether offsets need to be assigned (the default)
      if (assignOffsets) {
        // start assigning offsets from the messageOffset recorded in nextOffsetMetadata
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        // firstOffset records the first assigned offset; affected by compressed (wrapper) messages
        appendInfo.firstOffset = offset.value
        val now = time.milliseconds
        val (validatedMessages, messageSizesMaybeChanged) = try {
          // further validation and offset assignment
          validMessages.validateMessagesAndAssignOffsets(offset,
                                                         now,
                                                         appendInfo.sourceCodec,
                                                         appendInfo.targetCodec,
                                                         config.compact,
                                                         config.messageFormatVersion.messageFormatVersion,
                                                         config.messageTimestampType,
                                                         config.messageTimestampDifferenceMaxMs)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        validMessages = validatedMessages
        // offset of the last message, not affected by compression
        appendInfo.lastOffset = offset.value - 1
        // update the timestamp if the log uses LOG_APPEND_TIME
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.timestamp = now

        // if validateMessagesAndAssignOffsets changed the message sizes, re-check them
        if (messageSizesMaybeChanged) {
          for (messageAndOffset <- validMessages.shallowIterator) {
            if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
              BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
              throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
                .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
            }
          }
        }
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + messages)
      }

      // check whether the ByteBufferMessageSet is larger than the configured segment size
      if (validMessages.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validMessages.sizeInBytes, config.segmentSize))
      }

      // get the activeSegment; a new one may be rolled if the previous activeSegment is full
      val segment = maybeRoll(validMessages.sizeInBytes)

      // append the messages
      segment.append(appendInfo.firstOffset, validMessages)

      // update the LEO
      updateLogEndOffset(appendInfo.lastOffset + 1)

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))

      // flush if the amount of data not yet written to disk has reached the threshold
      if (unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  } catch {
    case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  }
}
```
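The updateLogEndOffset call above refreshes nextOffsetMetadata so that the LEO, the activeSegment's baseOffset and its current size stay consistent after the append. Its body is not reproduced in this post; the following is a plausible sketch reconstructed from the LogOffsetMetadata fields described earlier, not a verbatim copy of the source.

```scala
// Sketch only: rebuild nextOffsetMetadata after an append, using the three
// LogOffsetMetadata fields described above (messageOffset, segmentBaseOffset,
// relativePositionInSegment).
private def updateLogEndOffset(messageOffset: Long) {
  nextOffsetMetadata = new LogOffsetMetadata(messageOffset,
                                             activeSegment.baseOffset,
                                             activeSegment.size.toInt)
}
```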
analyzeAndValidateMessageSet checks message sizes, CRCs, and whether the internal offsets increase monotonically; it does not decompress the messages:
```scala
private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
  var shallowMessageCount = 0        // number of shallow (outer) messages
  var validBytesCount = 0            // total bytes of the messages that passed validation
  var firstOffset, lastOffset = -1L  // offsets of the first and last messages
  var sourceCodec: CompressionCodec = NoCompressionCodec
  var monotonic = true               // whether the producer-assigned internal offsets increase monotonically

  for (messageAndOffset <- messages.shallowIterator) {
    // record the offset of the first message; at this point it is still the producer-assigned offset
    if (firstOffset < 0)
      firstOffset = messageAndOffset.offset
    // check whether the internal offsets are monotonically increasing
    if (lastOffset >= messageAndOffset.offset)
      monotonic = false
    // record the offset of the last message
    lastOffset = messageAndOffset.offset

    val m = messageAndOffset.message

    // check the message size
    val messageSize = MessageSet.entrySize(m)
    if (messageSize > config.maxMessageSize) {
      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
      BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
      throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
        .format(messageSize, config.maxMessageSize))
    }

    // verify the CRC32 checksum
    m.ensureValid()

    // count the outer messages that passed the checks
    shallowMessageCount += 1
    // count the bytes that passed the checks
    validBytesCount += messageSize

    val messageCodec = m.compressionCodec
    if (messageCodec != NoCompressionCodec)
      sourceCodec = messageCodec
  }

  // determine the compression codec used on the broker side
  val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

  LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
```
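For reference, the LogAppendInfo returned here roughly bundles the values collected during this pass. The sketch below is reconstructed from the constructor call above and the field accesses in append(); treat the names and order as an approximation that reuses the codec types from the surrounding code, not as a verbatim copy of the real case class.

```scala
// Reconstructed approximation of LogAppendInfo, based on how it is built and used in this post.
case class LogAppendInfo(var firstOffset: Long,         // offset of the first appended message
                         var lastOffset: Long,          // offset of the last appended message
                         var timestamp: Long,           // set to append time when LOG_APPEND_TIME is used
                         sourceCodec: CompressionCodec, // codec the producer used
                         targetCodec: CompressionCodec, // codec the broker will use
                         shallowCount: Int,             // number of shallow (outer) messages
                         validBytes: Int,               // bytes that passed validation
                         offsetsMonotonic: Boolean)     // whether the given offsets increase monotonically
```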
The maybeRoll method checks whether the conditions for creating a new activeSegment are met:
1. The current segment size plus the size of the message set about to be appended exceeds the maximum LogSegment size.
2. The activeSegment has exceeded its maximum allowed lifetime.
3. The index file is full.

Creating a new activeSegment creates a new .log file and a new .index index file, and adds the new segment to the segments skip list (a rough sketch of this roll decision follows). The flush method writes the messages between recoveryPoint and the LEO to disk and then advances recoveryPoint, as shown in the code after the sketch.
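The sketch below condenses the three conditions into one predicate. It is a minimal illustration, not the Kafka implementation: RollCheck, shouldRoll, and every field name are hypothetical stand-ins for the state that maybeRoll inspects.

```scala
object RollDecision {
  // Illustrative stand-in for the state maybeRoll looks at; all names are hypothetical.
  case class RollCheck(segmentBytes: Int,      // current size of the active segment
                       maxSegmentBytes: Int,   // configured maximum segment size
                       segmentAgeMs: Long,     // how long the active segment has existed
                       maxSegmentAgeMs: Long,  // maximum allowed segment lifetime
                       indexIsFull: Boolean)   // whether the offset index has no free slots left

  // A new segment is rolled when any of the three conditions above holds.
  def shouldRoll(check: RollCheck, incomingBytes: Int): Boolean =
    check.segmentBytes + incomingBytes > check.maxSegmentBytes || // 1. size limit would be exceeded
    check.segmentAgeMs > check.maxSegmentAgeMs ||                 // 2. segment has exceeded its lifetime
    check.indexIsFull                                             // 3. index file is full
}
```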
```scala
def flush(offset: Long) : Unit = {
  // messages before this offset are already flushed to disk, so there is nothing to do
  if (offset <= this.recoveryPoint)
    return
  debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
        time.milliseconds + " unflushed = " + unflushedMessages)
  // logSegments returns the LogSegment objects between recoveryPoint and offset
  for (segment <- logSegments(this.recoveryPoint, offset))
    // LogSegment.flush() flushes both the log file and the index file to disk
    segment.flush()
  lock synchronized {
    if (offset > this.recoveryPoint) {
      this.recoveryPoint = offset            // advance the recoveryPoint
      lastflushedTime.set(time.milliseconds)
    }
  }
}
```
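The flush threshold checked in append() (`unflushedMessages >= config.flushInterval`) is simply the distance between the LEO and recoveryPoint. The exact definition is not shown in this post; a sketch consistent with the fields discussed above would be:

```scala
// Sketch only: "unflushed messages" is the gap between the LEO (nextOffsetMetadata.messageOffset)
// and the recoveryPoint -- everything appended but not yet flushed to disk.
def unflushedMessages(): Long = nextOffsetMetadata.messageOffset - this.recoveryPoint
```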