gotidb/pkg/storage/engine.go

273 lines
6.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package storage
import (
"context"
"fmt"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/model"
)
// PersistenceType names a persistence strategy for a storage engine.
type PersistenceType string
// Supported PersistenceType values.
const (
// PersistenceTypeNone disables persistence.
PersistenceTypeNone PersistenceType = "none"
// PersistenceTypeWAL persists data through a WAL (write-ahead log).
PersistenceTypeWAL PersistenceType = "wal"
// PersistenceTypeBoltDB persists data through BoltDB.
// NOTE(review): MemoryEngine.EnablePersistence in this file has no branch
// for this value, so it is handled by the default case there — confirm
// whether BoltDB support lives elsewhere.
PersistenceTypeBoltDB PersistenceType = "boltdb"
)
// PersistenceConfig holds the settings passed to EnablePersistence.
type PersistenceConfig struct {
Type PersistenceType // persistence strategy to use
Directory string // directory that holds the persisted data
SyncEvery int // sync once every this many written records
}
// StorageEngine is the interface implemented by storage engines.
type StorageEngine interface {
// Write stores value for the data point identified by id.
Write(ctx context.Context, id model.DataPointID, value model.DataValue) error
// Read returns the stored values for the data point identified by id.
Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error)
// GetLatest returns the most recent value for the data point identified by id.
GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error)
// GetDuration returns the duration for the data point identified by id.
GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error)
// EnablePersistence turns on persistence according to config.
EnablePersistence(config PersistenceConfig) error
// Close shuts the storage engine down.
Close() error
}
// MemoryEngine is an in-memory storage engine that keeps one circular buffer
// per data point and can optionally forward writes to a Persister.
type MemoryEngine struct {
buffLen int // capacity used when creating each data point's buffer
data map[string]*model.CircularBuffer // buffers keyed by DataPointID.String()
dataLock sync.RWMutex // guards access to the data map
persister Persister // optional persister; nil when persistence is disabled
}
// ReadLatest returns the most recent value recorded for id. It is an alias
// for GetLatest and forwards ctx and id unchanged.
func (e *MemoryEngine) ReadLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
return e.GetLatest(ctx, id)
}
// BatchWrite writes every item of batch, in order, by calling Write once per
// item. It stops at the first failing item and returns that error; items
// written before the failure are not rolled back. It returns nil when every
// item was written (including when batch is empty).
func (e *MemoryEngine) BatchWrite(ctx context.Context, batch []struct {
	ID    model.DataPointID
	Value model.DataValue
}) error {
	for i := range batch {
		err := e.Write(ctx, batch[i].ID, batch[i].Value)
		if err != nil {
			return err
		}
	}
	return nil
}
// ReadAll returns all values stored for id. It is an alias for Read and
// forwards ctx and id unchanged.
func (e *MemoryEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
return e.Read(ctx, id)
}
// ReadDuration returns the buffered values of id whose timestamp lies in the
// inclusive range [from, to], in the order the buffer yields them.
//
// The result is always a non-nil slice: it is empty when id has no buffer and
// when no buffered value falls inside the range, so both "no data" cases look
// the same to callers (and to encoders that distinguish nil from empty).
// The returned error is always nil.
func (e *MemoryEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
	key := id.String()

	// Hold the read lock only for the map lookup.
	e.dataLock.RLock()
	buffer, exists := e.data[key]
	e.dataLock.RUnlock()
	if !exists {
		return []model.DataValue{}, nil
	}

	allValues := buffer.Read()

	// Start from an empty, non-nil slice so the "nothing matched" result has
	// the same shape as the "unknown id" result above.
	filteredValues := make([]model.DataValue, 0, len(allValues))
	for _, value := range allValues {
		// Inclusive bounds: keep timestamps that are neither before from nor
		// after to.
		if !value.Timestamp.Before(from) && !value.Timestamp.After(to) {
			filteredValues = append(filteredValues, value)
		}
	}
	return filteredValues, nil
}
// NewMemoryEngine returns a MemoryEngine with an empty data map and no
// persister. memBufferLen is stored as the per-data-point buffer length.
func NewMemoryEngine(memBufferLen int) *MemoryEngine {
	engine := new(MemoryEngine)
	engine.buffLen = memBufferLen
	engine.data = map[string]*model.CircularBuffer{}
	return engine
}
// Write appends value to the circular buffer of id, creating the buffer on
// the first write for that id, and then forwards the write to the persister
// when one is configured.
//
// A new buffer gets capacity e.buffLen, or 30 when e.buffLen is not positive.
//
// The value is written to memory before it is handed to the persister; if the
// persister fails, its error is returned and the in-memory write is not
// undone.
func (e *MemoryEngine) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error {
	key := id.String()

	// Fast path: the buffer already exists, a read lock is enough.
	e.dataLock.RLock()
	buffer, exists := e.data[key]
	e.dataLock.RUnlock()

	if !exists {
		e.dataLock.Lock()
		// Look the key up again under the write lock: another goroutine may
		// have created the buffer between the RUnlock above and this Lock.
		// Without this re-check both goroutines would install their own
		// buffer and the value written to the replaced one would be lost.
		buffer, exists = e.data[key]
		if !exists {
			bufferLen := e.buffLen
			if bufferLen <= 0 {
				bufferLen = 30
			}
			buffer = model.NewCircularBuffer(bufferLen)
			e.data[key] = buffer
		}
		e.dataLock.Unlock()
	}

	buffer.Write(value)

	// Forward to the persister only when persistence is enabled.
	if e.persister != nil {
		if err := e.persister.Write(id, value); err != nil {
			return err
		}
	}
	return nil
}
// Read returns everything the buffer of id currently yields. When id has no
// buffer it returns an empty, non-nil slice. The error is always nil.
func (e *MemoryEngine) Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
	lookupKey := id.String()

	// The read lock covers only the map lookup.
	e.dataLock.RLock()
	buf, found := e.data[lookupKey]
	e.dataLock.RUnlock()

	if found {
		return buf.Read(), nil
	}
	return []model.DataValue{}, nil
}
// GetLatest returns the latest value reported by the buffer of id. When id
// has no buffer, or the buffer reports that it has no latest value, the zero
// model.DataValue is returned. The error is always nil.
func (e *MemoryEngine) GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
	lookupKey := id.String()

	// The read lock covers only the map lookup.
	e.dataLock.RLock()
	buf, found := e.data[lookupKey]
	e.dataLock.RUnlock()

	if found {
		if latest, ok := buf.GetLatest(); ok {
			return latest, nil
		}
	}
	return model.DataValue{}, nil
}
// GetDuration returns the duration reported by the buffer of id, or 0 when
// id has no buffer. The error is always nil.
func (e *MemoryEngine) GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error) {
	lookupKey := id.String()

	// The read lock covers only the map lookup.
	e.dataLock.RLock()
	buf, found := e.data[lookupKey]
	e.dataLock.RUnlock()

	if found {
		return buf.GetDuration(), nil
	}
	return 0, nil
}
// EnablePersistence configures persistence for the engine according to
// config.Type.
//
// For PersistenceTypeWAL it first loads the entries found in config.Directory
// with LoadWAL and replays them into memory, then creates a WAL persister with
// NewWALPersister and installs it. Any failure while loading, replaying or
// creating the persister is returned, and in that case the engine keeps the
// persister it had before the call.
//
// For every other type (PersistenceTypeNone and any value without a dedicated
// branch, which currently includes PersistenceTypeBoltDB) the persister is
// cleared and nil is returned.
//
// NOTE(review): a previously installed persister is replaced or cleared
// without being closed — confirm whether callers are expected to Close first.
func (e *MemoryEngine) EnablePersistence(config PersistenceConfig) error {
	switch config.Type {
	case PersistenceTypeWAL:
		entries, err := LoadWAL(config.Directory)
		if err != nil {
			return fmt.Errorf("failed to load WAL data: %w", err)
		}

		if len(entries) > 0 {
			// Detach the current persister while replaying so that restored
			// entries are not written back out through it.
			previous := e.persister
			e.persister = nil

			ctx := context.Background()
			for _, entry := range entries {
				id := model.DataPointID{
					DeviceID:   entry.DeviceID,
					MetricCode: entry.MetricCode,
					Labels:     entry.Labels,
				}
				value := model.DataValue{
					Timestamp: entry.Timestamp,
					Value:     entry.Value,
				}
				if err := e.Write(ctx, id, value); err != nil {
					e.persister = previous
					return fmt.Errorf("failed to restore WAL entry: %w", err)
				}
			}
			e.persister = previous
		}

		// Check the constructor's error right here. The original assigned it
		// to an err variable shadowed inside this case, so the check after
		// the switch never saw it and a failed persister was installed.
		persister, err := NewWALPersister(config.Directory, config.SyncEvery)
		if err != nil {
			return err
		}
		e.persister = persister
		return nil

	default:
		// PersistenceTypeNone and any unhandled type: run without persistence.
		e.persister = nil
		return nil
	}
}
// Close closes the configured persister and returns its error. When no
// persister is configured there is nothing to close and nil is returned.
func (e *MemoryEngine) Close() error {
	if e.persister == nil {
		return nil
	}
	return e.persister.Close()
}