inlfuxdb源码分析-写入分析(三)

inlfuxdb源码分析-写入分析(三)

写TSM文件源码分析:
从单测用例 $GOPATH/influxdata/influxdb/tsdb/engine/tsm1/writer_test.go#TestTSMWriter_Write_Single开始跟代码

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestTSMWriter_Write_Single(t *testing.T) {
//创建空文件夹,使用了ioutil.TempDir("", "tsm1-test”),会根据两个参数随机生成文件夹
dir := MustTempDir()
//函数运行完,删除该文件夹
defer os.RemoveAll(dir)
//调用ioutil.TempFile(dir, "tsm1test”),返回临时文件
f := MustTempFile(dir)
//构建TSMWriter对象, 所有关于tsm文件的写操作全由这个对象完成
w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
. . .

我们来分析一下TSMWriter这个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type tsmWriter struct {
wrapped io.Writer //这个不用说了, 任何实现了io.Writer接口的对象
w *bufio.Writer //Writer implements buffering for an io.Writer object,一个带buffering的io.Writer对象
index IndexWriter //索引数据,IndexWriter是一个接口, 具体的实现类为 directIndex见下文
n int64
}

writer.go#240
// directIndex is a simple in-memory index implementation for a TSM file. The full index
// must fit in memory.
type directIndex struct {
mu sync.RWMutex
size uint32
blocks map[string]*indexEntries
}
这个结构为了后面writeIndex准备

Reader.go#1342
type indexEntries struct {
Type byte
entries []IndexEntry
}

writer.go#172
type IndexEntry struct {
// The min and max time of all points stored in the block.
MinTime, MaxTime int64

// The absolute position in the file where this block is located.
Offset int64

// The size in bytes of the block in the file.
Size uint32
}


// NewTSMWriter returns a new TSMWriter writing to w.
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
index := &directIndex{
blocks: map[string]*indexEntries{},
}

return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
}

看一下TSMWriter的write方法

// Write writes a new block containing key and values.
func (t *tsmWriter) Write(key string, values Values) error {
   //传入key和values,values对象我们后面再分析
   if len(key) > maxKeyLength {
      return ErrMaxKeyLengthExceeded
   }

   // Nothing to write
   if len(values) == 0 {
      return nil
   }

   // Write header only after we have some data to write.
   // 先写入头信息
   if t.n == 0 {
      if err := t.writeHeader(); err != nil {
         return err
      }
   }

   // 写入body信息,这里可以对照上文中给出的数据结构来看
   block, err := values.Encode(nil)
   if err != nil {
      return err
   }

   blockType, err := BlockType(block)
   if err != nil {
      return err
   }

   var checksum [crc32.Size]byte
   binary.BigEndian.PutUint32(checksum[:], crc32.ChecksumIEEE(block))

   _, err = t.w.Write(checksum[:])
   if err != nil {
      return err
   }

   n, err := t.w.Write(block)
   if err != nil {
      return err
   }
   n += len(checksum)

   // Record this block in index
   // 写入directIndex
   t.index.Add(key, blockType, values[0].UnixNano(), values[len(values)-1].UnixNano(), t.n, uint32(n))

   // Increment file position pointer
   t.n += int64(n)
   return nil
}


//t.index.Add方法具体实现
func (d *directIndex) Add(key string, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
   d.mu.Lock()
   defer d.mu.Unlock()

   // d.block是 map[string]*indexEntries,最终大概的结构是
   // ex: key是cpu,{cpu: [{‘MinTime’: xxx, ‘MaxTime’,:xxx, ‘offset’: xxx, ’size’: xxx}], ‘mem’: ...}
   entries := d.blocks[key]
   if entries == nil {
      entries = &indexEntries{
         Type: blockType,
      }
      d.blocks[key] = entries
      // size of the key stored in the index
      d.size += uint32(2 + len(key))

      // size of the count of entries stored in the index
      d.size += indexCountSize
   }
   //entries []IndexEntry
   entries.entries = append(entries.entries, IndexEntry{
      MinTime: minTime,
      MaxTime: maxTime,
      Offset:  offset,
      Size:    size,
   })

   // size of the encoded index entry
   d.size += indexEntrySize
}

下面我们看看tsmWriter的writeIndex方法的实现

func (t *tsmWriter) WriteIndex() error {
   indexPos := t.n

   if t.index.KeyCount() == 0 {
      return ErrNoValues
   }

   // Write the index
   if _, err := t.index.WriteTo(t.w); err != nil {
      return err
   }

   var buf [8]byte
   binary.BigEndian.PutUint64(buf[:], uint64(indexPos))

   // Write the index index position
   _, err := t.w.Write(buf[:])
   return err
}

主要看t.index.WriteTo(t.w) 这个方法
要记住之前index的结构

image

type directIndex struct {
   mu     sync.RWMutex
   size   uint32
   blocks map[string]*indexEntries
}
这个结构为了后面writeIndex准备

Reader.go#1342
type indexEntries struct {
   Type    byte
   entries []IndexEntry
}

writer.go#172
type IndexEntry struct {
   // The min and max time of all points stored in the block.
   MinTime, MaxTime int64

   // The absolute position in the file where this block is located.
   Offset int64

   // The size in bytes of the block in the file.
   Size uint32
}
//把用到的几个对象放在上面,以便看的更清楚一些



func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
   d.mu.RLock()
   defer d.mu.RUnlock()

   // Index blocks are writtens sorted by key
   // keys: [‘cpu’, ‘mem’, xxxx]
   keys := make([]string, 0, len(d.blocks))
   for k := range d.blocks {
      keys = append(keys, k)
   }
   // 字典排序
   sort.Strings(keys)

   var (
      n   int
      err error
      buf [5]byte
      N   int64
   )

   // For each key, individual entries are sorted by time
   for _, key := range keys {
      entries := d.blocks[key]

      if entries.Len() > maxIndexEntries {
         return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries)
      }
      sort.Sort(entries)
      //2字节的key len
      binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
      //1字节的key Type
      buf[2] = entries.Type
      //2字节的内容长度
      binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len()))

      // Append the key length and key
      if n, err = w.Write(buf[0:2]); err != nil {
         return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err)
      }
      N += int64(n)

      if n, err = io.WriteString(w, key); err != nil {
         return int64(n) + N, fmt.Errorf("write: writer key error: %v", err)
      }
      N += int64(n)

      // Append the block type and count
      if n, err = w.Write(buf[2:5]); err != nil {
         return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err)
      }
      N += int64(n)

      // Append each index entry for all blocks for this key
      var n64 int64
      //在主要看这个方法
      if n64, err = entries.WriteTo(w); err != nil {
         return n64 + N, fmt.Errorf("write: writer entries error: %v", err)
      }
      N += n64

   }
   return N, nil
}


func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
   var buf [indexEntrySize]byte
   var n int

   for _, entry := range a.entries {
      entry.AppendTo(buf[:])
      n, err = w.Write(buf[:])
      total += int64(n)
      if err != nil {
         return total, err
      }
   }

   return total, nil
}

func (e *IndexEntry) AppendTo(b []byte) []byte {
   if len(b) < indexEntrySize {
      if cap(b) < indexEntrySize {
         b = make([]byte, indexEntrySize)
      } else {
         b = b[:indexEntrySize]
      }
   }

   binary.BigEndian.PutUint64(b[:8], uint64(e.MinTime))
   binary.BigEndian.PutUint64(b[8:16], uint64(e.MaxTime))
   binary.BigEndian.PutUint64(b[16:24], uint64(e.Offset))
   binary.BigEndian.PutUint32(b[24:28], uint32(e.Size))

   return b
}

至此写入数据已经完成,我们可以查看tsm产生的文件