gotidb/pkg/engine/memory/memory.go

395 lines
8.4 KiB
Go

package memory
import (
"context"
"fmt"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
)
// MemoryEngine is a storage engine that keeps every series in process memory.
// Its methods acquire mu before reading or writing the fields below.
type MemoryEngine struct {
// mu serializes access to the engine state.
mu sync.RWMutex
// config supplies MaxHistoryValues, the per-series cap applied by Write and Compact.
config *engine.MemoryEngineConfig
series map[string][]engine.DataPoint // seriesID -> list of data points
// stats holds the counters and timings updated by Write, Query, Compact and Cleanup.
stats engine.EngineStats
// opened is set to true by Open.
opened bool
// closed is set to true by Close.
closed bool
}
// Compile-time check that *MemoryEngine implements the engine.Engine interface.
var _ engine.Engine = (*MemoryEngine)(nil)
// NewMemoryEngine builds a memory engine from config.
//
// It returns an error when config is nil. The returned engine starts with an
// empty series map and LastWriteTime set to the construction time; it is not
// yet opened.
func NewMemoryEngine(config *engine.MemoryEngineConfig) (*MemoryEngine, error) {
	if config == nil {
		return nil, fmt.Errorf("memory engine config cannot be nil")
	}

	eng := &MemoryEngine{
		config: config,
		series: make(map[string][]engine.DataPoint),
	}
	eng.stats.LastWriteTime = time.Now()
	return eng, nil
}
// Open implements engine.Engine. It marks the engine as opened.
//
// It returns an error if the engine has already been closed or is already
// open. The closed state is checked first: a closed engine always has
// opened == true, so checking opened first would hide the "already closed"
// error behind a misleading "already opened" one.
func (m *MemoryEngine) Open() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.closed {
		return fmt.Errorf("memory engine already closed")
	}
	if m.opened {
		return fmt.Errorf("memory engine already opened")
	}
	m.opened = true
	return nil
}
// Close implements engine.Engine. It drops all stored series and marks the
// engine as closed.
//
// It returns an error if the engine was never opened or is already closed.
func (m *MemoryEngine) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	switch {
	case !m.opened:
		return fmt.Errorf("memory engine not opened")
	case m.closed:
		return fmt.Errorf("memory engine already closed")
	}

	// Release the stored data and flag the engine as closed.
	m.closed = true
	m.series = nil
	return nil
}
// Write implements engine.Engine. It appends points to their series, keeping
// at most config.MaxHistoryValues points per series (the oldest point is
// dropped when the cap is exceeded).
//
// Points whose series ID is empty are skipped and counted in WriteErrors;
// they are not counted in PointsCount. An empty points slice is a no-op.
// It returns an error if the engine is not open. ctx is currently not
// consulted.
func (m *MemoryEngine) Write(ctx context.Context, points []engine.DataPoint) error {
	if len(points) == 0 {
		return nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.opened || m.closed {
		return fmt.Errorf("memory engine not open")
	}

	startTime := time.Now()

	// written counts only the points that were actually accepted, so that
	// rejected points do not inflate PointsCount.
	var written int64
	for _, point := range points {
		seriesID := point.GetSeriesID()
		if seriesID == "" {
			m.stats.WriteErrors++
			continue
		}

		// Fetch the series, creating it on first use.
		seriesPoints, exists := m.series[seriesID]
		if !exists {
			seriesPoints = make([]engine.DataPoint, 0, m.config.MaxHistoryValues)
		}

		seriesPoints = append(seriesPoints, point)
		// Evict the oldest point once the history cap is exceeded.
		if len(seriesPoints) > m.config.MaxHistoryValues {
			seriesPoints = seriesPoints[1:]
		}
		m.series[seriesID] = seriesPoints
		written++
	}

	// Update statistics. WriteLatency is the average time per submitted point.
	m.stats.PointsCount += written
	m.stats.SeriesCount = int64(len(m.series))
	m.stats.LastWriteTime = time.Now()
	m.stats.WriteLatency = time.Since(startTime) / time.Duration(len(points))
	return nil
}
// Query implements engine.Engine. It dispatches on query.Type to the matching
// query helper and returns its result.
//
// It returns an error if the engine is not open or if query.Type is not one
// of the supported types; the latter also increments QueryErrors. On success
// QueryLatency is set to the time spent executing the query. ctx is currently
// not consulted.
//
// The data scan runs under the read lock so queries can proceed concurrently;
// the statistics are then updated under the write lock, because mutating
// m.stats while holding only the read lock would race with other readers.
func (m *MemoryEngine) Query(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
	var (
		result      engine.QueryResult
		elapsed     time.Duration
		unsupported bool
	)

	err := func() error {
		m.mu.RLock()
		defer m.mu.RUnlock()

		if !m.opened || m.closed {
			return fmt.Errorf("memory engine not open")
		}

		startTime := time.Now()
		switch query.Type {
		case engine.QueryTypeLatest:
			result = m.queryLatest(query)
		case engine.QueryTypeRaw:
			result = m.queryRaw(query)
		case engine.QueryTypeAggregate:
			result = m.queryAggregate(query)
		case engine.QueryTypeValueDuration:
			result = m.queryValueDuration(query)
		default:
			unsupported = true
			return fmt.Errorf("unsupported query type: %s", query.Type)
		}
		elapsed = time.Since(startTime)
		return nil
	}()

	// A "not open" failure does not touch the statistics.
	if err != nil && !unsupported {
		return nil, err
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	if unsupported {
		m.stats.QueryErrors++
		return nil, err
	}
	m.stats.QueryLatency = elapsed
	return result, nil
}
// queryLatest returns, for every non-empty series whose first point's labels
// match query.Tags, a result holding only the last stored point.
// It reads m.series without locking; Query calls it with the lock held.
func (m *MemoryEngine) queryLatest(query engine.Query) engine.QueryResult {
	var out engine.QueryResult
	for id, pts := range m.series {
		n := len(pts)
		// Skip empty series and series whose labels do not match.
		if n == 0 || !matchTags(pts[0].Labels, query.Tags) {
			continue
		}
		out = append(out, engine.SeriesResult{
			SeriesID: id,
			Points:   []engine.DataPoint{pts[n-1]},
		})
	}
	return out
}
// queryRaw returns the stored points with a timestamp in
// [query.StartTime, query.EndTime] for every non-empty series whose first
// point's labels match query.Tags. When query.Limit is positive, only the last
// query.Limit matching points of each series are kept. Series without any
// matching point are omitted.
// It reads m.series without locking; Query calls it with the lock held.
func (m *MemoryEngine) queryRaw(query engine.Query) engine.QueryResult {
	var out engine.QueryResult
	for id, pts := range m.series {
		if len(pts) == 0 || !matchTags(pts[0].Labels, query.Tags) {
			continue
		}

		// Collect the points that fall inside the requested time window.
		var inRange []engine.DataPoint
		for i := range pts {
			inWindow := pts[i].Timestamp >= query.StartTime && pts[i].Timestamp <= query.EndTime
			if !inWindow {
				continue
			}
			inRange = append(inRange, pts[i])
		}
		if len(inRange) == 0 {
			continue
		}

		// Apply the limit by keeping the tail of the matches.
		if limit := query.Limit; limit > 0 && len(inRange) > limit {
			inRange = inRange[len(inRange)-limit:]
		}

		out = append(out, engine.SeriesResult{
			SeriesID: id,
			Points:   inRange,
		})
	}
	return out
}
// queryAggregate reduces, per series, the points with a timestamp in
// [query.StartTime, query.EndTime] to a single point via calculateAggregate.
// The resulting point is stamped with query.EndTime and carries the labels of
// the first matching point. Series that are empty, whose first point's labels
// do not match query.Tags, or that have no point in the window are omitted.
// It reads m.series without locking; Query calls it with the lock held.
func (m *MemoryEngine) queryAggregate(query engine.Query) engine.QueryResult {
	var out engine.QueryResult
	for id, pts := range m.series {
		if len(pts) == 0 || !matchTags(pts[0].Labels, query.Tags) {
			continue
		}

		// Collect the points that fall inside the requested time window.
		var inRange []engine.DataPoint
		for i := range pts {
			inWindow := pts[i].Timestamp >= query.StartTime && pts[i].Timestamp <= query.EndTime
			if !inWindow {
				continue
			}
			inRange = append(inRange, pts[i])
		}
		if len(inRange) == 0 {
			continue
		}

		// Build the single aggregated point for this series.
		aggregated := engine.DataPoint{
			Timestamp: query.EndTime,
			Value:     calculateAggregate(inRange, query.AggregateType),
			Labels:    inRange[0].Labels,
		}
		out = append(out, engine.SeriesResult{
			SeriesID: id,
			Points:   []engine.DataPoint{aggregated},
		})
	}
	return out
}
// queryValueDuration handles value-duration queries.
// Simplified implementation: it currently returns exactly what queryRaw
// returns; a full implementation would compute how long each value lasted.
func (m *MemoryEngine) queryValueDuration(query engine.Query) engine.QueryResult {
	rawResult := m.queryRaw(query)
	return rawResult
}
// Compact implements engine.Engine. For every series holding more than
// config.MaxHistoryValues points it keeps only the most recent
// MaxHistoryValues, then bumps CompactionCount and records LastCompaction.
//
// It returns an error if the engine is not open.
func (m *MemoryEngine) Compact() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if isOpen := m.opened && !m.closed; !isOpen {
		return fmt.Errorf("memory engine not open")
	}

	// Trim each over-long series down to its newest entries.
	for id, pts := range m.series {
		if n := len(pts); n > m.config.MaxHistoryValues {
			m.series[id] = pts[n-m.config.MaxHistoryValues:]
		}
	}

	// Record the compaction in the statistics.
	m.stats.CompactionCount++
	m.stats.LastCompaction = time.Now()
	return nil
}
// Cleanup implements engine.Engine. It removes every series that holds no
// points and refreshes SeriesCount.
//
// It returns an error if the engine is not open.
func (m *MemoryEngine) Cleanup() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.closed || !m.opened {
		return fmt.Errorf("memory engine not open")
	}

	// Drop empty series; deleting map entries while ranging is safe in Go.
	for id := range m.series {
		if len(m.series[id]) != 0 {
			continue
		}
		delete(m.series, id)
	}

	m.stats.SeriesCount = int64(len(m.series))
	return nil
}
// Stats implements engine.Engine. It returns a copy of the engine statistics
// with MemoryUsage filled in from a rough per-point size estimate.
func (m *MemoryEngine) Stats() engine.EngineStats {
	// Rough cost of one data point: timestamp + value + estimated label size.
	const bytesPerPoint = 8 + 8 + 64

	m.mu.RLock()
	defer m.mu.RUnlock()

	snapshot := m.stats

	var estimated int64
	for _, pts := range m.series {
		estimated += int64(len(pts) * bytesPerPoint)
	}
	snapshot.MemoryUsage = estimated
	return snapshot
}
// matchTags reports whether every key/value pair in queryTags is also present
// in pointTags. An empty or nil queryTags matches everything. A key missing
// from pointTags reads as "", so it matches a query value of "".
func matchTags(pointTags, queryTags map[string]string) bool {
	for key := range queryTags {
		if want, got := queryTags[key], pointTags[key]; got != want {
			return false
		}
	}
	return true
}
// calculateAggregate reduces the Value fields of points according to
// aggregateType: average, sum, minimum, maximum or count.
// It returns 0 for an empty slice and for an unrecognized aggregate type.
func calculateAggregate(points []engine.DataPoint, aggregateType engine.AggregateType) float64 {
	count := len(points)
	if count == 0 {
		return 0
	}

	switch aggregateType {
	case engine.AggregateTypeAvg, engine.AggregateTypeSum:
		// Average and sum share the same accumulation pass.
		var total float64
		for i := range points {
			total += points[i].Value
		}
		if aggregateType == engine.AggregateTypeAvg {
			return total / float64(count)
		}
		return total
	case engine.AggregateTypeMin:
		lowest := points[0].Value
		for _, p := range points[1:] {
			if p.Value < lowest {
				lowest = p.Value
			}
		}
		return lowest
	case engine.AggregateTypeMax:
		highest := points[0].Value
		for _, p := range points[1:] {
			if p.Value > highest {
				highest = p.Value
			}
		}
		return highest
	case engine.AggregateTypeCount:
		return float64(count)
	}
	return 0
}
// init registers the memory engine factory with the engine package.
func init() {
	engine.RegisterMemoryEngine(func(config *engine.MemoryEngineConfig) (engine.Engine, error) {
		eng, err := NewMemoryEngine(config)
		if err != nil {
			// Return a literal nil: forwarding the nil *MemoryEngine directly
			// would wrap it in a non-nil engine.Engine interface value.
			return nil, err
		}
		return eng, nil
	})
}