gotidb/pkg/storage/engine.go

222 lines
5.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package storage
import (
"context"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/model"
)
// PersistenceType names a persistence strategy. It is a string type so the
// strategy can be written as plain text (for example "wal").
type PersistenceType string
// Persistence strategies recognised by the configuration.
const (
	// PersistenceTypeNone means no persistence.
	PersistenceTypeNone = PersistenceType("none")

	// PersistenceTypeWAL selects persistence through a WAL log.
	PersistenceTypeWAL = PersistenceType("wal")

	// PersistenceTypeBoltDB selects persistence through BoltDB.
	PersistenceTypeBoltDB = PersistenceType("boltdb")
)
// PersistenceConfig carries the settings passed to EnablePersistence.
type PersistenceConfig struct {
	// Type selects the persistence strategy.
	Type PersistenceType

	// Directory is the directory used for persisted data.
	Directory string

	// SyncEvery is the number of written records between syncs.
	SyncEvery int
}
// StorageEngine is the storage engine interface.
type StorageEngine interface {
	// Write records value for the data point identified by id.
	Write(ctx context.Context, id model.DataPointID, value model.DataValue) error

	// Read returns the values held for the data point identified by id.
	Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error)

	// GetLatest returns the most recent value of the data point identified by id.
	GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error)

	// GetDuration returns a duration for the data point identified by id.
	// NOTE(review): which span this measures is not defined in this file;
	// confirm against the buffer implementation.
	GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error)

	// EnablePersistence turns persistence on according to config.
	EnablePersistence(config PersistenceConfig) error

	// Close shuts the storage engine down.
	Close() error
}
// MemoryEngine is the in-memory storage engine. It keeps one circular buffer
// per data point and can optionally forward writes to a Persister.
type MemoryEngine struct {
	// data maps a data point key (the result of id.String()) to its buffer.
	data map[string]*model.CircularBuffer

	// dataLock guards access to the data map.
	dataLock sync.RWMutex

	// persister receives every written value; nil means persistence is off.
	persister Persister
}
// ReadLatest returns the most recent value for id. It is an alias for
// GetLatest and passes its arguments and results through unchanged.
func (e *MemoryEngine) ReadLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
	latest, err := e.GetLatest(ctx, id)
	return latest, err
}
// BatchWrite writes every entry of batch, in order, through Write.
//
// It stops at the first failing entry and returns that error. Entries written
// before the failure are not rolled back. An empty or nil batch returns nil.
func (e *MemoryEngine) BatchWrite(ctx context.Context, batch []struct {
	ID    model.DataPointID
	Value model.DataValue
}) error {
	for i := range batch {
		err := e.Write(ctx, batch[i].ID, batch[i].Value)
		if err != nil {
			return err
		}
	}
	return nil
}
// ReadAll returns all values for id. It is an alias for Read and passes its
// arguments and results through unchanged.
func (e *MemoryEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
	values, err := e.Read(ctx, id)
	return values, err
}
// ReadDuration returns the values of data point id whose Timestamp lies in
// the closed interval [from, to]; both bounds are inclusive.
//
// The result is always a non-nil slice. It is empty when id is unknown or
// when no value falls inside the interval, so both "nothing found" cases look
// the same to the caller and agree with Read. The returned error is always
// nil. ctx is not consulted.
func (e *MemoryEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
	key := id.String()

	// Hold the read lock only for the map lookup; the buffer is read after
	// the lock has been released.
	e.dataLock.RLock()
	buffer, exists := e.data[key]
	e.dataLock.RUnlock()

	// Start from an empty, non-nil slice so every return path has the same
	// shape.
	filteredValues := []model.DataValue{}
	if !exists {
		return filteredValues, nil
	}

	// Keep the values that are neither earlier than from nor later than to.
	for _, value := range buffer.Read() {
		if value.Timestamp.Before(from) || value.Timestamp.After(to) {
			continue
		}
		filteredValues = append(filteredValues, value)
	}
	return filteredValues, nil
}
// NewMemoryEngine creates a new in-memory storage engine with an empty data
// map and no persister.
func NewMemoryEngine() *MemoryEngine {
	engine := new(MemoryEngine)
	engine.data = make(map[string]*model.CircularBuffer)
	return engine
}
// Write stores value for data point id in memory and, when a persister is
// configured, forwards the same value to it.
//
// The buffer for a data point is created on its first write. The in-memory
// write happens before the persister is called, so when the persister fails
// the value is already in the buffer; the persister's error is returned
// unchanged. ctx is not consulted.
func (e *MemoryEngine) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error {
	key := id.String()

	// Common case: the buffer already exists, so a read lock is enough.
	e.dataLock.RLock()
	buffer, exists := e.data[key]
	e.dataLock.RUnlock()

	if !exists {
		e.dataLock.Lock()
		// Look the key up again now that the write lock is held: another
		// writer may have registered a buffer between RUnlock and Lock, and
		// replacing it would drop the values already written to it.
		buffer, exists = e.data[key]
		if !exists {
			if e.data == nil {
				// Engine was not built by NewMemoryEngine; assigning into a
				// nil map would panic.
				e.data = make(map[string]*model.CircularBuffer)
			}
			buffer = model.NewCircularBuffer()
			e.data[key] = buffer
		}
		e.dataLock.Unlock()
	}

	// Write the value to the in-memory buffer.
	buffer.Write(value)

	// When persistence is enabled, hand the value to the persister as well.
	if e.persister != nil {
		if err := e.persister.Write(id, value); err != nil {
			return err
		}
	}
	return nil
}
// Read returns whatever the buffer of data point id yields from its Read
// method. An unknown id gives an empty, non-nil slice. The returned error is
// always nil. ctx is not consulted.
func (e *MemoryEngine) Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
	name := id.String()

	// The read lock covers only the map lookup.
	e.dataLock.RLock()
	buf, found := e.data[name]
	e.dataLock.RUnlock()

	if found {
		return buf.Read(), nil
	}
	return []model.DataValue{}, nil
}
// GetLatest returns the most recent value of data point id.
//
// When id is unknown, or its buffer reports that it has no latest value, the
// zero model.DataValue is returned with a nil error, so callers cannot tell
// "no data" apart from a stored zero value. ctx is not consulted.
func (e *MemoryEngine) GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
	name := id.String()

	// The read lock covers only the map lookup.
	e.dataLock.RLock()
	buf, found := e.data[name]
	e.dataLock.RUnlock()

	var latest model.DataValue
	if found {
		if v, ok := buf.GetLatest(); ok {
			latest = v
		}
	}
	return latest, nil
}
// GetDuration returns the duration reported by the buffer of data point id,
// or 0 when id is unknown. The returned error is always nil. ctx is not
// consulted.
func (e *MemoryEngine) GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error) {
	name := id.String()

	// The read lock covers only the map lookup.
	e.dataLock.RLock()
	buf, found := e.data[name]
	e.dataLock.RUnlock()

	if found {
		return buf.GetDuration(), nil
	}
	return 0, nil
}
// EnablePersistence configures persistence for the engine.
//
// Only PersistenceTypeWAL installs a persister, built by NewWALPersister from
// config.Directory and config.SyncEvery. If that constructor fails, its error
// is returned and the current persister is left in place. Every other type,
// including PersistenceTypeNone, PersistenceTypeBoltDB and unknown values,
// clears the persister and returns nil.
func (e *MemoryEngine) EnablePersistence(config PersistenceConfig) error {
	if config.Type != PersistenceTypeWAL {
		// No persistence requested, or a type this engine does not handle:
		// persistence stays off.
		e.persister = nil
		return nil
	}

	wal, err := NewWALPersister(config.Directory, config.SyncEvery)
	if err != nil {
		return err
	}
	e.persister = wal
	return nil
}
// Close shuts the storage engine down. With a persister configured it returns
// the result of the persister's Close; otherwise it returns nil.
func (e *MemoryEngine) Close() error {
	if e.persister == nil {
		return nil
	}
	return e.persister.Close()
}