323 lines
7.7 KiB
Go
323 lines
7.7 KiB
Go
package storage
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"sync"
|
||
"time"
|
||
|
||
"git.pyer.club/kingecg/gotidb/pkg/model"
|
||
"go.etcd.io/bbolt"
|
||
)
|
||
|
||
const (
|
||
// PersistenceTypeBoltDB BoltDB持久化类型
|
||
// PersistenceTypeBoltDB PersistenceType = "boltdb"
|
||
|
||
// 默认bucket名称
|
||
devicesBucketName = "devices"
|
||
dataBucketName = "data"
|
||
metaBucketName = "meta"
|
||
)
|
||
|
||
// BoltDBConfig BoltDB配置
|
||
type BoltDBConfig struct {
|
||
FilePath string // 数据库文件路径
|
||
BucketSize int // 数据分桶大小(默认30,与CircularBuffer大小一致)
|
||
Options *bbolt.Options // BoltDB选项
|
||
}
|
||
|
||
// BoltDBEngine BoltDB存储引擎
|
||
type BoltDBEngine struct {
|
||
db *bbolt.DB
|
||
cache *sync.Map // 内存缓存
|
||
config BoltDBConfig // 配置信息
|
||
mu sync.RWMutex // 并发控制
|
||
}
|
||
|
||
// NewBoltDBEngine 创建一个新的BoltDB存储引擎
|
||
func NewBoltDBEngine(config BoltDBConfig) (*BoltDBEngine, error) {
|
||
// 确保目录存在
|
||
if err := os.MkdirAll(filepath.Dir(config.FilePath), 0755); err != nil {
|
||
return nil, fmt.Errorf("failed to create database directory: %v", err)
|
||
}
|
||
|
||
// 设置默认值
|
||
if config.BucketSize == 0 {
|
||
config.BucketSize = 30 // 与CircularBuffer大小一致
|
||
}
|
||
|
||
// 打开数据库
|
||
db, err := bbolt.Open(config.FilePath, 0644, config.Options)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to open database: %v", err)
|
||
}
|
||
|
||
// 初始化根bucket
|
||
err = db.Update(func(tx *bbolt.Tx) error {
|
||
_, err := tx.CreateBucketIfNotExists([]byte(devicesBucketName))
|
||
return err
|
||
})
|
||
if err != nil {
|
||
db.Close()
|
||
return nil, fmt.Errorf("failed to create root bucket: %v", err)
|
||
}
|
||
|
||
return &BoltDBEngine{
|
||
db: db,
|
||
cache: &sync.Map{},
|
||
config: config,
|
||
}, nil
|
||
}
|
||
|
||
// Write 写入数据
|
||
func (e *BoltDBEngine) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error {
|
||
// 获取设备bucket路径
|
||
deviceKey := id.String()
|
||
|
||
// 序列化数据
|
||
data, err := json.Marshal(value)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to marshal value: %v", err)
|
||
}
|
||
|
||
// 写入数据库
|
||
err = e.db.Update(func(tx *bbolt.Tx) error {
|
||
// 获取或创建设备bucket
|
||
devices := tx.Bucket([]byte(devicesBucketName))
|
||
if devices == nil {
|
||
return fmt.Errorf("devices bucket not found")
|
||
}
|
||
|
||
deviceBucket, err := devices.CreateBucketIfNotExists([]byte(deviceKey))
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create device bucket: %v", err)
|
||
}
|
||
|
||
// 获取或创建数据bucket
|
||
dataBucket, err := deviceBucket.CreateBucketIfNotExists([]byte(dataBucketName))
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create data bucket: %v", err)
|
||
}
|
||
|
||
// 获取或创建元数据bucket
|
||
metaBucket, err := deviceBucket.CreateBucketIfNotExists([]byte(metaBucketName))
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create meta bucket: %v", err)
|
||
}
|
||
|
||
// 写入数据
|
||
timestamp := []byte(value.Timestamp.Format(time.RFC3339Nano))
|
||
if err := dataBucket.Put(timestamp, data); err != nil {
|
||
return fmt.Errorf("failed to write data: %v", err)
|
||
}
|
||
|
||
// 更新最新值
|
||
if err := metaBucket.Put([]byte("latest"), data); err != nil {
|
||
return fmt.Errorf("failed to update latest value: %v", err)
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 更新缓存
|
||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||
buffer.(*model.CircularBuffer).Write(value)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Read 读取数据
|
||
func (e *BoltDBEngine) Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
|
||
deviceKey := id.String()
|
||
|
||
// 尝试从缓存读取
|
||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||
return buffer.(*model.CircularBuffer).Read(), nil
|
||
}
|
||
|
||
// 从数据库读取
|
||
var values []model.DataValue
|
||
err := e.db.View(func(tx *bbolt.Tx) error {
|
||
devices := tx.Bucket([]byte(devicesBucketName))
|
||
if devices == nil {
|
||
return nil // 没有数据
|
||
}
|
||
|
||
deviceBucket := devices.Bucket([]byte(deviceKey))
|
||
if deviceBucket == nil {
|
||
return nil // 没有数据
|
||
}
|
||
|
||
dataBucket := deviceBucket.Bucket([]byte(dataBucketName))
|
||
if dataBucket == nil {
|
||
return nil // 没有数据
|
||
}
|
||
|
||
// 遍历所有数据
|
||
return dataBucket.ForEach(func(k, v []byte) error {
|
||
var value model.DataValue
|
||
if err := json.Unmarshal(v, &value); err != nil {
|
||
return fmt.Errorf("failed to unmarshal value: %v", err)
|
||
}
|
||
values = append(values, value)
|
||
return nil
|
||
})
|
||
})
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 创建并填充缓存
|
||
buffer := model.NewCircularBuffer()
|
||
for _, value := range values {
|
||
buffer.Write(value)
|
||
}
|
||
e.cache.Store(deviceKey, buffer)
|
||
|
||
return values, nil
|
||
}
|
||
|
||
// GetLatest 获取最新数据
|
||
func (e *BoltDBEngine) GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
|
||
deviceKey := id.String()
|
||
|
||
// 尝试从缓存读取
|
||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||
if value, exists := buffer.(*model.CircularBuffer).GetLatest(); exists {
|
||
return value, nil
|
||
}
|
||
}
|
||
|
||
// 从数据库读取
|
||
var latest model.DataValue
|
||
err := e.db.View(func(tx *bbolt.Tx) error {
|
||
devices := tx.Bucket([]byte(devicesBucketName))
|
||
if devices == nil {
|
||
return nil
|
||
}
|
||
|
||
deviceBucket := devices.Bucket([]byte(deviceKey))
|
||
if deviceBucket == nil {
|
||
return nil
|
||
}
|
||
|
||
metaBucket := deviceBucket.Bucket([]byte(metaBucketName))
|
||
if metaBucket == nil {
|
||
return nil
|
||
}
|
||
|
||
data := metaBucket.Get([]byte("latest"))
|
||
if data == nil {
|
||
return nil
|
||
}
|
||
|
||
return json.Unmarshal(data, &latest)
|
||
})
|
||
|
||
if err != nil {
|
||
return model.DataValue{}, err
|
||
}
|
||
|
||
return latest, nil
|
||
}
|
||
|
||
// GetDuration 获取持续时间
|
||
func (e *BoltDBEngine) GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error) {
|
||
deviceKey := id.String()
|
||
|
||
// 尝试从缓存读取
|
||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||
return buffer.(*model.CircularBuffer).GetDuration(), nil
|
||
}
|
||
|
||
// 从数据库读取所有数据以计算持续时间
|
||
values, err := e.Read(ctx, id)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
if len(values) == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
// 找到最新和最旧的值
|
||
latest := values[0]
|
||
oldest := values[0]
|
||
for _, v := range values[1:] {
|
||
if v.Timestamp.After(latest.Timestamp) {
|
||
latest = v
|
||
}
|
||
if v.Timestamp.Before(oldest.Timestamp) {
|
||
oldest = v
|
||
}
|
||
}
|
||
|
||
return latest.Timestamp.Sub(oldest.Timestamp), nil
|
||
}
|
||
|
||
// EnablePersistence 启用持久化
|
||
func (e *BoltDBEngine) EnablePersistence(config PersistenceConfig) error {
|
||
// BoltDB本身就是持久化的,所以这里不需要额外的操作
|
||
return nil
|
||
}
|
||
|
||
// BatchWrite 批量写入数据
|
||
func (e *BoltDBEngine) BatchWrite(ctx context.Context, batch []struct {
|
||
ID model.DataPointID
|
||
Value model.DataValue
|
||
}) error {
|
||
for _, item := range batch {
|
||
if err := e.Write(ctx, item.ID, item.Value); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ReadLatest 读取最新数据(GetLatest 的别名)
|
||
func (e *BoltDBEngine) ReadLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
|
||
return e.GetLatest(ctx, id)
|
||
}
|
||
|
||
// ReadAll 读取所有数据(Read 的别名)
|
||
func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
|
||
return e.Read(ctx, id)
|
||
}
|
||
|
||
// ReadDuration 读取指定时间范围内的数据
|
||
func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
|
||
// deviceKey := id.String()
|
||
|
||
// 从数据库读取所有数据
|
||
values, err := e.Read(ctx, id)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 过滤出指定时间范围内的数据
|
||
var filteredValues []model.DataValue
|
||
for _, value := range values {
|
||
if (value.Timestamp.Equal(from) || value.Timestamp.After(from)) &&
|
||
(value.Timestamp.Equal(to) || value.Timestamp.Before(to)) {
|
||
filteredValues = append(filteredValues, value)
|
||
}
|
||
}
|
||
|
||
return filteredValues, nil
|
||
}
|
||
|
||
// Close 关闭存储引擎
|
||
func (e *BoltDBEngine) Close() error {
|
||
return e.db.Close()
|
||
}
|