gotidb/pkg/manager/datamanager.go

126 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package manager
import (
"context"
"errors"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/model"
"git.pyer.club/kingecg/gotidb/pkg/storage"
)
// DataChangeCallback is the signature of a data-change listener. DataManager.Write
// calls each registered listener with the data point ID and the value it was
// given, after the storage engine has accepted the write.
type DataChangeCallback func(id model.DataPointID, value model.DataValue)
// DataManager sits in front of a storage engine: it forwards writes, queries
// and lifecycle calls to the engine and notifies registered callbacks after
// each successful write.
type DataManager struct {
// engine is the storage backend every operation is delegated to.
engine storage.StorageEngine
// callbacks holds the listeners added through RegisterCallback, in the
// order they were registered.
callbacks []DataChangeCallback
// callbacksLock guards callbacks: Write takes the read lock to copy the
// slice header, RegisterCallback takes the write lock to append.
callbacksLock sync.RWMutex
}
// NewDataManager returns a DataManager that delegates all storage operations
// to engine. The manager starts with an empty, non-nil callback list.
func NewDataManager(engine storage.StorageEngine) *DataManager {
	mgr := new(DataManager)
	mgr.engine = engine
	mgr.callbacks = []DataChangeCallback{}
	return mgr
}
// Write stores value for the data point id in the storage engine and then
// invokes every registered callback with the same id and value.
//
// If the engine reports an error, that error is returned unchanged and no
// callback is invoked. Callbacks run synchronously on the caller's goroutine,
// in registration order, after the callbacks lock has been released.
func (m *DataManager) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error {
	// Persist first; listeners are only told about writes that succeeded.
	err := m.engine.Write(ctx, id, value)
	if err != nil {
		return err
	}

	// Copy the slice header under the read lock so the callbacks themselves
	// execute without the lock being held.
	listeners := func() []DataChangeCallback {
		m.callbacksLock.RLock()
		defer m.callbacksLock.RUnlock()
		return m.callbacks
	}()

	for i := range listeners {
		listeners[i](id, value)
	}
	return nil
}
// BatchWrite writes every item of batch through Write, stamping each value
// with the same timestamp.
//
// Items are written one at a time in slice order. The first error returned by
// Write stops the loop and is returned as-is: items already written are not
// undone here and the remaining items are not attempted. A nil or empty batch
// returns nil.
func (m *DataManager) BatchWrite(ctx context.Context, batch []struct {
	ID    model.DataPointID
	Value interface{}
}, timestamp time.Time) error {
	for i := range batch {
		point := model.DataValue{Timestamp: timestamp, Value: batch[i].Value}
		if err := m.Write(ctx, batch[i].ID, point); err != nil {
			return err
		}
	}
	return nil
}
// Query runs query against the data point id. It is an alias for ExecuteQuery
// and returns exactly what ExecuteQuery returns.
func (m *DataManager) Query(ctx context.Context, id model.DataPointID, query model.Query) (model.Result, error) {
	result, err := m.ExecuteQuery(ctx, id, query)
	return result, err
}
// RegisterCallback adds callback to the list of functions that Write invokes
// after each successful write. Callbacks are kept in registration order.
//
// A nil callback is ignored. Write calls every stored function
// unconditionally, so storing nil would make every later Write panic after
// its value had already been handed to the storage engine.
func (m *DataManager) RegisterCallback(callback DataChangeCallback) {
	if callback == nil {
		return
	}

	m.callbacksLock.Lock()
	defer m.callbacksLock.Unlock()
	m.callbacks = append(m.callbacks, callback)
}
// ExecuteQuery answers query for the data point id by dispatching on
// query.Type():
//
//   - model.QueryTypeLatest calls engine.GetLatest and wraps the value with
//     model.NewLatestResult.
//   - model.QueryTypeAll calls engine.Read and wraps the values with
//     model.NewAllResult.
//   - model.QueryTypeDuration calls engine.GetDuration and wraps the duration
//     with model.NewDurationResult.
//
// An engine error is returned unchanged together with a nil result. Any other
// query type yields the error "unsupported query type".
func (m *DataManager) ExecuteQuery(ctx context.Context, id model.DataPointID, query model.Query) (model.Result, error) {
	kind := query.Type()

	switch kind {
	case model.QueryTypeLatest:
		latest, err := m.engine.GetLatest(ctx, id)
		if err != nil {
			return nil, err
		}
		return model.NewLatestResult(latest), nil

	case model.QueryTypeAll:
		all, err := m.engine.Read(ctx, id)
		if err != nil {
			return nil, err
		}
		return model.NewAllResult(all), nil

	case model.QueryTypeDuration:
		span, err := m.engine.GetDuration(ctx, id)
		if err != nil {
			return nil, err
		}
		return model.NewDurationResult(span), nil
	}

	// Reached only when no case above matched the query type.
	return nil, errors.New("unsupported query type")
}
// Close closes the underlying storage engine and returns whatever error the
// engine reports.
func (m *DataManager) Close() error {
	err := m.engine.Close()
	return err
}
// EnablePersistence passes config to the storage engine's EnablePersistence
// and returns the engine's error unchanged.
func (m *DataManager) EnablePersistence(config storage.PersistenceConfig) error {
	err := m.engine.EnablePersistence(config)
	return err
}
// CreateDataPoint builds a model.DataPointID from a device ID, a metric code
// and a label set.
//
// The labels map is stored as given, not copied, so the returned ID shares it
// with the caller. The value parameter is accepted but not used.
func CreateDataPoint(deviceID, metricCode string, labels map[string]string, value interface{}) model.DataPointID {
	var id model.DataPointID
	id.DeviceID = deviceID
	id.MetricCode = metricCode
	id.Labels = labels
	return id
}