InfluxDB Deep Dive - From Index to Data Blocks
Introduction
The previous article mainly introduced the main components of the storage engine and their relationships. Based on readers’ understanding of the general working principles of the storage engine, this article continues to introduce how InfluxDB indexes data on disk and how query statements quickly find corresponding data and return results through indexes, starting from basic concepts.
From B+ Tree to LSM Tree
Building Cursors
Users can perform query operations on InfluxDB using InfluxQL, which is an SQL-like query language that allows developers accustomed to using SQL to query data to quickly develop with InfluxDB. SELECT statements undergo lexical analysis, syntactic analysis, and finally extract options such as measurement, field, tagSet, and time-series conditions. According to the official design document of the storage engine, a series of cursors are pre-constructed before executing query statements. A cursor is created for each field involved in the query statement. Different domain data are abstracted into Value interface types at the upper layer to shield differences between different data types. Conditional iteration is performed on individual field cursors based on conditions in conditional expressions. The minimum set of query results is obtained through intersection and union operations, and finally data from multiple cursors are merged according to index to obtain the desired results. Simply put, unlike relational databases, time-series databases are more like KV stores where what appears to be row views are actually composed by merging multi-column data.
Data columns are indexed by Series key and timestamp. FROM helps us find the corresponding measurement, time-related conditional expressions in WHERE help us find the corresponding retention policy and shard within the time range, pulling data from two data sources Cache and FileStore. The FileStore interface type provides convenience for quickly finding index files and TSM files while shielding underlying search details. Part of the data that each cursor needs to iterate comes from Cache mapping lookup, and another part comes from FileStore. However, in the cursor construction phase, only the Value collection in Cache is retrieved, while Values in TSM files have not been read yet. Instead, a series of Block numbers containing data points are found through mmap index files.
// Using auto-generation techniques to solve construction of different types of cursors, taking int type as an example
func (e *Engine) buildIntegerCursor(ctx context.Context, measurement, seriesKey, field string, opt query.IteratorOptions) integerCursor {
key := SeriesFieldKeyBytes(seriesKey, field)
// Directly get []Value through series key
cacheValues := e.Cache.Values(key)
// Get block numbers containing data points from index
keyCursor := e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
return newIntegerCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}
// Called by e.KeyCursor(..)
func newKeyCursor(ctx context.Context, fs *FileStore, key []byte, t int64, ascending bool) *KeyCursor {
c := &KeyCursor{
key: key,
seeks: fs.locations(key, t, ascending),
ctx: ctx,
col: metrics.GroupFromContext(ctx),
ascending: ascending,
}
if ascending {
sort.Sort(ascLocations(c.seeks))
} else {
sort.Sort(descLocations(c.seeks))
}
// Reference target TSM files
for _, f := range c.seeks {
f.r.Ref()
}
c.seek(t)
return c
}
// Return block numbers containing points
func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
var cache []IndexEntry
locations := make([]*location, 0, len(f.files))
for _, fd := range f.files {
minTime, maxTime := fd.TimeRange()
// Skip TSM files that don't meet temporal conditions
if ascending && maxTime < t {
continue
} else if !ascending && minTime > t {
continue
}
tombstones := fd.TombstoneRange(key)
// Find data blocks in file that may contain target field points
// Involves binary search on mmap index file
// Returns offset and size of blocks containing key in file
entries := fd.ReadEntries(key, &cache)
LOOP:
for i := 0; i < len(entries); i++ {
ie := entries[i]
// Skip deleted blocks
for _, t := range tombstones {
if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
continue LOOP
}
}
// Skip blocks that don't meet conditions
if ascending && ie.MaxTime < t {
continue
} else if !ascending && ie.MinTime > t {
continue
}
location := &location{
r: fd,
entry: ie,
}
if ascending {
// For ascending cursor, mark previous time as read
location.readMin = math.MinInt64
location.readMax = t - 1
} else {
// Descending cursor similarly
location.readMin = t + 1
location.readMax = math.MaxInt64
}
// Return file and target point set blocks
locations = append(locations, location)
}
}
return locations
}The cursor itself is also an interface type with clear and simple interface methods that have strong generality. Various types of cursors can be easily extended from the basic cursor interface to meet query requirements, while providing multiple different iterators to support various aggregations, transformations, and ordered operations on data.
type cursor interface {
close() error
next() (t int64, v interface{})
}
// Buffer cursor, specifically used for applying some SQL functions like min, max to certain fields
type bufCursor struct {
cur cursor
buf struct {
key int64
value interface{}
filled bool
}
ascending bool
}
func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending}
}
// Ordinary cursor for iterating values of some int-type field, other types also use auto-generation in batches
type integerCursor interface {
cursor
nextInteger() (t int64, v int64)
}
func newIntegerCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) integerCursor {
if ascending {
return newIntegerAscendingCursor(seek, cacheValues, tsmKeyCursor)
}
return newIntegerDescendingCursor(seek, cacheValues, tsmKeyCursor)
}
// Concrete structure implementing integerCursor interface
type integerAscendingCursor struct {
cache struct {
values Values
pos int
}
tsm struct {
values []IntegerValue
pos int
keyCursor *KeyCursor
}
}
func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *integerAscendingCursor {
c := &integerAscendingCursor{}
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})
c.tsm.keyCursor = tsmKeyCursor
// Read block
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values)
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
return c.tsm.values[i].UnixNano() >= seek
})
return c
}
func (c *integerAscendingCursor) nextTSM() {
c.tsm.pos++
if c.tsm.pos >= len(c.tsm.values) {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values)
if len(c.tsm.values) == 0 {
return
}
c.tsm.pos = 0
}
}We can see the important method ReadIntegerBlock for reading blocks, where the actual operation of iteratively reading blocks on disk occurs. Let’s look at this method in more detail:
func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error) {
first := c.current[0]
*buf = (*buf)[:0]
var values IntegerValues
values, err := first.r.ReadIntegerBlockAt(&first.entry, buf)
// ...
}ReadIntegerBlockAt finds the corresponding block in the file through the passed offset. The file here is actually accessing an mmap memory-mapped file. This approach allows us to access a file with better performance as if accessing a byte slice in memory, and writing only requires writing to this slice, which can then be automatically flushed back by the operating system or manually flushed. Access to mmap-mapped memory is handled by mmapAccessor. Let’s briefly examine its structure and usage:
type mmapAccessor struct {
accessCount uint64 // Access count
freeCount uint64 // Check reference count before release
mmapWillNeed bool
mu sync.RWMutex
b []byte // Block mapped to memory
f *os.File // File mapped to memory
index *indirectIndex
}
// Storage engine creates mmap memory mapping for each TSM file
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
t := &TSMReader{}
// Variable-length function parameters for custom configuration, plugin-style configuration design can reduce changes to exported API interfaces
for _, option := range options {
option(t)
}
stat, err := f.Stat()
if err != nil {
return nil, err
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
}
index, err := t.accessor.init()
if err != nil {
return nil, err
}
t.index = index
t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)
if err := t.applyTombstones(); err != nil {
return nil, err
}
return t, nil
}
// General method for reading blocks from mmap memory
// first.r.ReadIntegerBlockAt uses auto-generated methods for reading int-type blocks to reduce memory overhead from type conversion
func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
m.incAccess()
m.mu.RLock()
defer m.mu.RUnlock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
return nil, ErrTSMClosed
}
// Read TSM file block according to offset in entry
var err error
values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
if err != nil {
return nil, err
}
return values, nil
}So far, each cursor has obtained values from their respective columns. How are these values combined together? Let’s continue reading.
Iterator Merging
When creating tagSet iterators initially, a query.Iterator slice will be returned, and multiple iterators will be merged into one iterator for return. The merging details are as follows:
func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
// Whether it's a built-in function expression
call, ok := opt.Expr.(*influxql.Call)
// Determine whether ordered output is needed based on opt options
if !ok && opt.MergeSorted() {
// Creating ordered merge iterator is needed for name or tag ordered output
itr := NewSortedMergeIterator(a, opt)
if itr != nil && opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}
// No need for ordered output,
itr := NewMergeIterator(a, opt)
if itr == nil {
return nil, nil
}
// ...
if !ok {
return itr, nil
}
}Essentially, the parsing process of SELECT statements can be summarized as a recursive descent process in compilation theory, top-down respectively being shard - engine - tagSet - series key. Iterators are generated bottom-up and merged through the above method before being returned to their upper layer. query.Iterator is the universal iterator interface connecting all these.
func buildCursor(ctx context.Context, stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) (Cursor, error) {
// ...
// Produce an iterator for every single call and create an iterator scanner
// associated with it.
scanners := make([]IteratorScanner, 0, len(valueMapper.calls))
for call := range valueMapper.calls {
driver := valueMapper.table[call]
if driver.Type == influxql.Unknown {
// The primary driver of this call is of unknown type, so skip this.
continue
}
itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil)
if err != nil {
for _, s := range scanners {
s.Close()
}
return nil, err
}
keys := make([]influxql.VarRef, 0, len(auxKeys)+1)
keys = append(keys, driver)
keys = append(keys, auxKeys...)
scanner := NewIteratorScanner(itr, keys, opt.FillValue)
scanners = append(scanners, scanner)
}
if len(scanners) == 0 {
return newNullCursor(fields), nil
} else if len(scanners) == 1 {
return newScannerCursor(scanners[0], fields, opt), nil
}
return newMultiScannerCursor(scanners, fields, opt), nil
}