diff --git a/go.mod b/go.mod index 67686c5..eee35cb 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,11 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/nats-io/nats.go v1.31.0 github.com/prometheus/client_golang v1.19.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 ) +require go.etcd.io/bbolt v1.4.1 // indirect + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.10.2 // indirect @@ -51,8 +53,8 @@ require ( golang.org/x/crypto v0.26.0 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/net v0.28.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.23.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.22.0 // indirect google.golang.org/protobuf v1.33.0 // indirect diff --git a/go.sum b/go.sum index 99696e8..5df72a2 100644 --- a/go.sum +++ b/go.sum @@ -110,10 +110,13 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +go.etcd.io/bbolt v1.4.1 h1:5mOV+HWjIPLEAlUGMsveaUvK2+byZMFOzojoi7bh7uI= +go.etcd.io/bbolt v1.4.1/go.mod h1:c8zu2BnXWTu2XM4XcICtbGSl9cFwsXtcf9zLt2OncM8= go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= @@ -127,11 +130,15 @@ golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= diff --git a/pkg/storage/boltdb.go b/pkg/storage/boltdb.go new file mode 100644 index 0000000..0663217 --- /dev/null +++ b/pkg/storage/boltdb.go @@ -0,0 +1,322 @@ +package storage + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/model" + "go.etcd.io/bbolt" +) + +const ( + // PersistenceTypeBoltDB BoltDB持久化类型 + PersistenceTypeBoltDB PersistenceType = "boltdb" + + // 默认bucket名称 + devicesBucketName = "devices" + dataBucketName = "data" + metaBucketName = "meta" +) + +// BoltDBConfig BoltDB配置 +type BoltDBConfig struct { + FilePath string // 数据库文件路径 + BucketSize int // 数据分桶大小(默认30,与CircularBuffer大小一致) + Options *bbolt.Options // BoltDB选项 +} + +// BoltDBEngine BoltDB存储引擎 +type BoltDBEngine struct { + db *bbolt.DB + cache *sync.Map // 内存缓存 + config BoltDBConfig // 配置信息 + mu sync.RWMutex // 并发控制 +} + +// NewBoltDBEngine 创建一个新的BoltDB存储引擎 +func NewBoltDBEngine(config BoltDBConfig) (*BoltDBEngine, error) { + // 确保目录存在 + if err := os.MkdirAll(filepath.Dir(config.FilePath), 0755); err != nil { + return nil, fmt.Errorf("failed to create database directory: %v", err) + } + + // 设置默认值 + if config.BucketSize == 0 { + config.BucketSize = 30 // 与CircularBuffer大小一致 + } + + // 打开数据库 + db, err := bbolt.Open(config.FilePath, 0644, config.Options) + if err != nil { + return nil, fmt.Errorf("failed to open database: %v", err) + } + + // 初始化根bucket + err = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(devicesBucketName)) + return err + }) + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to create root bucket: %v", err) + } + + return &BoltDBEngine{ + db: db, + cache: &sync.Map{}, + config: config, + }, nil +} + +// Write 写入数据 +func (e *BoltDBEngine) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error { + // 获取设备bucket路径 + deviceKey := id.String() + + // 序列化数据 + data, err := json.Marshal(value) + if err != nil { + return fmt.Errorf("failed to marshal value: %v", err) + } + + // 写入数据库 + err = e.db.Update(func(tx *bbolt.Tx) error { + // 获取或创建设备bucket + devices := tx.Bucket([]byte(devicesBucketName)) + if devices == nil { + return fmt.Errorf("devices bucket not found") + } + + deviceBucket, err := devices.CreateBucketIfNotExists([]byte(deviceKey)) + if err != nil { + return fmt.Errorf("failed to create device bucket: %v", err) + } + + // 获取或创建数据bucket + dataBucket, err := deviceBucket.CreateBucketIfNotExists([]byte(dataBucketName)) + if err != nil { + return fmt.Errorf("failed to create data bucket: %v", err) + } + + // 获取或创建元数据bucket + metaBucket, err := deviceBucket.CreateBucketIfNotExists([]byte(metaBucketName)) + if err != nil { + return fmt.Errorf("failed to create meta bucket: %v", err) + } + + // 写入数据 + timestamp := []byte(value.Timestamp.Format(time.RFC3339Nano)) + if err := dataBucket.Put(timestamp, data); err != nil { + return fmt.Errorf("failed to write data: %v", err) + } + + // 更新最新值 + if err := metaBucket.Put([]byte("latest"), data); err != nil { + return fmt.Errorf("failed to update latest value: %v", err) + } + + return nil + }) + + if err != nil { + return err + } + + // 更新缓存 + if buffer, ok := e.cache.Load(deviceKey); ok { + buffer.(*model.CircularBuffer).Write(value) + } + + return nil +} + +// Read 读取数据 +func (e *BoltDBEngine) Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) { + deviceKey := id.String() + + // 尝试从缓存读取 + if buffer, ok := e.cache.Load(deviceKey); ok { + return buffer.(*model.CircularBuffer).Read(), nil + } + + // 从数据库读取 + var values []model.DataValue + err := e.db.View(func(tx *bbolt.Tx) error { + devices := tx.Bucket([]byte(devicesBucketName)) + if devices == nil { + return nil // 没有数据 + } + + deviceBucket := devices.Bucket([]byte(deviceKey)) + if deviceBucket == nil { + return nil // 没有数据 + } + + dataBucket := deviceBucket.Bucket([]byte(dataBucketName)) + if dataBucket == nil { + return nil // 没有数据 + } + + // 遍历所有数据 + return dataBucket.ForEach(func(k, v []byte) error { + var value model.DataValue + if err := json.Unmarshal(v, &value); err != nil { + return fmt.Errorf("failed to unmarshal value: %v", err) + } + values = append(values, value) + return nil + }) + }) + + if err != nil { + return nil, err + } + + // 创建并填充缓存 + buffer := model.NewCircularBuffer() + for _, value := range values { + buffer.Write(value) + } + e.cache.Store(deviceKey, buffer) + + return values, nil +} + +// GetLatest 获取最新数据 +func (e *BoltDBEngine) GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) { + deviceKey := id.String() + + // 尝试从缓存读取 + if buffer, ok := e.cache.Load(deviceKey); ok { + if value, exists := buffer.(*model.CircularBuffer).GetLatest(); exists { + return value, nil + } + } + + // 从数据库读取 + var latest model.DataValue + err := e.db.View(func(tx *bbolt.Tx) error { + devices := tx.Bucket([]byte(devicesBucketName)) + if devices == nil { + return nil + } + + deviceBucket := devices.Bucket([]byte(deviceKey)) + if deviceBucket == nil { + return nil + } + + metaBucket := deviceBucket.Bucket([]byte(metaBucketName)) + if metaBucket == nil { + return nil + } + + data := metaBucket.Get([]byte("latest")) + if data == nil { + return nil + } + + return json.Unmarshal(data, &latest) + }) + + if err != nil { + return model.DataValue{}, err + } + + return latest, nil +} + +// GetDuration 获取持续时间 +func (e *BoltDBEngine) GetDuration(ctx context.Context, id model.DataPointID) (time.Duration, error) { + deviceKey := id.String() + + // 尝试从缓存读取 + if buffer, ok := e.cache.Load(deviceKey); ok { + return buffer.(*model.CircularBuffer).GetDuration(), nil + } + + // 从数据库读取所有数据以计算持续时间 + values, err := e.Read(ctx, id) + if err != nil { + return 0, err + } + + if len(values) == 0 { + return 0, nil + } + + // 找到最新和最旧的值 + latest := values[0] + oldest := values[0] + for _, v := range values[1:] { + if v.Timestamp.After(latest.Timestamp) { + latest = v + } + if v.Timestamp.Before(oldest.Timestamp) { + oldest = v + } + } + + return latest.Timestamp.Sub(oldest.Timestamp), nil +} + +// EnablePersistence 启用持久化 +func (e *BoltDBEngine) EnablePersistence(config PersistenceConfig) error { + // BoltDB本身就是持久化的,所以这里不需要额外的操作 + return nil +} + +// BatchWrite 批量写入数据 +func (e *BoltDBEngine) BatchWrite(ctx context.Context, batch []struct { + ID model.DataPointID + Value model.DataValue +}) error { + for _, item := range batch { + if err := e.Write(ctx, item.ID, item.Value); err != nil { + return err + } + } + return nil +} + +// ReadLatest 读取最新数据(GetLatest 的别名) +func (e *BoltDBEngine) ReadLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) { + return e.GetLatest(ctx, id) +} + +// ReadAll 读取所有数据(Read 的别名) +func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) { + return e.Read(ctx, id) +} + +// ReadDuration 读取指定时间范围内的数据 +func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) { + deviceKey := id.String() + + // 从数据库读取所有数据 + values, err := e.Read(ctx, id) + if err != nil { + return nil, err + } + + // 过滤出指定时间范围内的数据 + var filteredValues []model.DataValue + for _, value := range values { + if (value.Timestamp.Equal(from) || value.Timestamp.After(from)) && + (value.Timestamp.Equal(to) || value.Timestamp.Before(to)) { + filteredValues = append(filteredValues, value) + } + } + + return filteredValues, nil +} + +// Close 关闭存储引擎 +func (e *BoltDBEngine) Close() error { + return e.db.Close() +}