feat(storage): 添加 BoltDB 存储引擎
- 新增 BoltDBConfig 和 BoltDBEngine 类型 - 实现了 Write、Read、GetLatest 等存储引擎接口方法 - 添加了 EnablePersistence、BatchWrite 和 ReadDuration 等扩展方法 - 优化了缓存机制,提高读取性能
This commit is contained in:
parent
90d3ecefb8
commit
0db0e02a8a
8
go.mod
8
go.mod
|
@ -9,9 +9,11 @@ require (
|
|||
github.com/gorilla/websocket v1.5.1
|
||||
github.com/nats-io/nats.go v1.31.0
|
||||
github.com/prometheus/client_golang v1.19.1
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
)
|
||||
|
||||
require go.etcd.io/bbolt v1.4.1 // indirect
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bytedance/sonic v1.10.2 // indirect
|
||||
|
@ -51,8 +53,8 @@ require (
|
|||
golang.org/x/crypto v0.26.0 // indirect
|
||||
golang.org/x/mod v0.18.0 // indirect
|
||||
golang.org/x/net v0.28.0 // indirect
|
||||
golang.org/x/sync v0.8.0 // indirect
|
||||
golang.org/x/sys v0.23.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/text v0.17.0 // indirect
|
||||
golang.org/x/tools v0.22.0 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
|
|
7
go.sum
7
go.sum
|
@ -110,10 +110,13 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
|
|||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
|
||||
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
||||
go.etcd.io/bbolt v1.4.1 h1:5mOV+HWjIPLEAlUGMsveaUvK2+byZMFOzojoi7bh7uI=
|
||||
go.etcd.io/bbolt v1.4.1/go.mod h1:c8zu2BnXWTu2XM4XcICtbGSl9cFwsXtcf9zLt2OncM8=
|
||||
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
|
||||
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
|
||||
|
@ -127,11 +130,15 @@ golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
|
|||
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
|
||||
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
|
||||
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
|
||||
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
|
||||
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA=
|
||||
|
|
|
@ -0,0 +1,322 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.pyer.club/kingecg/gotidb/pkg/model"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const (
|
||||
// PersistenceTypeBoltDB BoltDB持久化类型
|
||||
PersistenceTypeBoltDB PersistenceType = "boltdb"
|
||||
|
||||
// 默认bucket名称
|
||||
devicesBucketName = "devices"
|
||||
dataBucketName = "data"
|
||||
metaBucketName = "meta"
|
||||
)
|
||||
|
||||
// BoltDBConfig BoltDB配置
|
||||
type BoltDBConfig struct {
|
||||
FilePath string // 数据库文件路径
|
||||
BucketSize int // 数据分桶大小(默认30,与CircularBuffer大小一致)
|
||||
Options *bbolt.Options // BoltDB选项
|
||||
}
|
||||
|
||||
// BoltDBEngine BoltDB存储引擎
|
||||
type BoltDBEngine struct {
|
||||
db *bbolt.DB
|
||||
cache *sync.Map // 内存缓存
|
||||
config BoltDBConfig // 配置信息
|
||||
mu sync.RWMutex // 并发控制
|
||||
}
|
||||
|
||||
// NewBoltDBEngine 创建一个新的BoltDB存储引擎
|
||||
func NewBoltDBEngine(config BoltDBConfig) (*BoltDBEngine, error) {
|
||||
// 确保目录存在
|
||||
if err := os.MkdirAll(filepath.Dir(config.FilePath), 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create database directory: %v", err)
|
||||
}
|
||||
|
||||
// 设置默认值
|
||||
if config.BucketSize == 0 {
|
||||
config.BucketSize = 30 // 与CircularBuffer大小一致
|
||||
}
|
||||
|
||||
// 打开数据库
|
||||
db, err := bbolt.Open(config.FilePath, 0644, config.Options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open database: %v", err)
|
||||
}
|
||||
|
||||
// 初始化根bucket
|
||||
err = db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists([]byte(devicesBucketName))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("failed to create root bucket: %v", err)
|
||||
}
|
||||
|
||||
return &BoltDBEngine{
|
||||
db: db,
|
||||
cache: &sync.Map{},
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Write 写入数据
|
||||
func (e *BoltDBEngine) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error {
|
||||
// 获取设备bucket路径
|
||||
deviceKey := id.String()
|
||||
|
||||
// 序列化数据
|
||||
data, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal value: %v", err)
|
||||
}
|
||||
|
||||
// 写入数据库
|
||||
err = e.db.Update(func(tx *bbolt.Tx) error {
|
||||
// 获取或创建设备bucket
|
||||
devices := tx.Bucket([]byte(devicesBucketName))
|
||||
if devices == nil {
|
||||
return fmt.Errorf("devices bucket not found")
|
||||
}
|
||||
|
||||
deviceBucket, err := devices.CreateBucketIfNotExists([]byte(deviceKey))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create device bucket: %v", err)
|
||||
}
|
||||
|
||||
// 获取或创建数据bucket
|
||||
dataBucket, err := deviceBucket.CreateBucketIfNotExists([]byte(dataBucketName))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create data bucket: %v", err)
|
||||
}
|
||||
|
||||
// 获取或创建元数据bucket
|
||||
metaBucket, err := deviceBucket.CreateBucketIfNotExists([]byte(metaBucketName))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create meta bucket: %v", err)
|
||||
}
|
||||
|
||||
// 写入数据
|
||||
timestamp := []byte(value.Timestamp.Format(time.RFC3339Nano))
|
||||
if err := dataBucket.Put(timestamp, data); err != nil {
|
||||
return fmt.Errorf("failed to write data: %v", err)
|
||||
}
|
||||
|
||||
// 更新最新值
|
||||
if err := metaBucket.Put([]byte("latest"), data); err != nil {
|
||||
return fmt.Errorf("failed to update latest value: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新缓存
|
||||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||||
buffer.(*model.CircularBuffer).Write(value)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read 读取数据
|
||||
func (e *BoltDBEngine) Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
|
||||
deviceKey := id.String()
|
||||
|
||||
// 尝试从缓存读取
|
||||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||||
return buffer.(*model.CircularBuffer).Read(), nil
|
||||
}
|
||||
|
||||
// 从数据库读取
|
||||
var values []model.DataValue
|
||||
err := e.db.View(func(tx *bbolt.Tx) error {
|
||||
devices := tx.Bucket([]byte(devicesBucketName))
|
||||
if devices == nil {
|
||||
return nil // 没有数据
|
||||
}
|
||||
|
||||
deviceBucket := devices.Bucket([]byte(deviceKey))
|
||||
if deviceBucket == nil {
|
||||
return nil // 没有数据
|
||||
}
|
||||
|
||||
dataBucket := deviceBucket.Bucket([]byte(dataBucketName))
|
||||
if dataBucket == nil {
|
||||
return nil // 没有数据
|
||||
}
|
||||
|
||||
// 遍历所有数据
|
||||
return dataBucket.ForEach(func(k, v []byte) error {
|
||||
var value model.DataValue
|
||||
if err := json.Unmarshal(v, &value); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal value: %v", err)
|
||||
}
|
||||
values = append(values, value)
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 创建并填充缓存
|
||||
buffer := model.NewCircularBuffer()
|
||||
for _, value := range values {
|
||||
buffer.Write(value)
|
||||
}
|
||||
e.cache.Store(deviceKey, buffer)
|
||||
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// GetLatest 获取最新数据
|
||||
func (e *BoltDBEngine) GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
|
||||
deviceKey := id.String()
|
||||
|
||||
// 尝试从缓存读取
|
||||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||||
if value, exists := buffer.(*model.CircularBuffer).GetLatest(); exists {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 从数据库读取
|
||||
var latest model.DataValue
|
||||
err := e.db.View(func(tx *bbolt.Tx) error {
|
||||
devices := tx.Bucket([]byte(devicesBucketName))
|
||||
if devices == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
deviceBucket := devices.Bucket([]byte(deviceKey))
|
||||
if deviceBucket == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
metaBucket := deviceBucket.Bucket([]byte(metaBucketName))
|
||||
if metaBucket == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
data := metaBucket.Get([]byte("latest"))
|
||||
if data == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return json.Unmarshal(data, &latest)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return model.DataValue{}, err
|
||||
}
|
||||
|
||||
return latest, nil
|
||||
}
|
||||
|
||||
// GetDuration 获取持续时间
|
||||
func (e *BoltDBEngine) GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error) {
|
||||
deviceKey := id.String()
|
||||
|
||||
// 尝试从缓存读取
|
||||
if buffer, ok := e.cache.Load(deviceKey); ok {
|
||||
return buffer.(*model.CircularBuffer).GetDuration(), nil
|
||||
}
|
||||
|
||||
// 从数据库读取所有数据以计算持续时间
|
||||
values, err := e.Read(ctx, id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(values) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// 找到最新和最旧的值
|
||||
latest := values[0]
|
||||
oldest := values[0]
|
||||
for _, v := range values[1:] {
|
||||
if v.Timestamp.After(latest.Timestamp) {
|
||||
latest = v
|
||||
}
|
||||
if v.Timestamp.Before(oldest.Timestamp) {
|
||||
oldest = v
|
||||
}
|
||||
}
|
||||
|
||||
return latest.Timestamp.Sub(oldest.Timestamp), nil
|
||||
}
|
||||
|
||||
// EnablePersistence 启用持久化
|
||||
func (e *BoltDBEngine) EnablePersistence(config PersistenceConfig) error {
|
||||
// BoltDB本身就是持久化的,所以这里不需要额外的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchWrite 批量写入数据
|
||||
func (e *BoltDBEngine) BatchWrite(ctx context.Context, batch []struct {
|
||||
ID model.DataPointID
|
||||
Value model.DataValue
|
||||
}) error {
|
||||
for _, item := range batch {
|
||||
if err := e.Write(ctx, item.ID, item.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadLatest 读取最新数据(GetLatest 的别名)
|
||||
func (e *BoltDBEngine) ReadLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) {
|
||||
return e.GetLatest(ctx, id)
|
||||
}
|
||||
|
||||
// ReadAll 读取所有数据(Read 的别名)
|
||||
func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) {
|
||||
return e.Read(ctx, id)
|
||||
}
|
||||
|
||||
// ReadDuration 读取指定时间范围内的数据
|
||||
func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
|
||||
deviceKey := id.String()
|
||||
|
||||
// 从数据库读取所有数据
|
||||
values, err := e.Read(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 过滤出指定时间范围内的数据
|
||||
var filteredValues []model.DataValue
|
||||
for _, value := range values {
|
||||
if (value.Timestamp.Equal(from) || value.Timestamp.After(from)) &&
|
||||
(value.Timestamp.Equal(to) || value.Timestamp.Before(to)) {
|
||||
filteredValues = append(filteredValues, value)
|
||||
}
|
||||
}
|
||||
|
||||
return filteredValues, nil
|
||||
}
|
||||
|
||||
// Close 关闭存储引擎
|
||||
func (e *BoltDBEngine) Close() error {
|
||||
return e.db.Close()
|
||||
}
|
Loading…
Reference in New Issue