博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka日志存储(六):Log
阅读量:271 次
发布时间:2019-03-01

本文共 8324 字,大约阅读时间需要 27 分钟。

Log是对多个LogSegemnt对象的顺序组合,形成一个逻辑的日志。为了快速定位LogSegment,Log使用调表对LogSegment进行管理。

向Log中追加信息数顺序写入的,那么只有最后一个LogSegment能够进行写入操作,之前所有的LogSegment都不能写入数据。最后一个LogSegment使用activeSegement()获取,下面是Log的一些基本字段。
 

class Log(val dir: File,//Log对应的磁盘目录,存放每个LogSegment对应的日志文件和索引文件。          @volatile var config: LogConfig,//Log相关的配置信息          @volatile var recoveryPoint: Long = 0L,//指定恢复操作的其实offset,recoveryPoint之前的Message已经刷到了磁盘上持久存储,后面的消息不一定,出现宕机时有可能会消失,所以只需要恢复recoveryPoint之前的消息就可以了。          scheduler: Scheduler,          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {  import kafka.log.Log._  /* A lock that guards all modifications to the log */  private val lock = new Object  /* last time it was flushed */  private val lastflushedTime = new AtomicLong(time.milliseconds)  def initFileSize() : Int = {    if (config.preallocate)      config.segmentSize    else      0  }  /* the actual segments of the log */  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]  loadSegments()  /* 用来分配消息的offset,同时也是副本的LEO。它的messageOffset记录了Log最后一个offset值,segmentBaseOffset字段记录了activeSegment的baseOffset,relativePositionInSegment记录了activeSegment的大小 */  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)}

向Log追加消息的功能由append()完成,把ProducerRequest解析成ByteBufferMessageSet。

 

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {    //检测消息长度和校验码,得到LogAppendInfo对象如果为直接返回    val appendInfo = analyzeAndValidateMessageSet(messages)    // if we have any valid messages, append them to the log    if (appendInfo.shallowCount == 0)      return appendInfo    // 截断未通过校验的字段。    var validMessages = trimInvalidBytes(messages, appendInfo)    try {      // 插入Log中      lock synchronized {        // 判断是否需要分配offset,默认需要        if (assignOffsets) {          //获取nextOffsetMetadata记录的messageOffset字段,从此值后开发分配offset。             val offset = new LongRef(nextOffsetMetadata.messageOffset)          //firstOffset记录第一条分配的offset,受压缩信息的影响          appendInfo.firstOffset = offset.value          val now = time.milliseconds          val (validatedMessages, messageSizesMaybeChanged) = try {          //进一步验证并分配offset            validMessages.validateMessagesAndAssignOffsets(offset,                                                           now,                                                           appendInfo.sourceCodec,                                                           appendInfo.targetCodec,                                                           config.compact,                                                           config.messageFormatVersion.messageFormatVersion,                                                           config.messageTimestampType,                                                           config.messageTimestampDifferenceMaxMs)          } catch {            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)          }          validMessages = validatedMessages          //最后一条消息的offset,不受压缩影响          appendInfo.lastOffset = offset.value - 1          //修改时间戳          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)            appendInfo.timestamp = now          // 如果在validateMessagesAndAssignOffsets修改了长度,就需要重新检查。          if (messageSizesMaybeChanged) {            for (messageAndOffset <- validMessages.shallowIterator) {              if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {                // we record the original message set size instead of the trimmed size                // to be consistent with pre-compression bytesRejectedRate recording                BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)                BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)                throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."                  .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))              }            }          }        } else {          // we are taking the offsets we are given          if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)            throw new IllegalArgumentException("Out of order offsets found in " + messages)        }        // ByteBufferMessageSet大小是否大于LogConfig        if (validMessages.sizeInBytes > config.segmentSize) {          throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."            .format(validMessages.sizeInBytes, config.segmentSize))        }        // 获取activeSegment,可能会之前是activeSegment慢了重新分配        val segment = maybeRoll(validMessages.sizeInBytes)        // 追加消息        segment.append(appendInfo.firstOffset, validMessages)        // 更新LEO        updateLogEndOffset(appendInfo.lastOffset + 1)        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"          .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))        //查看未刷新到磁盘的数据是否到一定阈值。        if (unflushedMessages >= config.flushInterval)          flush()        appendInfo      }    } catch {      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)    }  }

analyzeAndValidateMessageSet验证消息长度、校验码、每部offset是否单调递增,并不会解压缩消息:

private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {    var shallowMessageCount = 0 //记录外层消息的数量    var validBytesCount = 0 //通过验证的Message字节数之和。    var firstOffset, lastOffset = -1L//第一条消息和最后一条消息的offset    var sourceCodec: CompressionCodec = NoCompressionCodec    var monotonic = true//标志生产者为消息分配的内部offset是否单调递增。    for(messageAndOffset <- messages.shallowIterator) {      // 记录第一条消息的offset,此时的offset还是生产者分配的offset      if(firstOffset < 0)        firstOffset = messageAndOffset.offset      // 判断内部offset首付递增      if(lastOffset >= messageAndOffset.offset)        monotonic = false      // 记录最后一条消息的offset      lastOffset = messageAndOffset.offset      val m = messageAndOffset.message      // 检测消息长度。      val messageSize = MessageSet.entrySize(m)      if(messageSize > config.maxMessageSize) {        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)        throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."          .format(messageSize, config.maxMessageSize))      }      // CRC32校验码确认      m.ensureValid()      //通过检测的外层消息数量      shallowMessageCount += 1      //通过检测的字节数量      validBytesCount += messageSize      val messageCodec = m.compressionCodec      if(messageCodec != NoCompressionCodec)        sourceCodec = messageCodec    }    // 记录服务端的压缩方式    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)    LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)}

maybeRoll方法检测是否满足activeSegment创建新的条件。

1. 当前大小加上即将要追加消息的大小超过LogSegment最大长度。
2. activeSegment的寿命超过了最长存活时间
3. 索引文件满了。
创建新的activeSegment会创建新的.log文件,索引文件.index,把新的Segment添加到segments这个跳表中。
 

flush方法会吧recoveryPoint到LEO之间的消息刷新到磁盘上,修改recoveryPoint的值。

def flush(offset: Long) : Unit = {      //offset之前的消息已经刷新到磁盘上,则不需要再刷新    if (offset <= this.recoveryPoint)      return    debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +          time.milliseconds + " unflushed = " + unflushedMessages)    //logSegments方法通查到recoveryPoint和offset之间的LogSegment对象    for(segment <- logSegments(this.recoveryPoint, offset))      //调用LogSegment.flush()会调用日志文件和索引文件的flush写到磁盘中      segment.flush()    lock synchronized {      if(offset > this.recoveryPoint) {        this.recoveryPoint = offset//修改recoveryPiont        lastflushedTime.set(time.milliseconds)      }    }  }

 

转载地址:http://wnxx.baihongyu.com/

你可能感兴趣的文章