influxdb源码分析-写入分析(三)
写TSM文件源码分析:
从单测用例 $GOPATH/influxdata/influxdb/tsdb/engine/tsm1/writer_test.go#TestTSMWriter_Write_Single开始跟代码
1 | func TestTSMWriter_Write_Single(t *testing.T) { |
我们来分析一下tsmWriter这个结构体
1 | type tsmWriter struct { |
看一下tsmWriter的Write方法
// Write encodes values into a single block, appends it to the underlying
// writer preceded by its CRC32 checksum, and records the block in the index
// under key.
//
// It returns ErrMaxKeyLengthExceeded when key is longer than maxKeyLength,
// returns nil without writing anything when values is empty, and otherwise
// propagates the first error from the header write, the encoding, the block
// type lookup, or the underlying writer.
func (t *tsmWriter) Write(key string, values Values) error {
	switch {
	case len(key) > maxKeyLength:
		return ErrMaxKeyLengthExceeded
	case len(values) == 0:
		// Nothing to write.
		return nil
	}

	// The header is written lazily: only once there is data to write and
	// the file position is still zero.
	if t.n == 0 {
		err := t.writeHeader()
		if err != nil {
			return err
		}
	}

	// Block body: the encoded values, whose type is derived from the
	// encoded bytes themselves.
	encoded, err := values.Encode(nil)
	if err != nil {
		return err
	}
	typ, err := BlockType(encoded)
	if err != nil {
		return err
	}

	// On disk the big-endian CRC32 (IEEE) checksum comes first, then the block.
	var sum [crc32.Size]byte
	binary.BigEndian.PutUint32(sum[:], crc32.ChecksumIEEE(encoded))
	if _, err = t.w.Write(sum[:]); err != nil {
		return err
	}
	written, err := t.w.Write(encoded)
	if err != nil {
		return err
	}

	// The size stored in the index covers the checksum as well as the block.
	blockLen := written + len(sum)

	// Record the block in the index, using the first and last values for
	// the time range and the current file position as the offset.
	first, last := values[0], values[len(values)-1]
	t.index.Add(key, typ, first.UnixNano(), last.UnixNano(), t.n, uint32(blockLen))

	// Advance the file position past the block just written.
	t.n += int64(blockLen)
	return nil
}
//t.index.Add方法具体实现
// Add appends an index entry describing one block of key: its time range
// (minTime, maxTime), its offset in the file and its size in bytes. The first
// call for a key also creates that key's entry list and fixes its block type
// to blockType. The running index size d.size is updated on every call.
func (d *directIndex) Add(key string, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// d.blocks is a map[string]*indexEntries, roughly shaped like
	// {"cpu": [{MinTime, MaxTime, Offset, Size}, ...], "mem": ...}.
	list := d.blocks[key]
	if list == nil {
		list = &indexEntries{Type: blockType}
		d.blocks[key] = list

		// A new key accounts for its 2-byte length prefix plus the key
		// bytes, and for the per-key entry count.
		d.size += uint32(2+len(key)) + indexCountSize
	}

	entry := IndexEntry{
		MinTime: minTime,
		MaxTime: maxTime,
		Offset:  offset,
		Size:    size,
	}
	list.entries = append(list.entries, entry)

	// Every entry accounts for one encoded index entry.
	d.size += indexEntrySize
}
下面我们看看tsmWriter的WriteIndex方法的实现
// WriteIndex writes the accumulated index to the underlying writer, followed
// by an 8-byte big-endian footer holding the file position at which the index
// starts. It returns ErrNoValues when the index contains no keys, and
// otherwise the first error reported by the index or the writer.
func (t *tsmWriter) WriteIndex() error {
	// Capture the current file position first: this is where the index begins.
	start := uint64(t.n)

	if t.index.KeyCount() == 0 {
		return ErrNoValues
	}

	// Write the index itself.
	_, err := t.index.WriteTo(t.w)
	if err != nil {
		return err
	}

	// Write the position of the index as the trailing footer.
	var footer [8]byte
	binary.BigEndian.PutUint64(footer[:], start)
	_, err = t.w.Write(footer[:])
	return err
}
主要看t.index.WriteTo(t.w) 这个方法
要记住之前index的结构

// directIndex is an in-memory index that maps each key to the index entries
// of the blocks written for it.
type directIndex struct {
// mu is write-locked by Add and read-locked by WriteTo while they access
// the fields below.
mu sync.RWMutex
// size is a running byte count maintained by Add: it grows for every new
// key (length prefix, key bytes and entry count) and for every entry added.
size uint32
// blocks holds, per key, the entries recorded for that key.
blocks map[string]*indexEntries
}
这个结构是为后面的WriteIndex做准备的
reader.go#1342
// indexEntries groups the index entries recorded for a single key together
// with the block type of that key.
type indexEntries struct {
// Type is the block type; directIndex.Add sets it once, from the first
// block added for the key.
Type byte
// entries holds one IndexEntry per block added for the key.
entries []IndexEntry
}
writer.go#172
// IndexEntry describes one block in the file: the time range of the points
// it stores and where the block is located.
type IndexEntry struct {
	// MinTime is the minimum time of all points stored in the block.
	MinTime int64

	// MaxTime is the maximum time of all points stored in the block.
	MaxTime int64

	// Offset is the absolute position in the file where the block is located.
	Offset int64

	// Size is the size in bytes of the block in the file.
	Size uint32
}
//把用到的几个对象放在上面,以便看得更清楚一些
// WriteTo serializes the whole index to w and returns the number of bytes
// written. Keys are emitted in lexical order; for each key the layout is:
//
//	2 bytes  key length (big-endian)
//	N bytes  key
//	1 byte   block type
//	2 bytes  entry count (big-endian)
//	...      the key's index entries, as written by indexEntries.WriteTo
//
// An error is returned if a key has more than maxIndexEntries entries or if
// any write to w fails; the returned count then includes the bytes written so
// far.
func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	// Index blocks are written sorted by key, e.g. ["cpu", "mem", ...].
	sortedKeys := make([]string, 0, len(d.blocks))
	for name := range d.blocks {
		sortedKeys = append(sortedKeys, name)
	}
	// Lexical order.
	sort.Strings(sortedKeys)

	var (
		hdr   [5]byte // key length (2) + block type (1) + entry count (2)
		total int64
	)

	for _, key := range sortedKeys {
		list := d.blocks[key]
		if count := list.Len(); count > maxIndexEntries {
			return total, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, count, maxIndexEntries)
		}

		// Order the entries of this key (by time, per the original comment;
		// the ordering itself comes from indexEntries' sort.Interface methods).
		sort.Sort(list)

		// 2-byte key length.
		binary.BigEndian.PutUint16(hdr[0:2], uint16(len(key)))
		// 1-byte block type.
		hdr[2] = list.Type
		// 2-byte entry count.
		binary.BigEndian.PutUint16(hdr[3:5], uint16(list.Len()))

		// Key length, then the key itself.
		n, err := w.Write(hdr[0:2])
		if err != nil {
			return int64(n) + total, fmt.Errorf("write: writer key length error: %v", err)
		}
		total += int64(n)

		n, err = io.WriteString(w, key)
		if err != nil {
			return int64(n) + total, fmt.Errorf("write: writer key error: %v", err)
		}
		total += int64(n)

		// Block type and entry count.
		n, err = w.Write(hdr[2:5])
		if err != nil {
			return int64(n) + total, fmt.Errorf("write: writer block type and count error: %v", err)
		}
		total += int64(n)

		// Every index entry for this key; the per-entry encoding is done by
		// indexEntries.WriteTo.
		n64, err := list.WriteTo(w)
		if err != nil {
			return n64 + total, fmt.Errorf("write: writer entries error: %v", err)
		}
		total += n64
	}

	return total, nil
}
// WriteTo encodes every entry in a, in slice order, into a fixed
// indexEntrySize-byte buffer and writes each one to w. It returns the total
// number of bytes written and stops at the first write error.
func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
	// One scratch buffer is reused for all entries; each AppendTo call
	// overwrites it in full.
	var scratch [indexEntrySize]byte

	for i := range a.entries {
		a.entries[i].AppendTo(scratch[:])

		written, werr := w.Write(scratch[:])
		total += int64(written)
		if werr != nil {
			return total, werr
		}
	}
	return total, nil
}
// AppendTo encodes e into the first 28 bytes of b in big-endian order and
// returns the slice that was written to:
//
//	b[0:8]    MinTime
//	b[8:16]   MaxTime
//	b[16:24]  Offset
//	b[24:28]  Size
//
// Despite its name it overwrites the start of b rather than appending. If b
// is shorter than indexEntrySize it is re-sliced to indexEntrySize when its
// capacity allows, otherwise a new slice of that length is allocated; in the
// latter case the caller's b is left untouched and only the returned slice
// holds the encoding. The fixed offsets assume indexEntrySize is at least 28.
func (e *IndexEntry) AppendTo(b []byte) []byte {
	switch {
	case len(b) >= indexEntrySize:
		// Already long enough; write into b as given.
	case cap(b) >= indexEntrySize:
		b = b[:indexEntrySize]
	default:
		b = make([]byte, indexEntrySize)
	}

	binary.BigEndian.PutUint64(b[0:8], uint64(e.MinTime))
	binary.BigEndian.PutUint64(b[8:16], uint64(e.MaxTime))
	binary.BigEndian.PutUint64(b[16:24], uint64(e.Offset))
	binary.BigEndian.PutUint32(b[24:28], e.Size)
	return b
}
至此写入数据已经完成,我们可以查看生成的tsm文件