diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..f980ab9
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,7 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": []
+}
\ No newline at end of file
diff --git a/docs/design/write_buffer_implementation.md b/docs/design/write_buffer_implementation.md
new file mode 100644
index 0000000..51afae7
--- /dev/null
+++ b/docs/design/write_buffer_implementation.md
@@ -0,0 +1,402 @@
+# WriteBufferHandler Interface Implementation Design
+
+This document describes how the Bolt and Memory engines implement the `WriteBufferHandler` interface to improve the write performance of the time-series database.
+
+## Background
+
+The `WriteBufferHandler` interface and the `WriteBuffer` struct are already defined, but neither the Bolt nor the Memory engine implements the interface yet. Implementing it lets both engines use buffering to improve write performance, especially under high concurrency.
+
+## Design Goals
+
+1. Implement the `WriteBufferHandler` interface: the `WriteToBuffer`, `FlushBuffer`, and `ValidatePoint` methods
+2. Optimize batch writes and reduce I/O operations
+3. Ensure data consistency and proper error handling
+4. Support highly concurrent writes (>500K ops/sec)
+
+## 1. Bolt Engine Implementation
+
+Bolt is a file-based key-value store that manages writes through transactions. The plan for the Bolt engine:
+
+```go
+// pkg/engine/bolt/bolt_write.go
+
+package bolt
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"sync"
+	"time"
+
+	"git.pyer.club/kingecg/gotidb/pkg/engine"
+	bolt "go.etcd.io/bbolt"
+)
+
+// Ensure BoltEngine implements the WriteBufferHandler interface
+var _ engine.WriteBufferHandler = (*BoltEngine)(nil)
+
+// writeCache stages data points before the transaction commits
+type writeCache struct {
+	points map[string][]engine.DataPoint // data points grouped by series ID
+	mu     sync.Mutex
+}
+
+// initWriteCache initializes the write cache
+func (b *BoltEngine) initWriteCache() {
+	b.writeCache = &writeCache{
+		points: make(map[string][]engine.DataPoint),
+	}
+}
+
+// WriteToBuffer implements the WriteBufferHandler interface.
+// It adds the point to the staging cache without touching the database.
+func (b *BoltEngine) WriteToBuffer(point engine.DataPoint) error {
+	// Check engine state
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	// Validate the data point
+	if err := b.validateDataPoint(point); err != nil {
+		return err
+	}
+
+	// Resolve the series ID
+	seriesID := point.GetSeriesID()
+
+	// Append to the staging cache
+	b.writeCache.mu.Lock()
+	defer b.writeCache.mu.Unlock()
+
+	if b.writeCache.points == nil {
+		b.writeCache.points = make(map[string][]engine.DataPoint)
+	}
+
+	b.writeCache.points[seriesID] = append(b.writeCache.points[seriesID], point)
+
+	return nil
+}
+
+// FlushBuffer implements the WriteBufferHandler interface.
+// It writes the staged points to the database in a single transaction.
+func (b *BoltEngine) FlushBuffer() error {
+	// Check engine state
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	// Take ownership of the staged points and reset the cache
+	b.writeCache.mu.Lock()
+	points := b.writeCache.points
+	b.writeCache.points = make(map[string][]engine.DataPoint)
+	b.writeCache.mu.Unlock()
+
+	// Nothing staged, nothing to do
+	if len(points) == 0 {
+		return nil
+	}
+
+	// Run the write transaction
+	return b.db.Update(func(tx *bolt.Tx) error {
+		// Get or create the index bucket
+		indexBucket, err := tx.CreateBucketIfNotExists([]byte(indexBucketName))
+		if err != nil {
+			return fmt.Errorf("failed to create index bucket: %v", err)
+		}
+
+		// Process the points series by series
+		for seriesID, seriesPoints := range points {
+			// Get or create the series bucket
+			bucketName := seriesBucketPrefix + seriesID
+			bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
+			if err != nil {
+				return fmt.Errorf("failed to create series bucket: %v", err)
+			}
+
+			// Update the index
+			if err := indexBucket.Put([]byte(seriesID), []byte{1}); err != nil {
+				return fmt.Errorf("failed to update index: %v", err)
+			}
+
+			// Write the points
+			for _, point := range seriesPoints {
+				// Serialize the data point
+				data, err := json.Marshal(point)
+				if err != nil {
+					return fmt.Errorf("failed to marshal data point: %v", err)
+				}
+
+				// Use the timestamp as the key
+				key := make([]byte, 8)
+				binary.BigEndian.PutUint64(key, uint64(point.Timestamp))
+
+				// Write the data
+				if err := bucket.Put(key, data); err != nil {
+					return fmt.Errorf("failed to write data point: %v", err)
+				}
+
+				// Update statistics
+				b.stats.PointsCount++
+			}
+		}
+
+		// Update statistics
+		b.stats.LastWriteTime = time.Now()
+
+		return nil
+	})
+}
+
+// ValidatePoint implements the WriteBufferHandler interface.
+// It checks whether the point is writable without actually writing it.
+func (b *BoltEngine) ValidatePoint(point engine.DataPoint) error {
+	// Check engine state
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	// Validate the data point
+	return b.validateDataPoint(point)
+}
+
+// validateDataPoint validates a data point
+func (b *BoltEngine) validateDataPoint(point engine.DataPoint) error {
+	// Validate the timestamp
+	if point.Timestamp <= 0 {
+		return fmt.Errorf("invalid timestamp: %d", point.Timestamp)
+	}
+
+	// Validate the labels
+	if len(point.Labels) == 0 {
+		return fmt.Errorf("data point must have at least one label")
+	}
+
+	return nil
+}
+```
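+
+Flushing above happens only when a caller drives it. If writers can go idle while points sit in the cache, a small periodic flusher bounds staleness. A minimal sketch, assuming the `BoltEngine` fields shown above and a `log` import; `startAutoFlush`, `interval`, and `stopCh` are illustrative names, not part of the design:
+
+```go
+// startAutoFlush flushes the write cache on a fixed interval until stopCh
+// is closed. Errors are only logged here; a real implementation would
+// surface them through the engine's statistics.
+func (b *BoltEngine) startAutoFlush(interval time.Duration, stopCh <-chan struct{}) {
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				if err := b.FlushBuffer(); err != nil {
+					log.Printf("auto flush failed: %v", err)
+				}
+			case <-stopCh:
+				return
+			}
+		}
+	}()
+}
+```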
+## 2. Memory Engine Implementation
+
+The Memory engine stores data in memory, managed with maps and locks. The plan for the Memory engine:
+
+```go
+// pkg/engine/memory/memory_write.go
+
+package memory
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"git.pyer.club/kingecg/gotidb/pkg/engine"
+)
+
+// Ensure MemoryEngine implements the WriteBufferHandler interface
+var _ engine.WriteBufferHandler = (*MemoryEngine)(nil)
+
+// memWriteCache stages data points before they reach the in-memory store
+type memWriteCache struct {
+	points map[string][]engine.DataPoint
+	mu     sync.Mutex
+}
+
+// initWriteCache initializes the write cache
+func (m *MemoryEngine) initWriteCache() {
+	m.writeCache = &memWriteCache{
+		points: make(map[string][]engine.DataPoint),
+	}
+}
+
+// WriteToBuffer implements the WriteBufferHandler interface.
+// It adds the point to the staging cache.
+func (m *MemoryEngine) WriteToBuffer(point engine.DataPoint) error {
+	// Check engine state
+	if !m.opened || m.closed {
+		return fmt.Errorf("memory engine not open")
+	}
+
+	// Validate the data point
+	if err := m.validateDataPoint(point); err != nil {
+		return err
+	}
+
+	// Resolve the series ID
+	seriesID := point.GetSeriesID()
+
+	// Append to the staging cache
+	m.writeCache.mu.Lock()
+	defer m.writeCache.mu.Unlock()
+
+	if m.writeCache.points == nil {
+		m.writeCache.points = make(map[string][]engine.DataPoint)
+	}
+
+	m.writeCache.points[seriesID] = append(m.writeCache.points[seriesID], point)
+
+	return nil
+}
+
+// FlushBuffer implements the WriteBufferHandler interface.
+// It moves the staged points into the in-memory store.
+func (m *MemoryEngine) FlushBuffer() error {
+	// Check engine state
+	if !m.opened || m.closed {
+		return fmt.Errorf("memory engine not open")
+	}
+
+	// Take ownership of the staged points and reset the cache
+	m.writeCache.mu.Lock()
+	points := m.writeCache.points
+	m.writeCache.points = make(map[string][]engine.DataPoint)
+	m.writeCache.mu.Unlock()
+
+	// Nothing staged, nothing to do
+	if len(points) == 0 {
+		return nil
+	}
+
+	// Write the data
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for seriesID, seriesPoints := range points {
+		// Get or create the series
+		series, exists := m.series[seriesID]
+		if !exists {
+			series = &memorySeries{
+				id:     seriesID,
+				points: make(map[int64]engine.DataPoint),
+			}
+			m.series[seriesID] = series
+			m.stats.SeriesCount++
+		}
+
+		// Write the points
+		for _, point := range seriesPoints {
+			// The Memory engine implements the ring-queue overwrite semantics:
+			// once a series reaches its point limit, the oldest point is dropped.
+			if m.maxPointsPerSeries > 0 && len(series.points) >= m.maxPointsPerSeries {
+				// Find the oldest timestamp
+				var oldestTimestamp int64 = math.MaxInt64
+				for ts := range series.points {
+					if ts < oldestTimestamp {
+						oldestTimestamp = ts
+					}
+				}
+				// Drop the oldest point
+				delete(series.points, oldestTimestamp)
+			}
+
+			// Add the new point
+			series.points[point.Timestamp] = point
+			m.stats.PointsCount++
+		}
+	}
+
+	// Update statistics
+	m.stats.LastWriteTime = time.Now()
+
+	return nil
+}
+
+// ValidatePoint implements the WriteBufferHandler interface.
+// It checks whether the point is writable without actually writing it.
+func (m *MemoryEngine) ValidatePoint(point engine.DataPoint) error {
+	// Check engine state
+	if !m.opened || m.closed {
+		return fmt.Errorf("memory engine not open")
+	}
+
+	// Validate the data point
+	return m.validateDataPoint(point)
+}
+
+// validateDataPoint validates a data point
+func (m *MemoryEngine) validateDataPoint(point engine.DataPoint) error {
+	// Validate the timestamp
+	if point.Timestamp <= 0 {
+		return fmt.Errorf("invalid timestamp: %d", point.Timestamp)
+	}
+
+	// Validate the labels
+	if len(point.Labels) == 0 {
+		return fmt.Errorf("data point must have at least one label")
+	}
+
+	return nil
+}
+```
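+
+Note that the eviction above scans the whole series map for the oldest timestamp on every insert once the limit is reached, which is O(n) per point. The requirement "keep the last 30 changes" maps naturally onto a fixed-size ring instead. An illustrative alternative (`ringBuffer` is a sketch, not part of the engine):
+
+```go
+// ringBuffer keeps the last len(points) data points, overwriting the oldest.
+type ringBuffer struct {
+	points []engine.DataPoint
+	head   int  // next write position
+	full   bool // set once the buffer has wrapped around
+}
+
+func newRingBuffer(n int) *ringBuffer {
+	return &ringBuffer{points: make([]engine.DataPoint, n)}
+}
+
+// add overwrites the oldest slot in O(1).
+func (r *ringBuffer) add(p engine.DataPoint) {
+	r.points[r.head] = p
+	r.head = (r.head + 1) % len(r.points)
+	if r.head == 0 {
+		r.full = true
+	}
+}
+
+// snapshot returns the buffered points in insertion order.
+func (r *ringBuffer) snapshot() []engine.DataPoint {
+	if !r.full {
+		return append([]engine.DataPoint(nil), r.points[:r.head]...)
+	}
+	out := make([]engine.DataPoint, 0, len(r.points))
+	out = append(out, r.points[r.head:]...)
+	return append(out, r.points[:r.head]...)
+}
+```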
+## 3. Integrating with the Engine's Write Method
+
+Finally, the engine's `Write` method is changed to route batch writes through a `WriteBuffer`:
+
+```go
+// Using the Bolt engine as the example
+func (b *BoltEngine) Write(ctx context.Context, points []DataPoint) error {
+	// Check engine state
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	// Create the write buffer.
+	// The buffer size can be tuned based on performance testing.
+	buffer := engine.NewWriteBuffer(b, 1000)
+
+	// Write the data points
+	for _, point := range points {
+		if err := buffer.Write(point); err != nil {
+			return fmt.Errorf("failed to write data point: %v", err)
+		}
+	}
+
+	// Flush the buffer so every point is persisted
+	if err := buffer.Flush(); err != nil {
+		return fmt.Errorf("failed to flush buffer: %v", err)
+	}
+
+	return nil
+}
+```
+
+For simplicity this sketch creates a buffer per call; a production implementation would more likely keep one long-lived buffer per engine.
+
+## 4. Initialization and Cleanup
+
+The engine's `Open` and `Close` methods initialize and release the write cache:
+
+```go
+// Using the Bolt engine as the example
+func (b *BoltEngine) Open() error {
+	// Existing open logic...
+
+	// Initialize the write cache
+	b.initWriteCache()
+
+	return nil
+}
+
+func (b *BoltEngine) Close() error {
+	// Existing close logic...
+
+	// Release the write cache
+	b.writeCache = nil
+
+	return nil
+}
+```
+
+## 5. Performance Tuning Suggestions
+
+1. **Batch size tuning**: adjust the `WriteBuffer` size to the actual workload
+2. **Concurrency control**: use fine-grained locks to reduce lock contention
+3. **Memory management**: for the Memory engine, implement expiry and cleanup policies for data points
+4. **Monitoring**: add buffer metrics (hit rate, average batch size, and so on)
+
+## 6. Implementation Notes
+
+1. **Error handling**: make sure resources are cleaned up correctly on failure
+2. **Transaction management**: for the Bolt engine, make sure transactions commit or roll back correctly
+3. **Concurrency safety**: make sure every operation is thread-safe
+4. **Memory leaks**: release the write cache when the engine closes (see section 4)
\ No newline at end of file
diff --git a/docs/requirement/require.md b/docs/requirement/require.md
index 4c342dd..08976ba 100644
--- a/docs/requirement/require.md
+++ b/docs/requirement/require.md
@@ -1,7 +1,7 @@
 时序数据库需求的总结及推荐类库
 一、核心需求总结
 1.数据存储逻辑
-每个数据点仅保留最近30次变化值(环形队列覆盖机制)
+每个数据点仅保留最近30次变化值(环形队列覆盖机制,只在内存引擎实现这个特性)
 精确追踪当前值的持续时间(记录值变更的起始时间戳)
 2.读写性能要求
 高速内存处理:微秒级写入延迟,支持高并发(>500K ops/sec)
diff --git a/examples/engine/main.go b/examples/engine/main.go
deleted file mode 100644
index 76d7737..0000000
--- a/examples/engine/main.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package main
-
-import (
-	"context"
-	"fmt"
-	"log"
-	"time"
-
-	"git.pyer.club/kingecg/gotidb/pkg/engine"
-	"git.pyer.club/kingecg/gotidb/pkg/engine/memory"
-)
-
-func main() {
-	// 创建引擎注册表
-	registry := engine.NewEngineRegistry()
-
-	// 注册内存引擎
-	memory.Register(registry)
-
-	// 创建引擎配置
-	config := engine.NewEngineConfig().
-		WithMaxRetention(24 * time.Hour).
- WithMaxPoints(1000) - - // 创建内存引擎实例 - eng, err := registry.Create("memory", config) - if err != nil { - log.Fatal("Failed to create engine:", err) - } - - // 打开引擎 - if err := eng.Open(); err != nil { - log.Fatal("Failed to open engine:", err) - } - defer eng.Close() - - // 写入一些测试数据 - deviceID := "device001" - metricCode := "temperature" - now := time.Now() - - // 写入单个数据点 - point := engine.DataPoint{ - DeviceID: deviceID, - MetricCode: metricCode, - Labels: map[string]string{ - "location": "room1", - "floor": "1st", - }, - Value: 25.5, - Timestamp: now.UnixNano(), - } - - if err := eng.WritePoint(context.Background(), point); err != nil { - log.Fatal("Failed to write point:", err) - } - - // 写入一批数据点 - var points []engine.DataPoint - for i := 0; i < 10; i++ { - points = append(points, engine.DataPoint{ - DeviceID: deviceID, - MetricCode: metricCode, - Labels: map[string]string{ - "location": "room1", - "floor": "1st", - }, - Value: 25.5 + float64(i), - Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(), - }) - } - - if err := eng.WriteBatch(context.Background(), points); err != nil { - log.Fatal("Failed to write batch:", err) - } - - // 查询最新数据 - latestQuery := engine.NewQueryBuilder(). - ForMetric(metricCode). - WithTag("location", engine.OpEqual, "room1"). - Build() - latestQuery.Type = engine.QueryTypeLatest - - result, err := eng.Query(context.Background(), latestQuery) - if err != nil { - log.Fatal("Failed to query latest data:", err) - } - - if tsResult, ok := result.(*engine.TimeSeriesResult); ok { - fmt.Println("\nLatest data:") - for _, p := range tsResult.Points { - fmt.Printf("Time: %v, Value: %.2f\n", - time.Unix(0, p.Timestamp).Format(time.RFC3339), - p.Value) - } - } - - // 查询原始数据 - rawQuery := engine.NewQueryBuilder(). - ForMetric(metricCode). - WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()). - WithTag("location", engine.OpEqual, "room1"). - Build() - - result, err = eng.Query(context.Background(), rawQuery) - if err != nil { - log.Fatal("Failed to query raw data:", err) - } - - if tsResult, ok := result.(*engine.TimeSeriesResult); ok { - fmt.Println("\nRaw data:") - for _, p := range tsResult.Points { - fmt.Printf("Time: %v, Value: %.2f\n", - time.Unix(0, p.Timestamp).Format(time.RFC3339), - p.Value) - } - } - - // 查询聚合数据 - aggQuery := engine.NewQueryBuilder(). - ForMetric(metricCode). - WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()). - WithTag("location", engine.OpEqual, "room1"). - WithAggregation(engine.AggAvg, 5*time.Minute). 
- Build() - - result, err = eng.Query(context.Background(), aggQuery) - if err != nil { - log.Fatal("Failed to query aggregate data:", err) - } - - if aggResult, ok := result.(*engine.AggregateResult); ok { - fmt.Println("\nAggregate data (5-minute averages):") - for _, g := range aggResult.Groups { - fmt.Printf("Time range: %v - %v, Average: %.2f, Count: %d\n", - time.Unix(0, g.StartTime).Format(time.RFC3339), - time.Unix(0, g.EndTime).Format(time.RFC3339), - g.Value, - g.Count) - } - } - - // 打印引擎统计信息 - stats := eng.Stats() - fmt.Printf("\nEngine stats:\n") - fmt.Printf("Points count: %d\n", stats.PointsCount) - fmt.Printf("Last write time: %v\n", stats.LastWriteTime.Format(time.RFC3339)) - - // 打印引擎能力 - caps := eng.Capabilities() - fmt.Printf("\nEngine capabilities:\n") - fmt.Printf("Supports compression: %v\n", caps.SupportsCompression) - fmt.Printf("Supports persistence: %v\n", caps.SupportsPersistence) - fmt.Printf("Supports replication: %v\n", caps.SupportsReplication) - fmt.Printf("Max concurrent writes: %d\n", caps.MaxConcurrentWrites) -} diff --git a/examples/engine/multi_engine.go b/examples/engine/multi_engine.go deleted file mode 100644 index 14f8b76..0000000 --- a/examples/engine/multi_engine.go +++ /dev/null @@ -1,263 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "os" - "time" - - "git.pyer.club/kingecg/gotidb/pkg/engine" - "git.pyer.club/kingecg/gotidb/pkg/engine/file" - "git.pyer.club/kingecg/gotidb/pkg/engine/memory" -) - -func main() { - // 创建引擎注册表 - registry := engine.NewEngineRegistry() - - // 注册内存引擎和文件引擎 - memory.Register(registry) - file.Register(registry) - - // 创建临时目录用于文件引擎 - tempDir, err := os.MkdirTemp("", "gotidb_example_*") - if err != nil { - log.Fatal("Failed to create temp dir:", err) - } - defer os.RemoveAll(tempDir) - - // 创建内存引擎配置 - memConfig := engine.NewEngineConfig(). - WithMaxRetention(24 * time.Hour). 
- WithMaxPoints(1000) - - // 创建文件引擎配置 - fileConfig := engine.NewEngineConfig() - fileConfig.SetFileConfig(&engine.FileEngineConfig{ - DataDir: tempDir, - SegmentSize: 1024 * 1024, // 1MB - CompactWindow: time.Hour, - MaxSegments: 10, - UseCompression: true, - CompressionLevel: 6, - }) - - // 创建内存引擎和文件引擎实例 - memEng, err := registry.Create("memory", memConfig) - if err != nil { - log.Fatal("Failed to create memory engine:", err) - } - - fileEng, err := registry.Create("file", fileConfig) - if err != nil { - log.Fatal("Failed to create file engine:", err) - } - - // 打开引擎 - if err := memEng.Open(); err != nil { - log.Fatal("Failed to open memory engine:", err) - } - defer memEng.Close() - - if err := fileEng.Open(); err != nil { - log.Fatal("Failed to open file engine:", err) - } - defer fileEng.Close() - - // 演示不同场景下的引擎使用 - - // 场景1:高频写入,短期存储 - 使用内存引擎 - fmt.Println("\n=== 场景1:高频写入,短期存储(内存引擎)===") - demoHighFrequencyWrites(memEng) - - // 场景2:长期存储,历史数据查询 - 使用文件引擎 - fmt.Println("\n=== 场景2:长期存储,历史数据查询(文件引擎)===") - demoHistoricalData(fileEng) - - // 场景3:聚合查询性能对比 - fmt.Println("\n=== 场景3:聚合查询性能对比 ===") - demoAggregationComparison(memEng, fileEng) - - // 打印引擎统计信息 - printEngineStats("Memory Engine", memEng) - printEngineStats("File Engine", fileEng) -} - -// 演示高频写入场景 -func demoHighFrequencyWrites(eng engine.Engine) { - start := time.Now() - count := 1000 - - // 批量写入数据 - var points []engine.DataPoint - for i := 0; i < count; i++ { - points = append(points, engine.DataPoint{ - DeviceID: "sensor001", - MetricCode: "temperature", - Labels: map[string]string{ - "location": "room1", - "floor": "1st", - }, - Value: 25.5 + float64(i%10), - Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond).UnixNano(), - }) - } - - if err := eng.WriteBatch(context.Background(), points); err != nil { - log.Printf("Failed to write batch: %v", err) - return - } - - duration := time.Since(start) - fmt.Printf("写入 %d 个数据点耗时: %v (%.2f points/sec)\n", - count, duration, float64(count)/duration.Seconds()) - - // 查询最新数据 - query := engine.NewQueryBuilder(). - ForMetric("temperature"). - WithTag("location", engine.OpEqual, "room1"). - Build() - query.Type = engine.QueryTypeLatest - - result, err := eng.Query(context.Background(), query) - if err != nil { - log.Printf("Failed to query latest data: %v", err) - return - } - - if tsResult, ok := result.(*engine.TimeSeriesResult); ok { - fmt.Printf("最新数据点: %.2f (时间: %v)\n", - tsResult.Points[0].Value, - time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339)) - } -} - -// 演示历史数据存储和查询场景 -func demoHistoricalData(eng engine.Engine) { - // 写入跨越多个时间段的数据 - now := time.Now() - var points []engine.DataPoint - for i := 0; i < 24; i++ { - points = append(points, engine.DataPoint{ - DeviceID: "sensor002", - MetricCode: "power", - Labels: map[string]string{ - "device": "solar_panel", - "unit": "watts", - }, - Value: 100 + float64(i*50), - Timestamp: now.Add(time.Duration(-i) * time.Hour).UnixNano(), - }) - } - - if err := eng.WriteBatch(context.Background(), points); err != nil { - log.Printf("Failed to write historical data: %v", err) - return - } - - // 查询24小时内的数据 - query := engine.NewQueryBuilder(). - ForMetric("power"). - WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()). - WithTag("device", engine.OpEqual, "solar_panel"). 
- Build() - - result, err := eng.Query(context.Background(), query) - if err != nil { - log.Printf("Failed to query historical data: %v", err) - return - } - - if tsResult, ok := result.(*engine.TimeSeriesResult); ok { - fmt.Printf("24小时内的数据点数量: %d\n", len(tsResult.Points)) - if len(tsResult.Points) > 0 { - fmt.Printf("最早数据点: %.2f (时间: %v)\n", - tsResult.Points[0].Value, - time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339)) - fmt.Printf("最新数据点: %.2f (时间: %v)\n", - tsResult.Points[len(tsResult.Points)-1].Value, - time.Unix(0, tsResult.Points[len(tsResult.Points)-1].Timestamp).Format(time.RFC3339)) - } - } -} - -// 演示聚合查询性能对比 -func demoAggregationComparison(memEng, fileEng engine.Engine) { - // 准备测试数据 - now := time.Now() - var points []engine.DataPoint - for i := 0; i < 1000; i++ { - points = append(points, engine.DataPoint{ - DeviceID: "sensor003", - MetricCode: "cpu_usage", - Labels: map[string]string{ - "host": "server1", - }, - Value: float64(30 + (i % 40)), - Timestamp: now.Add(time.Duration(-i) * time.Minute).UnixNano(), - }) - } - - // 写入两个引擎 - if err := memEng.WriteBatch(context.Background(), points); err != nil { - log.Printf("Failed to write to memory engine: %v", err) - return - } - - if err := fileEng.WriteBatch(context.Background(), points); err != nil { - log.Printf("Failed to write to file engine: %v", err) - return - } - - // 创建聚合查询 - query := engine.NewQueryBuilder(). - ForMetric("cpu_usage"). - WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()). - WithTag("host", engine.OpEqual, "server1"). - WithAggregation(engine.AggAvg, 1*time.Hour). - Build() - - // 测试内存引擎聚合性能 - memStart := time.Now() - memResult, err := memEng.Query(context.Background(), query) - if err != nil { - log.Printf("Memory engine aggregation failed: %v", err) - return - } - memDuration := time.Since(memStart) - - // 测试文件引擎聚合性能 - fileStart := time.Now() - fileResult, err := fileEng.Query(context.Background(), query) - if err != nil { - log.Printf("File engine aggregation failed: %v", err) - return - } - fileDuration := time.Since(fileStart) - - // 打印性能对比 - fmt.Printf("内存引擎聚合查询耗时: %v\n", memDuration) - fmt.Printf("文件引擎聚合查询耗时: %v\n", fileDuration) - - if memAgg, ok := memResult.(*engine.AggregateResult); ok { - fmt.Printf("内存引擎聚合组数: %d\n", len(memAgg.Groups)) - } - if fileAgg, ok := fileResult.(*engine.AggregateResult); ok { - fmt.Printf("文件引擎聚合组数: %d\n", len(fileAgg.Groups)) - } -} - -// 打印引擎统计信息 -func printEngineStats(name string, eng engine.Engine) { - stats := eng.Stats() - caps := eng.Capabilities() - - fmt.Printf("\n=== %s 统计信息 ===\n", name) - fmt.Printf("数据点总数: %d\n", stats.PointsCount) - fmt.Printf("最后写入时间: %v\n", stats.LastWriteTime.Format(time.RFC3339)) - fmt.Printf("支持压缩: %v\n", caps.SupportsCompression) - fmt.Printf("支持持久化: %v\n", caps.SupportsPersistence) - fmt.Printf("支持复制: %v\n", caps.SupportsReplication) - fmt.Printf("最大并发写入: %d\n", caps.MaxConcurrentWrites) -} diff --git a/examples/file_engine_example.go b/examples/file_engine_example.go deleted file mode 100644 index 3f95a3c..0000000 --- a/examples/file_engine_example.go +++ /dev/null @@ -1,200 +0,0 @@ -package main - -import ( - "context" - "fmt" - "os" - "time" - - "git.pyer.club/kingecg/gotidb/pkg/engine" - _ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎以注册 -) - -func main() { - // 创建临时目录 - tempDir, err := os.MkdirTemp("", "gotidb_example") - if err != nil { - fmt.Printf("Failed to create temp dir: %v\n", err) - return - } - defer os.RemoveAll(tempDir) - - // 创建引擎配置 - config := &engine.FileEngineConfig{ 
- DataDir: tempDir, - SegmentSize: 1024 * 1024, // 1MB - MaxSegments: 10, - WriteBufferSize: 1000, - IndexCacheSize: 1024 * 1024, // 1MB - UseCompression: false, - CompressionLevel: 0, - CompactThreshold: 0.7, - MaxOpenFiles: 100, - SyncWrites: true, - RetentionPeriod: 24 * time.Hour, - } - - // 创建引擎 - e, err := engine.NewEngine(engine.EngineConfig{ - Type: "file", - FileConfig: config, - }) - if err != nil { - fmt.Printf("Failed to create file engine: %v\n", err) - return - } - - // 打开引擎 - if err := e.Open(); err != nil { - fmt.Printf("Failed to open engine: %v\n", err) - return - } - defer e.Close() - - // 创建上下文 - ctx := context.Background() - - // 写入测试数据 - fmt.Println("Writing data points...") - points := []engine.DataPoint{ - { - Timestamp: time.Now().UnixNano(), - Value: 1.0, - Labels: map[string]string{ - "host": "server1", - "region": "us-west", - "app": "web", - }, - }, - { - Timestamp: time.Now().Add(time.Second).UnixNano(), - Value: 2.0, - Labels: map[string]string{ - "host": "server1", - "region": "us-west", - "app": "web", - }, - }, - { - Timestamp: time.Now().Add(2 * time.Second).UnixNano(), - Value: 3.0, - Labels: map[string]string{ - "host": "server2", - "region": "us-east", - "app": "api", - }, - }, - { - Timestamp: time.Now().Add(3 * time.Second).UnixNano(), - Value: 4.0, - Labels: map[string]string{ - "host": "server2", - "region": "us-east", - "app": "api", - }, - }, - } - - // 写入数据 - if err := e.Write(ctx, points); err != nil { - fmt.Printf("Failed to write points: %v\n", err) - return - } - - // 查询原始数据 - fmt.Println("\nQuerying raw data for server1...") - rawQuery := engine.Query{ - Type: engine.QueryTypeRaw, - StartTime: time.Now().Add(-time.Minute).UnixNano(), - EndTime: time.Now().Add(time.Minute).UnixNano(), - Tags: map[string]string{ - "host": "server1", - }, - Limit: 10, - } - - rawResult, err := e.Query(ctx, rawQuery) - if err != nil { - fmt.Printf("Failed to query raw data: %v\n", err) - return - } - - // 打印原始查询结果 - fmt.Printf("Raw query returned %d series\n", len(rawResult)) - for i, series := range rawResult { - fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) - fmt.Printf(" Labels: %v\n", series.Points[0].Labels) - fmt.Printf(" Points: %d\n", len(series.Points)) - for j, point := range series.Points { - fmt.Printf(" Point %d: timestamp=%s, value=%f\n", - j+1, - time.Unix(0, point.Timestamp).Format(time.RFC3339Nano), - point.Value) - } - } - - // 查询最新数据 - fmt.Println("\nQuerying latest data for each host...") - latestQuery := engine.Query{ - Type: engine.QueryTypeLatest, - StartTime: time.Now().Add(-time.Minute).UnixNano(), - EndTime: time.Now().Add(time.Minute).UnixNano(), - Tags: map[string]string{}, // 空标签查询所有序列 - } - - latestResult, err := e.Query(ctx, latestQuery) - if err != nil { - fmt.Printf("Failed to query latest data: %v\n", err) - return - } - - // 打印最新查询结果 - fmt.Printf("Latest query returned %d series\n", len(latestResult)) - for i, series := range latestResult { - fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) - fmt.Printf(" Labels: %v\n", series.Points[0].Labels) - for _, point := range series.Points { - fmt.Printf(" Latest point: timestamp=%s, value=%f\n", - time.Unix(0, point.Timestamp).Format(time.RFC3339Nano), - point.Value) - } - } - - // 查询聚合数据 - fmt.Println("\nQuerying aggregate data (average) for each region...") - aggQuery := engine.Query{ - Type: engine.QueryTypeAggregate, - StartTime: time.Now().Add(-time.Minute).UnixNano(), - EndTime: time.Now().Add(time.Minute).UnixNano(), - AggregateType: 
engine.AggregateTypeAvg,
-		Tags:          map[string]string{}, // 空标签查询所有序列
-	}
-
-	aggResult, err := e.Query(ctx, aggQuery)
-	if err != nil {
-		fmt.Printf("Failed to query aggregate data: %v\n", err)
-		return
-	}
-
-	// 打印聚合查询结果
-	fmt.Printf("Aggregate query returned %d series\n", len(aggResult))
-	for i, series := range aggResult {
-		fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID)
-		fmt.Printf("  Labels: %v\n", series.Points[0].Labels)
-		for _, point := range series.Points {
-			fmt.Printf("  Average value: %f\n", point.Value)
-		}
-	}
-
-	// 获取引擎统计信息
-	stats := e.Stats()
-	fmt.Println("\nEngine statistics:")
-	fmt.Printf("  Points count: %d\n", stats.PointsCount)
-	fmt.Printf("  Segments count: %d\n", stats.SegmentsCount)
-	fmt.Printf("  Last write time: %s\n", stats.LastWriteTime.Format(time.RFC3339))
-	if !stats.LastCompactionTime.IsZero() {
-		fmt.Printf("  Last compaction time: %s\n", stats.LastCompactionTime.Format(time.RFC3339))
-	} else {
-		fmt.Printf("  Last compaction time: Never\n")
-	}
-}
diff --git a/pkg/engine/bolt/README.md b/pkg/engine/bolt/README.md
new file mode 100644
index 0000000..5d8eba5
--- /dev/null
+++ b/pkg/engine/bolt/README.md
@@ -0,0 +1,98 @@
+# Bolt Storage Engine
+
+## Design Principles
+
+1. **File size limit**: no source file exceeds 300 lines, to keep the code maintainable and readable.
+
+2. **File layout**:
+   - `bolt.go` - core structure and basic methods
+   - `bolt_query.go` - query implementation
+   - `bolt_utils.go` - utilities and helpers
+   - `bolt_maintenance.go` - maintenance methods
+
+3. **Separation of concerns**:
+   - Each file has one clear responsibility
+   - Related functionality lives in the same file
+   - No circular dependencies between files
+
+## Features
+
+1. **Basic operations**
+   - Engine creation and initialization
+   - Opening and closing the database
+   - Writing data points
+
+2. **Queries**
+   - Latest-value queries
+   - Raw data queries
+   - Aggregate queries
+   - Value-duration queries
+
+3. **Maintenance**
+   - Data compaction
+   - Empty-series cleanup
+   - Statistics management
+
+## Data Layout
+
+1. **Bucket structure**
+   - `series_*` - series data
+   - `meta` - metadata
+   - `index` - series index
+
+2. **Key design**
+   - Series data: 8-byte timestamp as the key
+   - Metadata: predefined keys (e.g. "stats")
+   - Index: series ID as both key and value
+
+## Performance Considerations
+
+1. **Concurrency control**
+   - Read-write locks guard concurrent access
+   - Fine-grained transaction management
+
+2. **Memory management**
+   - Object and buffer reuse
+   - Batched operations
+
+3. **Disk optimization**
+   - Periodic compaction
+   - Space reclamation
+
+## Usage Example
+
+```go
+// Create the engine instance
+config := &engine.BoltEngineConfig{
+    Path: "/path/to/data.db",
+}
+eng, err := NewBoltEngine(config)
+if err != nil {
+    log.Fatal(err)
+}
+
+// Open the engine
+if err := eng.Open(); err != nil {
+    log.Fatal(err)
+}
+defer eng.Close()
+
+// Write data
+points := []engine.DataPoint{
+    {
+        Timestamp: time.Now().UnixNano(),
+        Value:     42.0,
+        Labels:    map[string]string{"sensor": "temp"},
+    },
+}
+if err := eng.Write(context.Background(), points); err != nil {
+    log.Fatal(err)
+}
+```
+
+## Notes
+
+1. Respect the file size limit when adding new features
+2. Keep error handling and logging consistent
+3. Run the maintenance operations (compaction and cleanup) periodically
+4. Monitor the engine statistics to guide performance tuning
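+
+## Buffered Writes (sketch)
+
+The design in `docs/design/write_buffer_implementation.md` proposes routing batch writes through `engine.WriteBuffer`. A minimal sketch, assuming the `WriteBufferHandler` implementation from that document is in place (it is not part of this package yet):
+
+```go
+if handler, ok := eng.(engine.WriteBufferHandler); ok {
+    // The buffer size is a tuning knob; 1000 mirrors the design document.
+    buffer := engine.NewWriteBuffer(handler, 1000)
+    for _, p := range points {
+        if err := buffer.Write(p); err != nil {
+            log.Fatal(err)
+        }
+    }
+    // Flush whatever is still buffered.
+    if err := buffer.Flush(); err != nil {
+        log.Fatal(err)
+    }
+}
+```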
\ No newline at end of file
diff --git a/pkg/engine/bolt/bolt.go b/pkg/engine/bolt/bolt.go
new file mode 100644
index 0000000..f495af2
--- /dev/null
+++ b/pkg/engine/bolt/bolt.go
@@ -0,0 +1,228 @@
+package bolt
+
+import (
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"git.pyer.club/kingecg/gotidb/pkg/engine"
+	bolt "go.etcd.io/bbolt"
+)
+
+const (
+	// bucket names
+	seriesBucketPrefix = "series_" // prefix for series data buckets
+	metaBucketName     = "meta"    // metadata bucket
+	indexBucketName    = "index"   // index bucket
+
+	// metadata keys
+	statsKey = "stats" // key of the statistics entry
+)
+
+// BoltEngine is a storage engine backed by BoltDB
+type BoltEngine struct {
+	mu     sync.RWMutex
+	config *engine.BoltEngineConfig
+	db     *bolt.DB
+	stats  engine.EngineStats
+	opened bool
+	closed bool
+}
+
+// Ensure BoltEngine implements the Engine interface
+var _ engine.Engine = (*BoltEngine)(nil)
+
+// NewBoltEngine creates a new Bolt engine
+func NewBoltEngine(config *engine.BoltEngineConfig) (engine.Engine, error) {
+	if config == nil {
+		return nil, fmt.Errorf("bolt engine config cannot be nil")
+	}
+
+	// Make sure the data directory exists
+	if err := os.MkdirAll(filepath.Dir(config.Path), 0755); err != nil {
+		return nil, fmt.Errorf("failed to create data directory: %v", err)
+	}
+
+	return &BoltEngine{
+		config: config,
+		stats: engine.EngineStats{
+			LastWriteTime: time.Now(),
+		},
+	}, nil
+}
+
+// Open implements the Engine interface
+func (b *BoltEngine) Open() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.opened {
+		return fmt.Errorf("bolt engine already opened")
+	}
+
+	if b.closed {
+		return fmt.Errorf("bolt engine already closed")
+	}
+
+	// Open the database
+	db, err := bolt.Open(b.config.Path, 0600, &bolt.Options{
+		Timeout: 1 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to open bolt database: %v", err)
+	}
+
+	// Create the required buckets
+	err = db.Update(func(tx *bolt.Tx) error {
+		// Metadata bucket
+		if _, err := tx.CreateBucketIfNotExists([]byte(metaBucketName)); err != nil {
+			return fmt.Errorf("failed to create meta bucket: %v", err)
+		}
+
+		// Index bucket
+		if _, err := tx.CreateBucketIfNotExists([]byte(indexBucketName)); err != nil {
+			return fmt.Errorf("failed to create index bucket: %v", err)
+		}
+
+		return nil
+	})
+	if err != nil {
+		db.Close()
+		return fmt.Errorf("failed to create buckets: %v", err)
+	}
+
+	b.db = db
+	b.opened = true
+
+	// Load the persisted statistics
+	if err := b.loadStats(); err != nil {
+		return fmt.Errorf("failed to load stats: %v", err)
+	}
+
+	return nil
+}
+
+// Close implements the Engine interface
+func (b *BoltEngine) Close() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if !b.opened {
+		return fmt.Errorf("bolt engine not opened")
+	}
+
+	if b.closed {
+		return fmt.Errorf("bolt engine already closed")
+	}
+
+	// Persist the statistics
+	if err := b.saveStats(); err != nil {
+		return fmt.Errorf("failed to save stats: %v", err)
+	}
+
+	// Close the database
+	if err := b.db.Close(); err != nil {
+		return fmt.Errorf("failed to close bolt database: %v", err)
+	}
+
+	b.closed = true
+	return nil
+}
+
+// Write implements the Engine interface
+func (b *BoltEngine) Write(ctx context.Context, points []engine.DataPoint) error {
+	if len(points) == 0 {
+		return nil
+	}
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	startTime := time.Now()
+
+	// Write the data points
+	err := b.db.Update(func(tx *bolt.Tx) error {
+		// The index bucket is created in Open, so fetching it once here is enough
+		indexBucket := tx.Bucket([]byte(indexBucketName))
+
+		for _, point := range points {
+			seriesID := point.GetSeriesID()
+			if seriesID == "" {
+				b.stats.WriteErrors++
+				continue
+			}
+
+			// Get or create the series bucket
+			bucketName := seriesBucketPrefix + seriesID
+			bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
+			if err != nil {
+				return fmt.Errorf("failed to create series bucket: %v", err)
+			}
+
+			// Serialize the data point
+			value, err := json.Marshal(point)
+			if err != nil {
+				return fmt.Errorf("failed to marshal data point: %v", err)
+			}
+
+			// Use the timestamp as the key
+			key := make([]byte, 8)
+			binary.BigEndian.PutUint64(key, uint64(point.Timestamp))
+
+			// Write the data point
+			if err := bucket.Put(key, value); err != nil {
+				return fmt.Errorf("failed to write data point: %v", err)
+			}
+
+			// Update the index
+			if err := indexBucket.Put([]byte(seriesID), []byte(seriesID)); err != nil {
+				return fmt.Errorf("failed to update index: %v", err)
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to write points: %v", err)
+	}
+
+	// Update statistics
+	b.stats.PointsCount += int64(len(points))
+	b.stats.LastWriteTime = time.Now()
+	b.stats.WriteLatency = time.Since(startTime) / time.Duration(len(points))
+
+	// Refresh the series count
+	if err := b.updateSeriesCount(); err != nil {
+		return fmt.Errorf("failed to update series count: %v", err)
+	}
+
+	return nil
+}
+
+// Stats implements the Engine interface
+func (b *BoltEngine) Stats() engine.EngineStats {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	stats := b.stats
+
+	// Estimate the database size from the page count
+	if b.db != nil {
+		stats.DiskUsage = b.db.Stats().TxStats.PageCount * int64(b.db.Info().PageSize)
+	}
+
+	return stats
+}
+
+// Register the Bolt engine factory
+func init() {
+	engine.RegisterBoltEngine(NewBoltEngine)
+}
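+
+// The helpers below are an illustrative sketch of the key scheme used in
+// Write: keys are 8-byte big-endian timestamps, so Bolt's lexicographic
+// key order matches chronological order. They are documentation additions
+// and are not referenced by the engine itself.
+func encodeTimestampKey(ts int64) []byte {
+	key := make([]byte, 8)
+	binary.BigEndian.PutUint64(key, uint64(ts))
+	return key
+}
+
+func decodeTimestampKey(key []byte) int64 {
+	return int64(binary.BigEndian.Uint64(key))
+}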
diff --git a/pkg/engine/bolt/bolt_maintenance.go b/pkg/engine/bolt/bolt_maintenance.go
new file mode 100644
index 0000000..1dce358
--- /dev/null
+++ b/pkg/engine/bolt/bolt_maintenance.go
@@ -0,0 +1,80 @@
+package bolt
+
+import (
+	"fmt"
+	"time"
+
+	bolt "go.etcd.io/bbolt"
+)
+
+// Compact implements the Engine interface. Note that Sync only forces the
+// current file to disk; it does not reclaim free pages (see the compactTo
+// sketch at the end of this file for file-level compaction).
+func (b *BoltEngine) Compact() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	// Force the database to disk
+	if err := b.db.Sync(); err != nil {
+		return fmt.Errorf("failed to sync database: %v", err)
+	}
+
+	// Update statistics
+	b.stats.CompactionCount++
+	b.stats.LastCompaction = time.Now()
+
+	return nil
+}
+
+// Cleanup implements the Engine interface
+func (b *BoltEngine) Cleanup() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if !b.opened || b.closed {
+		return fmt.Errorf("bolt engine not open")
+	}
+
+	// Remove empty series. Collect their IDs first: deleting keys from a
+	// bucket while iterating it with ForEach is unsafe in bbolt.
+	err := b.db.Update(func(tx *bolt.Tx) error {
+		indexBucket := tx.Bucket([]byte(indexBucketName))
+		if indexBucket == nil {
+			return fmt.Errorf("index bucket not found")
+		}
+
+		var emptySeries []string
+		if err := indexBucket.ForEach(func(k, v []byte) error {
+			bucket := tx.Bucket([]byte(seriesBucketPrefix + string(k)))
+			if bucket != nil && bucket.Stats().KeyN == 0 {
+				emptySeries = append(emptySeries, string(k))
+			}
+			return nil
+		}); err != nil {
+			return err
+		}
+
+		for _, seriesID := range emptySeries {
+			if err := tx.DeleteBucket([]byte(seriesBucketPrefix + seriesID)); err != nil {
+				return fmt.Errorf("failed to delete empty series bucket: %v", err)
+			}
+			if err := indexBucket.Delete([]byte(seriesID)); err != nil {
+				return fmt.Errorf("failed to delete series from index: %v", err)
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to cleanup: %v", err)
+	}
+
+	// Refresh the series count
+	if err := b.updateSeriesCount(); err != nil {
+		return fmt.Errorf("failed to update series count: %v", err)
+	}
+
+	return nil
+}
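+
+// compactTo is an illustrative sketch of file-level compaction: copying the
+// database into a fresh file reclaims the free pages that Sync leaves
+// behind. It assumes the bbolt package exposes
+// Compact(dst, src *bolt.DB, txMaxSize int64); verify that against the
+// bbolt version in use before relying on it. Not called by the engine.
+func (b *BoltEngine) compactTo(dstPath string) error {
+	dst, err := bolt.Open(dstPath, 0600, nil)
+	if err != nil {
+		return fmt.Errorf("failed to open destination database: %v", err)
+	}
+	defer dst.Close()
+	return bolt.Compact(dst, b.db, 65536)
+}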
+ "git.pyer.club/kingecg/gotidb/pkg/engine" + bolt "go.etcd.io/bbolt" +) + +// Query 实现 Engine 接口 +func (b *BoltEngine) Query(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + b.mu.RLock() + defer b.mu.RUnlock() + + if !b.opened || b.closed { + return nil, fmt.Errorf("bolt engine not open") + } + + startTime := time.Now() + var result engine.QueryResult + + err := b.db.View(func(tx *bolt.Tx) error { + // 遍历所有序列 + indexBucket := tx.Bucket([]byte(indexBucketName)) + if indexBucket == nil { + return fmt.Errorf("index bucket not found") + } + + return indexBucket.ForEach(func(k, v []byte) error { + seriesID := string(k) + bucket := tx.Bucket([]byte(seriesBucketPrefix + seriesID)) + if bucket == nil { + return nil + } + + // 根据查询类型执行不同的查询 + var seriesResult *engine.SeriesResult + var err error + + switch query.Type { + case engine.QueryTypeLatest: + seriesResult, err = b.queryLatest(bucket, seriesID, query) + case engine.QueryTypeRaw: + seriesResult, err = b.queryRaw(bucket, seriesID, query) + case engine.QueryTypeAggregate: + seriesResult, err = b.queryAggregate(bucket, seriesID, query) + case engine.QueryTypeValueDuration: + seriesResult, err = b.queryValueDuration(bucket, seriesID, query) + default: + return fmt.Errorf("unsupported query type: %s", query.Type) + } + + if err != nil { + return err + } + + if seriesResult != nil { + result = append(result, *seriesResult) + } + + return nil + }) + }) + + if err != nil { + b.stats.QueryErrors++ + return nil, fmt.Errorf("failed to execute query: %v", err) + } + + // 更新统计信息 + b.stats.QueryLatency = time.Since(startTime) + + return result, nil +} + +// queryLatest 执行最新值查询 +func (b *BoltEngine) queryLatest(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) { + cursor := bucket.Cursor() + k, v := cursor.Last() + if k == nil { + return nil, nil + } + + var point engine.DataPoint + if err := json.Unmarshal(v, &point); err != nil { + return nil, fmt.Errorf("failed to unmarshal data point: %v", err) + } + + // 检查标签是否匹配 + if !matchTags(point.Labels, query.Tags) { + return nil, nil + } + + return &engine.SeriesResult{ + SeriesID: seriesID, + Points: []engine.DataPoint{point}, + }, nil +} + +// queryRaw 执行原始数据查询 +func (b *BoltEngine) queryRaw(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) { + var points []engine.DataPoint + + cursor := bucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + timestamp := int64(binary.BigEndian.Uint64(k)) + if timestamp < query.StartTime { + continue + } + if timestamp > query.EndTime { + break + } + + var point engine.DataPoint + if err := json.Unmarshal(v, &point); err != nil { + return nil, fmt.Errorf("failed to unmarshal data point: %v", err) + } + + if len(points) == 0 { + // 检查标签是否匹配 + if !matchTags(point.Labels, query.Tags) { + return nil, nil + } + } + + points = append(points, point) + + // 应用限制 + if query.Limit > 0 && len(points) >= query.Limit { + break + } + } + + if len(points) == 0 { + return nil, nil + } + + return &engine.SeriesResult{ + SeriesID: seriesID, + Points: points, + }, nil +} + +// queryAggregate 执行聚合查询 +func (b *BoltEngine) queryAggregate(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) { + var points []engine.DataPoint + var firstPoint engine.DataPoint + + cursor := bucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + timestamp := int64(binary.BigEndian.Uint64(k)) + if timestamp < query.StartTime { + 
diff --git a/pkg/engine/bolt/bolt_utils.go b/pkg/engine/bolt/bolt_utils.go
new file mode 100644
index 0000000..0709fda
--- /dev/null
+++ b/pkg/engine/bolt/bolt_utils.go
@@ -0,0 +1,131 @@
+package bolt
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"git.pyer.club/kingecg/gotidb/pkg/engine"
+	bolt "go.etcd.io/bbolt"
+)
+
+// loadStats loads the statistics from the database
+func (b *BoltEngine) loadStats() error {
+	return b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(metaBucketName))
+		if bucket == nil {
+			return fmt.Errorf("meta bucket not found")
+		}
+
+		statsData := bucket.Get([]byte(statsKey))
+		if statsData == nil {
+			// No persisted statistics yet; keep the defaults
+			return nil
+		}
+
+		if err := json.Unmarshal(statsData, &b.stats); err != nil {
+			return fmt.Errorf("failed to unmarshal stats: %v", err)
+		}
+
+		return nil
+	})
+}
+
+// saveStats persists the statistics to the database
+func (b *BoltEngine) saveStats() error {
+	return b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(metaBucketName))
+		if bucket == nil {
+			return fmt.Errorf("meta bucket not found")
+		}
+
+		statsData, err := json.Marshal(b.stats)
+		if err != nil {
+			return fmt.Errorf("failed to marshal stats: %v", err)
+		}
+
+		if err := bucket.Put([]byte(statsKey), statsData); err != nil {
+			return fmt.Errorf("failed to save stats: %v", err)
+		}
+
+		return nil
+	})
+}
+
+// updateSeriesCount refreshes the series count statistic
+func (b *BoltEngine) updateSeriesCount() error {
+	var count int64
+
+	err := b.db.View(func(tx *bolt.Tx) error {
+		indexBucket := tx.Bucket([]byte(indexBucketName))
+		if indexBucket == nil {
+			return fmt.Errorf("index bucket not found")
+		}
+
+		count = int64(indexBucket.Stats().KeyN)
+		return nil
+	})
+
+	if err != nil {
+		return err
+	}
+
+	b.stats.SeriesCount = count
+	return nil
+}
+
+// matchTags reports whether the point's labels match the query labels
+func matchTags(pointTags, queryTags map[string]string) bool {
+	for k, v := range queryTags {
+		if pointTags[k] != v {
+			return false
+		}
+	}
+	return true
+}
+
+// calculateAggregate computes the aggregate value of a slice of points
+func calculateAggregate(points []engine.DataPoint, aggregateType engine.AggregateType) float64 {
+	if len(points) == 0 {
+		return 0
+	}
+
+	switch aggregateType {
+	case engine.AggregateTypeAvg:
+		sum := 0.0
+		for _, p := range points {
+			sum += p.Value
+		}
+		return sum / float64(len(points))
+
+	case engine.AggregateTypeSum:
+		sum := 0.0
+		for _, p := range points {
+			sum += p.Value
+		}
+		return sum
+
+	case engine.AggregateTypeMin:
+		min := points[0].Value
+		for _, p := range points {
+			if p.Value < min {
+				min = p.Value
+			}
+		}
+		return min
+
+	case engine.AggregateTypeMax:
+		max := points[0].Value
+		for _, p := range points {
+			if p.Value > max {
+				max = p.Value
+			}
+		}
+		return max
+
+	case engine.AggregateTypeCount:
+		return float64(len(points))
+
+	default:
+		return 0
+	}
+}
diff --git a/pkg/engine/buffer_test.go b/pkg/engine/buffer_test.go
new file mode 100644
index 0000000..2ce4b45
--- /dev/null
+++ b/pkg/engine/buffer_test.go
@@ -0,0 +1,306 @@
+package engine
+
+import (
+	"errors"
+	"testing"
+	"time"
+)
+
+// errorBufferHandler is a WriteBufferHandler implementation that can be
+// told to fail on write or on flush
+type errorBufferHandler struct {
+	writeError  bool
+	flushError  bool
+	writeCount  int
+	flushCount  int
+	lastWritten DataPoint
+}
+
+func (h *errorBufferHandler) WriteToBuffer(point DataPoint) error {
+	h.writeCount++
+	h.lastWritten = point
+	if h.writeError {
+		return errors.New("write error")
+	}
+	return nil
+}
+
+func (h *errorBufferHandler) ValidatePoint(point DataPoint) error {
+	// Fail validation when the write-error flag is set
+	if h.writeError {
+		return errors.New("simulated write error")
+	}
+	return nil
+}
+
+func (h *errorBufferHandler) FlushBuffer() error {
+	h.flushCount++
+	if h.flushError {
+		return errors.New("flush error")
+	}
+	return nil
+}
+
+func TestWriteBuffer_Creation(t *testing.T) {
+	handler := &mockBufferHandler{}
+
+	// Normal creation
+	buffer := NewWriteBuffer(handler, 10)
+	if buffer == nil {
+		t.Fatal("NewWriteBuffer() returned nil")
+	}
+	if buffer.size != 10 {
+		t.Errorf("Buffer size = %d, want 10", buffer.size)
+	}
+
+	// Zero-size buffer
+	buffer = NewWriteBuffer(handler, 0)
+	if buffer.size != 0 {
+		t.Errorf("Buffer size = %d, want 0", buffer.size)
+	}
+
+	// Negative size: the size field keeps the raw value, but the buffer
+	// behaves like an unbuffered (zero-size) one
+	buffer = NewWriteBuffer(handler, -1)
+	if buffer.size != -1 {
+		t.Errorf("Buffer size = %d, want -1", buffer.size)
+	}
+}
+
+func TestWriteBuffer_SingleWrite(t *testing.T) {
+	handler := &mockBufferHandler{
+		points: make([]DataPoint, 0),
+	}
+	buffer := NewWriteBuffer(handler, 5)
+
+	// Write a single data point
+	point := DataPoint{
+		Timestamp: time.Now().UnixNano(),
+		Value:     42.0,
+		Labels:    map[string]string{"test": "single"},
+	}
+
+	if err := buffer.Write(point); err != nil {
+		t.Errorf("Write() error = %v", err)
+	}
+
+	// The point should still be buffered
+	if len(buffer.buffer) != 1 {
+		t.Errorf("Buffer length = %d, want 1", len(buffer.buffer))
+	}
+
+	// The handler should not have received anything yet
+	if len(handler.points) != 0 {
+		t.Errorf("Handler points = %d, want 0", len(handler.points))
+	}
+}
+
+func TestWriteBuffer_MultipleWrites(t *testing.T) {
+	handler := &mockBufferHandler{
+		points: make([]DataPoint, 0),
+	}
+	buffer := NewWriteBuffer(handler, 5)
+
+	// Write several points, fewer than the buffer size
+	for i := 0; i < 3; i++ {
+		point := DataPoint{
+			Timestamp: time.Now().UnixNano(),
+			Value:     float64(i),
+			Labels:    map[string]string{"test": "multiple"},
+		}
+
+		if err := buffer.Write(point); err != nil {
+			t.Errorf("Write() error = %v", err)
+		}
+	}
+
+	// All three points should still be buffered
+	if len(buffer.buffer) != 3 {
+		t.Errorf("Buffer length = %d, want 3", len(buffer.buffer))
+	}
+
+	// The handler should not have received anything yet
+	if len(handler.points) != 0 {
+		t.Errorf("Handler points = %d, want 0", len(handler.points))
+	}
+}
+
+func TestWriteBuffer_AutoFlush(t *testing.T) {
+	handler := &mockBufferHandler{
+		points: make([]DataPoint, 0),
+	}
+	buffer := NewWriteBuffer(handler, 3)
+
+	// Write enough points to trigger an automatic flush
+	for i := 0; i < 3; i++ {
+		point := DataPoint{
+			Timestamp: time.Now().UnixNano(),
+			Value:     float64(i),
+			Labels:    map[string]string{"test": "autoflush"},
+		}
+
+		if err := buffer.Write(point); err != nil {
+			t.Errorf("Write() error = %v", err)
+		}
+	}
+
+	// The buffer should have flushed itself
+	if len(buffer.buffer) != 0 {
+		t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
+	}
+
+	// The handler should have received 3 flushed points
+	if len(handler.toflush) != 3 {
+		t.Errorf("Handler flushed points = %d, want 3", len(handler.toflush))
+	}
+
+	// Write one more point
+	point := DataPoint{
+		Timestamp: time.Now().UnixNano(),
+		Value:     42.0,
+		Labels:    map[string]string{"test": "after_flush"},
+	}
+
+	if err := buffer.Write(point); err != nil {
+		t.Errorf("Write() error = %v", err)
+	}
+
+	// It should sit in the buffer
+	if len(buffer.buffer) != 1 {
+		t.Errorf("Buffer length = %d, want 1", len(buffer.buffer))
+	}
+
+	// The handler should still have exactly 3 flushed points
+	if len(handler.toflush) != 3 {
+		t.Errorf("Handler flushed points = %d, want 3", len(handler.toflush))
+	}
+}
+
+func TestWriteBuffer_ManualFlush(t *testing.T) {
+	handler := &mockBufferHandler{
+		points: make([]DataPoint, 0),
+	}
+	buffer := NewWriteBuffer(handler, 10)
+
+	// Write a few points
+	for i := 0; i < 5; i++ {
+		point := DataPoint{
+			Timestamp: time.Now().UnixNano(),
+			Value:     float64(i),
+			Labels:    map[string]string{"test": "manual_flush"},
+		}
+
+		if err := buffer.Write(point); err != nil {
+			t.Errorf("Write() error = %v", err)
+		}
+	}
+
+	// All five points should still be buffered
+	if len(buffer.buffer) != 5 {
+		t.Errorf("Buffer length = %d, want 5", len(buffer.buffer))
+	}
+
+	// Flush manually
+	if err := buffer.Flush(); err != nil {
+		t.Errorf("Flush() error = %v", err)
+	}
+
+	// The buffer should now be empty
+	if len(buffer.buffer) != 0 {
+		t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
+	}
+
+	// The handler should have received 5 flushed points
+	if len(handler.toflush) != 5 {
+		t.Errorf("Handler flushed points = %d, want 5", len(handler.toflush))
+	}
+
+	// A second flush should be a no-op
+	if err := buffer.Flush(); err != nil {
+		t.Errorf("Flush() error = %v", err)
+	}
+
+	// Still exactly 5 flushed points
+	if len(handler.toflush) != 5 {
+		t.Errorf("Handler flushed points = %d, want 5", len(handler.toflush))
+	}
+}
+func TestWriteBuffer_ErrorHandling(t *testing.T) {
+	// Write error
+	writeErrorHandler := &errorBufferHandler{
+		writeError: true,
+	}
+	buffer := NewWriteBuffer(writeErrorHandler, 3)
+
+	point := DataPoint{
+		Timestamp: time.Now().UnixNano(),
+		Value:     42.0,
+		Labels:    map[string]string{"test": "error"},
+	}
+
+	// The write should fail
+	err := buffer.Write(point)
+	if err == nil {
+		t.Error("Write() did not return error")
+	}
+
+	// Validation rejects the point before WriteToBuffer is ever called,
+	// so no write is recorded
+	if writeErrorHandler.writeCount != 0 {
+		t.Errorf("Handler write count = %d, want 0", writeErrorHandler.writeCount)
+	}
+
+	// Flush error
+	flushErrorHandler := &errorBufferHandler{
+		flushError: true,
+	}
+	buffer = NewWriteBuffer(flushErrorHandler, 3)
+
+	// Write a few points
+	for i := 0; i < 3; i++ {
+		point := DataPoint{
+			Timestamp: time.Now().UnixNano(),
+			Value:     float64(i),
+			Labels:    map[string]string{"test": "flush_error"},
+		}
+
+		// The last write triggers a flush, which should fail
+		err := buffer.Write(point)
+		if i == 2 && err == nil {
+			t.Error("Write() did not return error on flush")
+		}
+	}
+
+	// Check the handler counters
+	if flushErrorHandler.writeCount != 3 {
+		t.Errorf("Handler write count = %d, want 3", flushErrorHandler.writeCount)
+	}
+	if flushErrorHandler.flushCount != 1 {
+		t.Errorf("Handler flush count = %d, want 1", flushErrorHandler.flushCount)
+	}
+}
+
+func TestWriteBuffer_ZeroSize(t *testing.T) {
+	handler := &mockBufferHandler{
+		points: make([]DataPoint, 0),
+	}
+	buffer := NewWriteBuffer(handler, 0)
+
+	// A zero-size buffer writes through immediately
+	point := DataPoint{
+		Timestamp: time.Now().UnixNano(),
+		Value:     42.0,
+		Labels:    map[string]string{"test": "zero_size"},
+	}
+
+	if err := buffer.Write(point); err != nil {
+		t.Errorf("Write() error = %v", err)
+	}
+
+	// Nothing should be buffered
+	if len(buffer.buffer) != 0 {
+		t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
+	}
+
+	// The handler should have exactly 1 flushed point
+	if len(handler.toflush) != 1 {
+		t.Errorf("Handler flushed points = %d, want 1", len(handler.toflush))
+	}
+}
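+
+// BenchmarkWriteBuffer is an illustrative addition, not part of the
+// original suite: it measures throughput of the buffered write path (the
+// >500K ops/sec target lives in docs/requirement/require.md) and reuses
+// mockBufferHandler from engine_test.go.
+func BenchmarkWriteBuffer(b *testing.B) {
+	handler := &mockBufferHandler{points: make([]DataPoint, 0)}
+	buffer := NewWriteBuffer(handler, 1000)
+	point := DataPoint{
+		Timestamp: time.Now().UnixNano(),
+		Value:     42.0,
+		Labels:    map[string]string{"bench": "write"},
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if err := buffer.Write(point); err != nil {
+			b.Fatal(err)
+		}
+	}
+}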
t.Errorf("Handler points = %d, want 1", len(handler.points)) + } +} diff --git a/pkg/engine/config.go b/pkg/engine/config.go new file mode 100644 index 0000000..dcb3c5b --- /dev/null +++ b/pkg/engine/config.go @@ -0,0 +1,141 @@ +package engine + +import "time" + +// EngineConfig 存储引擎配置接口 +type EngineConfig interface { + // 获取引擎类型 + Type() string +} + +// BaseConfig 基础配置,所有引擎共享 +type BaseConfig struct { + // 数据保留时间,超过此时间的数据将被清理 + RetentionPeriod time.Duration + + // 是否启用同步写入,启用后每次写入都会立即持久化 + SyncWrites bool + + // 写入缓冲区大小,0表示不使用缓冲区 + WriteBufferSize int + + // 是否启用压缩 + EnableCompaction bool + + // 压缩间隔 + CompactionInterval time.Duration +} + +// MemoryEngineConfig 内存引擎配置 +type MemoryEngineConfig struct { + BaseConfig + + // 每个数据点保留的历史值数量(环形队列大小) + MaxHistoryValues int + + // 最大数据点数量,超过此数量将拒绝写入,0表示不限制 + MaxPoints int + + // 是否启用持久化,启用后将定期将数据写入磁盘 + EnablePersistence bool + + // 持久化文件路径,仅在EnablePersistence为true时有效 + PersistencePath string + + // 持久化间隔,仅在EnablePersistence为true时有效 + PersistenceInterval time.Duration +} + +// 实现EngineConfig接口 +func (c *MemoryEngineConfig) Type() string { + return "memory" +} + +// MemoryConfig 返回内存引擎配置 +func (c *MemoryEngineConfig) MemoryConfig() *MemoryEngineConfig { + return c +} + +// BoltConfig 返回Bolt引擎配置 +func (c *MemoryEngineConfig) BoltConfig() *BoltEngineConfig { + return nil +} + +// BoltEngineConfig Bolt引擎配置 +type BoltEngineConfig struct { + BaseConfig + + // 数据库文件路径 + Path string + + // 存储桶名称 + BucketName string + + // 是否启用WAL日志 + EnableWAL bool + + // WAL日志目录,仅在EnableWAL为true时有效 + WALDir string + + // 是否启用批量写入 + EnableBatch bool + + // 批量写入大小,仅在EnableBatch为true时有效 + BatchSize int + + // 批量写入超时,仅在EnableBatch为true时有效 + BatchTimeout time.Duration +} + +// 实现EngineConfig接口 +func (c *BoltEngineConfig) Type() string { + return "bolt" +} + +// MemoryConfig 返回内存引擎配置 +func (c *BoltEngineConfig) MemoryConfig() *MemoryEngineConfig { + return nil +} + +// BoltConfig 返回Bolt引擎配置 +func (c *BoltEngineConfig) BoltConfig() *BoltEngineConfig { + return c +} + +// NewMemoryEngineConfig 创建默认的内存引擎配置 +func NewMemoryEngineConfig() *MemoryEngineConfig { + return &MemoryEngineConfig{ + BaseConfig: BaseConfig{ + RetentionPeriod: 24 * time.Hour, // 默认保留24小时 + SyncWrites: false, + WriteBufferSize: 1000, + EnableCompaction: true, + CompactionInterval: 1 * time.Hour, + }, + MaxHistoryValues: 30, // 默认保留30个历史值 + MaxPoints: 0, // 默认不限制 + EnablePersistence: false, + PersistencePath: "", + PersistenceInterval: 10 * time.Minute, + } +} + +// NewBoltEngineConfig 创建默认的Bolt引擎配置 +func NewBoltEngineConfig(path string) *BoltEngineConfig { + return &BoltEngineConfig{ + BaseConfig: BaseConfig{ + RetentionPeriod: 24 * time.Hour, // 默认保留24小时 + SyncWrites: false, + WriteBufferSize: 1000, + EnableCompaction: true, + CompactionInterval: 1 * time.Hour, + }, + Path: path, + BucketName: "gotidb", + EnableWAL: true, + WALDir: path + "_wal", + EnableBatch: true, + BatchSize: 1000, + BatchTimeout: 1 * time.Second, + } +} diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go new file mode 100644 index 0000000..e09f1d0 --- /dev/null +++ b/pkg/engine/engine.go @@ -0,0 +1,209 @@ +package engine + +import ( + "context" + "sort" + "strings" + "time" +) + +// Engine 定义存储引擎的通用接口 +type Engine interface { + // 生命周期管理 + Open() error + Close() error + + // 数据操作 + Write(ctx context.Context, points []DataPoint) error + Query(ctx context.Context, query Query) (QueryResult, error) + + // 管理功能 + Compact() error + Cleanup() error + + // 监控和统计 + Stats() EngineStats +} + +// DataPoint 表示一个时序数据点 +type DataPoint struct { + 
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
new file mode 100644
index 0000000..e09f1d0
--- /dev/null
+++ b/pkg/engine/engine.go
@@ -0,0 +1,209 @@
+package engine
+
+import (
+	"context"
+	"sort"
+	"strings"
+	"time"
+)
+
+// Engine is the common interface every storage engine implements
+type Engine interface {
+	// Lifecycle management
+	Open() error
+	Close() error
+
+	// Data operations
+	Write(ctx context.Context, points []DataPoint) error
+	Query(ctx context.Context, query Query) (QueryResult, error)
+
+	// Management
+	Compact() error
+	Cleanup() error
+
+	// Monitoring and statistics
+	Stats() EngineStats
+}
+
+// DataPoint is a single time-series data point
+type DataPoint struct {
+	Timestamp int64
+	Value     float64
+	Labels    map[string]string
+	Metadata  map[string]interface{} // optional metadata
+}
+
+// GetSeriesID derives the series ID from the labels
+func (p DataPoint) GetSeriesID() string {
+	// No labels: fall back to a default ID
+	if len(p.Labels) == 0 {
+		return "default_series"
+	}
+
+	// Collect the label keys
+	keys := make([]string, 0, len(p.Labels))
+	for k := range p.Labels {
+		keys = append(keys, k)
+	}
+
+	// Sort the keys lexicographically
+	sort.Strings(keys)
+
+	// Build the series ID
+	var sb strings.Builder
+	for i, k := range keys {
+		if i > 0 {
+			sb.WriteString(",")
+		}
+		sb.WriteString(k)
+		sb.WriteString("=")
+		sb.WriteString(p.Labels[k])
+	}
+
+	return sb.String()
+}
+
+// QueryType identifies the kind of query
+type QueryType string
+
+const (
+	QueryTypeRaw           QueryType = "raw"            // raw data query
+	QueryTypeLatest        QueryType = "latest"         // latest-value query
+	QueryTypeAggregate     QueryType = "aggregate"      // aggregate query
+	QueryTypeValueDuration QueryType = "value_duration" // value-duration query
+)
+
+// AggregateType identifies the aggregation function
+type AggregateType string
+
+const (
+	AggregateTypeAvg   AggregateType = "avg"
+	AggregateTypeSum   AggregateType = "sum"
+	AggregateTypeMin   AggregateType = "min"
+	AggregateTypeMax   AggregateType = "max"
+	AggregateTypeCount AggregateType = "count"
+)
+
+// Query describes a query request
+type Query struct {
+	Type          QueryType
+	StartTime     int64
+	EndTime       int64
+	Tags          map[string]string
+	Limit         int
+	AggregateType AggregateType // used by aggregate queries
+}
+
+// QueryResult is the overall result of a query
+type QueryResult []SeriesResult
+
+// SeriesResult is the query result for a single series
+type SeriesResult struct {
+	SeriesID string
+	Points   []DataPoint
+}
+
+// EngineStats carries engine statistics
+type EngineStats struct {
+	// Basic counters
+	PointsCount   int64     // total number of data points
+	SeriesCount   int64     // total number of series
+	LastWriteTime time.Time // time of the last write
+
+	// Performance metrics
+	WriteLatency time.Duration // average write latency
+	QueryLatency time.Duration // average query latency
+
+	// Resource usage
+	MemoryUsage int64 // memory usage in bytes
+	DiskUsage   int64 // disk usage in bytes
+
+	// Engine-specific metrics
+	CompactionCount int64     // number of compactions
+	LastCompaction  time.Time // time of the last compaction
+	WriteErrors     int64     // number of write errors
+	QueryErrors     int64     // number of query errors
+}
+
+// WriteBufferHandler is implemented by engines that accept buffered writes
+type WriteBufferHandler interface {
+	WriteToBuffer(point DataPoint) error
+	FlushBuffer() error
+	ValidatePoint(point DataPoint) error // validation only, no write
+}
+
+// WriteBuffer implements a write buffer on top of a WriteBufferHandler
+type WriteBuffer struct {
+	handler WriteBufferHandler
+	buffer  []DataPoint
+	size    int
+}
+
+// NewWriteBuffer creates a new write buffer
+func NewWriteBuffer(handler WriteBufferHandler, size int) *WriteBuffer {
+	bufferCap := size
+	if bufferCap < 0 {
+		bufferCap = 0
+	}
+	return &WriteBuffer{
+		handler: handler,
+		buffer:  make([]DataPoint, 0, bufferCap),
+		size:    size,
+	}
+}
+
+// Write adds a data point to the buffer
+func (b *WriteBuffer) Write(point DataPoint) error {
+	// A buffer size of zero (or less) means unbuffered: write through and
+	// flush immediately
+	if b.size <= 0 {
+		if err := b.handler.WriteToBuffer(point); err != nil {
+			return err
+		}
+		return b.handler.FlushBuffer()
+	}
+	// Validate the point first (without writing it)
+	if err := b.handler.ValidatePoint(point); err != nil {
+		return err
+	}
+
+	// Defensive: flush first if the buffer is somehow already full
+	if len(b.buffer) >= b.size {
+		if err := b.Flush(); err != nil {
+			return err
+		}
+	}
+
+	// Append to the buffer
+	b.buffer = append(b.buffer, point)
+
+	// Flush as soon as the buffer reaches capacity
+	if len(b.buffer) >= b.size {
+		return b.Flush()
+	}
+
+	return nil
+}
+
+// Flush drains the buffer
+func (b *WriteBuffer) Flush() error {
+	if len(b.buffer) == 0 {
+		return nil
+	}
+
+	// Hand the points to the handler one by one
+	for _, point := range b.buffer {
+		if err := b.handler.WriteToBuffer(point); err != nil {
+			// Clear the buffer even on error, so bad data is not retried
+			b.buffer = b.buffer[:0]
+			// Still attempt a flush, but report the write error first
+			_ = b.handler.FlushBuffer()
+			return err
+		}
+	}
+
+	// Clear the buffer
+	b.buffer = b.buffer[:0]
+
+	// Let the handler persist the batch
+	return b.handler.FlushBuffer()
+}
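+
+// Illustrative note (not part of the API): GetSeriesID canonicalizes the
+// labels by sorting their keys, so both points below map to the same
+// series ID, "host=server1,region=us-west":
+//
+//	a := DataPoint{Labels: map[string]string{"region": "us-west", "host": "server1"}}
+//	b := DataPoint{Labels: map[string]string{"host": "server1", "region": "us-west"}}
+//	_ = a.GetSeriesID() == b.GetSeriesID() // true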
diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go
new file mode 100644
index 0000000..97d03d5
--- /dev/null
+++ b/pkg/engine/engine_test.go
@@ -0,0 +1,283 @@
+package engine
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+// mockEngine is a mock engine implementation for tests
+type mockEngine struct {
+	points []DataPoint
+	stats  EngineStats
+	opened bool
+	closed bool
+}
+
+func (m *mockEngine) Open() error {
+	m.opened = true
+	return nil
+}
+
+func (m *mockEngine) Close() error {
+	m.closed = true
+	return nil
+}
+
+func (m *mockEngine) Write(ctx context.Context, points []DataPoint) error {
+	m.points = append(m.points, points...)
+	m.stats.PointsCount += int64(len(points))
+	m.stats.LastWriteTime = time.Now()
+	return nil
+}
+
+func (m *mockEngine) Query(ctx context.Context, query Query) (QueryResult, error) {
+	var result QueryResult
+	if len(m.points) == 0 {
+		return result, nil
+	}
+
+	// Return different results depending on the query type
+	switch query.Type {
+	case QueryTypeLatest:
+		// Return the most recent point
+		latest := m.points[len(m.points)-1]
+		result = append(result, SeriesResult{
+			SeriesID: latest.GetSeriesID(),
+			Points:   []DataPoint{latest},
+		})
+	case QueryTypeRaw:
+		// Return every matching point
+		var matchedPoints []DataPoint
+		for _, point := range m.points {
+			if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
+				// Check whether the labels match
+				if lmatchTags(point.Labels, query.Tags) {
+					matchedPoints = append(matchedPoints, point)
+				}
+			}
+		}
+		if len(matchedPoints) > 0 {
+			result = append(result, SeriesResult{
+				SeriesID: matchedPoints[0].GetSeriesID(),
+				Points:   matchedPoints,
+			})
+		}
+	}
+	return result, nil
+}
+
+func (m *mockEngine) Compact() error {
+	m.stats.CompactionCount++
+	m.stats.LastCompaction = time.Now()
+	return nil
+}
+
+func (m *mockEngine) Cleanup() error {
+	return nil
+}
+
+func (m *mockEngine) Stats() EngineStats {
+	return m.stats
+}
+
+// lmatchTags reports whether the point's labels match the query labels
+// (package-local counterpart of the bolt engine's matchTags)
+func lmatchTags(pointTags, queryTags map[string]string) bool {
+	for k, v := range queryTags {
+		if pointTags[k] != v {
+			return false
+		}
+	}
+	return true
+}
+
+func TestDataPoint_GetSeriesID(t *testing.T) {
+	tests := []struct {
+		name   string
+		point  DataPoint
+		labels map[string]string
+	}{
+		{
+			name: "empty labels",
+			point: DataPoint{
+				Labels: map[string]string{},
+			},
+		},
+		{
+			name: "single label",
+			point: DataPoint{
+				Labels: map[string]string{"host": "server1"},
+			},
+		},
+		{
+			name: "multiple labels",
+			point: DataPoint{
+				Labels: map[string]string{
+					"host":    "server1",
+					"region":  "us-west",
+					"service": "api",
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			id := tt.point.GetSeriesID()
+			if id == "" {
+				t.Error("GetSeriesID() returned empty string")
+			}
+
+			// The same labels must produce the same ID
+			id2 := tt.point.GetSeriesID()
+			if id != id2 {
+				t.Errorf("GetSeriesID() not consistent: got %v and %v", id, id2)
+			}
+
+			// A new data point with the same labels must produce the same ID
+			point2 := DataPoint{Labels: tt.point.Labels}
+			id3 := point2.GetSeriesID()
+			if id != id3 {
+				t.Errorf("GetSeriesID() not consistent across instances: got %v and %v", id, id3)
+			}
+		})
+	}
+}
t.Errorf("Buffer flushed too early, got %d points, want 0", len(handler.points)) + } + + // 测试写入第二个数据点(应该触发刷新) + point2 := DataPoint{ + Timestamp: time.Now().UnixNano(), + Value: 2.0, + Labels: map[string]string{"test": "2"}, + } + if err := buffer.Write(point2); err != nil { + t.Errorf("Write() error = %v", err) + } + if len(handler.toflush) != 2 { + t.Errorf("Buffer not flushed when full, got %d points, want 2", len(handler.points)) + } + + // 测试手动刷新 + point3 := DataPoint{ + Timestamp: time.Now().UnixNano(), + Value: 3.0, + Labels: map[string]string{"test": "3"}, + } + if err := buffer.Write(point3); err != nil { + t.Errorf("Write() error = %v", err) + } + if err := buffer.Flush(); err != nil { + t.Errorf("Flush() error = %v", err) + } + if len(handler.toflush) != 3 { + t.Errorf("Manual flush failed, got %d points, want 3", len(handler.points)) + } +} + +// mockBufferHandler 是一个用于测试的WriteBufferHandler实现 +type mockBufferHandler struct { + toflush []DataPoint + points []DataPoint +} + +func (h *mockBufferHandler) WriteToBuffer(point DataPoint) error { + h.points = append(h.points, point) + return nil +} + +func (h *mockBufferHandler) FlushBuffer() error { + h.toflush = append(h.toflush, h.points...) + h.points = []DataPoint{} + return nil +} +func (h *mockBufferHandler) ValidatePoint(point DataPoint) error { + return nil +} + +func TestEngineFactory(t *testing.T) { + // 注册模拟引擎 + RegisterMemoryEngine(func(config *MemoryEngineConfig) (Engine, error) { + return &mockEngine{}, nil + }) + RegisterBoltEngine(func(config *BoltEngineConfig) (Engine, error) { + return &mockEngine{}, nil + }) + + tests := []struct { + name string + config EngineConfig + wantErr bool + }{ + { + name: "memory engine", + config: NewMemoryEngineConfig(), + wantErr: false, + }, + { + name: "bolt engine", + config: NewBoltEngineConfig("test.db"), + wantErr: false, + }, + { + name: "invalid config type", + config: &struct{ EngineConfig }{}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + engine, err := NewEngine(tt.config) + if (err != nil) != tt.wantErr { + t.Errorf("NewEngine() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && engine == nil { + t.Error("NewEngine() returned nil engine") + } + }) + } +} + +func TestEngineConfig(t *testing.T) { + // 测试内存引擎配置 + memConfig := NewMemoryEngineConfig() + if memConfig.Type() != "memory" { + t.Errorf("MemoryEngineConfig.Type() = %v, want memory", memConfig.Type()) + } + if memConfig.MaxHistoryValues != 30 { + t.Errorf("MemoryEngineConfig.MaxHistoryValues = %v, want 30", memConfig.MaxHistoryValues) + } + + // 测试Bolt引擎配置 + boltConfig := NewBoltEngineConfig("test.db") + if boltConfig.Type() != "bolt" { + t.Errorf("BoltEngineConfig.Type() = %v, want bolt", boltConfig.Type()) + } + if boltConfig.Path != "test.db" { + t.Errorf("BoltEngineConfig.Path = %v, want test.db", boltConfig.Path) + } +} diff --git a/pkg/engine/factory.go b/pkg/engine/factory.go new file mode 100644 index 0000000..9b683ef --- /dev/null +++ b/pkg/engine/factory.go @@ -0,0 +1,67 @@ +package engine + +import ( + "fmt" +) + +// NewEngine 创建指定类型的存储引擎 +func NewEngine(config EngineConfig) (eng Engine, err error) { + if config == nil { + return nil, fmt.Errorf("engine config cannot be nil") + } + + engineType := "" + defer func() { + if r := recover(); r != nil { + // 如果 Type() 方法调用导致 panic,返回错误 + fmt.Printf("Recovered from panic in NewEngine: %v\n", r) + // return nil,r + eng = nil + err = fmt.Errorf("engine type %s not supported", engineType) + } 
diff --git a/pkg/engine/integration_test.go b/pkg/engine/integration_test.go
new file mode 100644
index 0000000..540dd18
--- /dev/null
+++ b/pkg/engine/integration_test.go
@@ -0,0 +1,322 @@
+package engine
+
+import (
+    "context"
+    "testing"
+    "time"
+)
+
+// engineFactory is a function type that creates engine instances
+type engineFactory func(t *testing.T) Engine
+
+// engineTest describes one engine under the shared test suite
+type engineTest struct {
+    name    string
+    factory engineFactory
+}
+
+// runEngineTests runs the full test suite against each engine
+func runEngineTests(t *testing.T, tests []engineTest) {
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            engine := tt.factory(t)
+
+            // Basic operations
+            t.Run("Basic Operations", func(t *testing.T) {
+                testBasicOperations(t, engine)
+            })
+
+            // Query operations
+            t.Run("Query Operations", func(t *testing.T) {
+                testQueryOperations(t, engine)
+            })
+
+            // Compaction and cleanup
+            t.Run("Maintenance Operations", func(t *testing.T) {
+                testMaintenanceOperations(t, engine)
+            })
+
+            // Statistics
+            t.Run("Stats Operations", func(t *testing.T) {
+                testStatsOperations(t, engine)
+            })
+        })
+    }
+}
+
+// testBasicOperations tests basic CRUD operations
+func testBasicOperations(t *testing.T, engine Engine) {
+    ctx := context.Background()
+
+    // Open the engine
+    if err := engine.Open(); err != nil {
+        t.Fatalf("Failed to open engine: %v", err)
+    }
+
+    // Prepare test data
+    points := []DataPoint{
+        {
+            Timestamp: time.Now().UnixNano(),
+            Value:     42.0,
+            Labels: map[string]string{
+                "host":    "server1",
+                "service": "api",
+            },
+        },
+        {
+            Timestamp: time.Now().UnixNano(),
+            Value:     43.0,
+            Labels: map[string]string{
+                "host":    "server2",
+                "service": "api",
+            },
+        },
+    }
+
+    // Write
+    if err := engine.Write(ctx, points); err != nil {
+        t.Fatalf("Failed to write points: %v", err)
+    }
+
+    // Query
+    query := Query{
+        Type:      QueryTypeRaw,
+        StartTime: time.Now().Add(-1 * time.Hour).UnixNano(),
+        EndTime:   time.Now().Add(1 * time.Hour).UnixNano(),
+        Tags: map[string]string{
+            "service": "api",
+        },
+    }
+
+    result, err := engine.Query(ctx, query)
+    if err != nil {
+        t.Fatalf("Failed to query points: %v", err)
+    }
+
+    // Verify the query result
+    if len(result) == 0 {
+        t.Error("Query returned no results")
+    }
+
+    // Close the engine
+    if err := engine.Close(); err != nil {
+        t.Fatalf("Failed to close engine: %v", err)
+    }
+}
+
+// testQueryOperations tests the different query types
+func testQueryOperations(t *testing.T, engine Engine) {
+    ctx := context.Background()
+
+    // Open the engine
+    if err := engine.Open(); err != nil {
+        t.Fatalf("Failed to open engine: %v", err)
+    }
+    defer engine.Close()
+
+    // Prepare test data
+    now := time.Now()
+    points := []DataPoint{
+        {
+            Timestamp: now.Add(-2 * time.Hour).UnixNano(),
+            Value:     10.0,
+            Labels: map[string]string{
+                "host": "server1",
+                "app":  "test",
+            },
+        },
+        {
+            Timestamp: now.Add(-1 * time.Hour).UnixNano(),
+            Value:     20.0,
+            Labels: map[string]string{
+                "host": "server1",
+                "app":  "test",
+            },
+        },
+        {
+            Timestamp: now.UnixNano(),
+            Value:     30.0,
+            Labels: map[string]string{
+                "host": "server1",
+                "app":  "test",
+            },
+        },
+    }
+
+    // Write the test data
+    if err := engine.Write(ctx, points); err != nil {
+        t.Fatalf("Failed to write points: %v", err)
+    }
+
+    // Latest-value query
+    t.Run("Latest Query", func(t *testing.T) {
+        query := Query{
+            Type: QueryTypeLatest,
+            Tags: map[string]string{
+                "host": "server1",
+                "app":  "test",
+            },
+        }
+
+        result, err := engine.Query(ctx, query)
+        if err != nil {
+            t.Fatalf("Failed to query latest points: %v", err)
+        }
+
+        if len(result) != 1 {
+            t.Errorf("Expected 1 series result, got %d", len(result))
+        }
+
+        if len(result[0].Points) != 1 {
+            t.Errorf("Expected 1 point, got %d", len(result[0].Points))
+        }
+
+        if result[0].Points[0].Value != 30.0 {
+            t.Errorf("Expected latest value 30.0, got %f", result[0].Points[0].Value)
+        }
+    })
+
+    // Aggregate query
+    t.Run("Aggregate Query", func(t *testing.T) {
+        query := Query{
+            Type:          QueryTypeAggregate,
+            StartTime:     now.Add(-3 * time.Hour).UnixNano(),
+            EndTime:       now.Add(1 * time.Hour).UnixNano(),
+            Tags:          map[string]string{"app": "test"},
+            AggregateType: AggregateTypeAvg,
+        }
+
+        result, err := engine.Query(ctx, query)
+        if err != nil {
+            t.Fatalf("Failed to query aggregate: %v", err)
+        }
+
+        if len(result) != 1 {
+            t.Errorf("Expected 1 series result, got %d", len(result))
+        }
+
+        // Verify the average
+        expectedAvg := 20.0 // (10 + 20 + 30) / 3
+        if len(result[0].Points) > 0 && result[0].Points[0].Value != expectedAvg {
+            t.Errorf("Expected average value %f, got %f", expectedAvg, result[0].Points[0].Value)
+        }
+    })
+
+    // Raw-data query
+    t.Run("Raw Query", func(t *testing.T) {
+        query := Query{
+            Type:      QueryTypeRaw,
+            StartTime: now.Add(-3 * time.Hour).UnixNano(),
+            EndTime:   now.Add(1 * time.Hour).UnixNano(),
+            Tags:      map[string]string{"app": "test"},
+            Limit:     10,
+        }
+
+        result, err := engine.Query(ctx, query)
+        if err != nil {
+            t.Fatalf("Failed to query raw data: %v", err)
+        }
+
+        if len(result) != 1 {
+            t.Errorf("Expected 1 series result, got %d", len(result))
+        }
+
+        if len(result[0].Points) != 3 {
+            t.Errorf("Expected 3 points, got %d", len(result[0].Points))
+        }
+    })
+}
+
+// testMaintenanceOperations tests compaction and cleanup
+func testMaintenanceOperations(t *testing.T, engine Engine) {
+    // Open the engine
+    if err := engine.Open(); err != nil {
+        t.Fatalf("Failed to open engine: %v", err)
+    }
+    defer engine.Close()
+
+    // Compact
+    if err := engine.Compact(); err != nil {
+        t.Errorf("Failed to compact: %v", err)
+    }
+
+    // Cleanup
+    if err := engine.Cleanup(); err != nil {
+        t.Errorf("Failed to cleanup: %v", err)
+    }
+}
+
+// testStatsOperations tests statistics collection
+func testStatsOperations(t *testing.T, engine Engine) {
+    ctx := context.Background()
+
+    // Open the engine
+    if err := engine.Open(); err != nil {
+        t.Fatalf("Failed to open engine: %v", err)
+    }
+    defer engine.Close()
+
+    beforeStats := engine.Stats()
+
+    // Write some data
+    points := []DataPoint{
+        {
+            Timestamp: time.Now().UnixNano(),
+            Value:     42.0,
+            Labels:    map[string]string{"test": "stats"},
+        },
+    }
+
+    if err := engine.Write(ctx, points); err != nil {
+        t.Fatalf("Failed to write points: %v", err)
+    }
+
+    // Read back the statistics
+    stats := engine.Stats()
+
+    // The write above should have increased the point count by exactly one
+    if stats.PointsCount-beforeStats.PointsCount != 1 {
+        t.Errorf("Expected points count to grow by 1, got %d", stats.PointsCount-beforeStats.PointsCount)
+    }
+
+    if stats.LastWriteTime.IsZero() {
+        t.Error("LastWriteTime should not be zero")
+    }
+}
+
+// TestEngines runs the integration test suite for all engines
+func TestEngines(t *testing.T) {
+    tests := []engineTest{
+        {
+            name: "MockEngine",
+            factory: func(t *testing.T) Engine {
+                return NewMockEngine()
+            },
+        },
+        // Once the Memory and Bolt engines are implemented, their tests can be added:
+        /*
+            {
+                name: "MemoryEngine",
+                factory: func(t *testing.T) Engine {
+                    config := NewMemoryEngineConfig()
+                    engine, err := NewEngine(config)
+                    if err != nil {
+                        t.Fatalf("Failed to create memory engine: %v", err)
+                    }
+                    return engine
+                },
+            },
+            {
+                name: "BoltEngine",
+                factory: func(t *testing.T) Engine {
+                    config := NewBoltEngineConfig("test.db")
+                    engine, err := NewEngine(config)
+                    if err != nil {
+                        t.Fatalf("Failed to create bolt engine: %v", err)
+                    }
+                    return engine
+                },
+            },
+        */
+    }
+
+    runEngineTests(t, tests)
+}
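When the commented-out Bolt entry in TestEngines is eventually enabled, `t.TempDir()` keeps the test database out of the working tree and is removed automatically when the test ends. A sketch in package engine (with "path/filepath" added to the imports), assuming the bolt package registers itself via init the way the memory package does:

```go
func TestBoltEngineSuite(t *testing.T) {
    runEngineTests(t, []engineTest{{
        name: "BoltEngine",
        factory: func(t *testing.T) Engine {
            // The temp dir (and the database inside it) is cleaned up for us.
            path := filepath.Join(t.TempDir(), "test.db")
            eng, err := NewEngine(NewBoltEngineConfig(path))
            if err != nil {
                t.Fatalf("Failed to create bolt engine: %v", err)
            }
            return eng
        },
    }})
}
```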
diff --git a/pkg/engine/memory/memory.go b/pkg/engine/memory/memory.go
new file mode 100644
index 0000000..4c74321
--- /dev/null
+++ b/pkg/engine/memory/memory.go
@@ -0,0 +1,394 @@
+package memory
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "time"
+
+    "git.pyer.club/kingecg/gotidb/pkg/engine"
+)
+
+// MemoryEngine implements an in-memory storage engine
+type MemoryEngine struct {
+    mu     sync.RWMutex
+    config *engine.MemoryEngineConfig
+    series map[string][]engine.DataPoint // seriesID -> list of data points
+    stats  engine.EngineStats
+    opened bool
+    closed bool
+}
+
+// Ensure MemoryEngine implements the Engine interface
+var _ engine.Engine = (*MemoryEngine)(nil)
+
+// NewMemoryEngine creates a new memory engine
+func NewMemoryEngine(config *engine.MemoryEngineConfig) (*MemoryEngine, error) {
+    if config == nil {
+        return nil, fmt.Errorf("memory engine config cannot be nil")
+    }
+
+    return &MemoryEngine{
+        config: config,
+        series: make(map[string][]engine.DataPoint),
+        stats: engine.EngineStats{
+            LastWriteTime: time.Now(),
+        },
+    }, nil
+}
+
+// Open implements the Engine interface
+func (m *MemoryEngine) Open() error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    if m.opened {
+        return fmt.Errorf("memory engine already opened")
+    }
+
+    if m.closed {
+        return fmt.Errorf("memory engine already closed")
+    }
+
+    m.opened = true
+    return nil
+}
+
+// Close implements the Engine interface
+func (m *MemoryEngine) Close() error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    if !m.opened {
+        return fmt.Errorf("memory engine not opened")
+    }
+
+    if m.closed {
+        return fmt.Errorf("memory engine already closed")
+    }
+
+    // Release resources
+    m.series = nil
+    m.closed = true
+    return nil
+}
+
+// Write implements the Engine interface
+func (m *MemoryEngine) Write(ctx context.Context, points []engine.DataPoint) error {
+    if len(points) == 0 {
+        return nil
+    }
+
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    if !m.opened || m.closed {
+        return fmt.Errorf("memory engine not open")
+    }
+
+    startTime := time.Now()
+
+    // Write the data points
+    for _, point := range points {
+        seriesID := point.GetSeriesID()
+        if seriesID == "" {
+            m.stats.WriteErrors++
+            continue
+        }
+
+        // Get or create the series
+        if _, exists := m.series[seriesID]; !exists {
+            m.series[seriesID] = make([]engine.DataPoint, 0, m.config.MaxHistoryValues)
+        }
+
+        // Append the data point
+        m.series[seriesID] = append(m.series[seriesID], point)
+
+        // If the series exceeds the history limit, drop the oldest point
+        if len(m.series[seriesID]) > m.config.MaxHistoryValues {
+            m.series[seriesID] = m.series[seriesID][1:]
+        }
+    }
+
+    // Update statistics
+    m.stats.PointsCount += int64(len(points))
+    m.stats.SeriesCount = int64(len(m.series))
+    m.stats.LastWriteTime = time.Now()
+    m.stats.WriteLatency = time.Since(startTime) / time.Duration(len(points))
+
+    return nil
+}
t.Fatalf("Failed to write points: %v", err) + } + + // 获取统计信息 + stats := engine.Stats() + + // 验证统计信息 + if stats.PointsCount-beforeStats.PointsCount != 1 { + t.Errorf("Expected points count 1, got %d", stats.PointsCount) + } + + if stats.LastWriteTime.IsZero() { + t.Error("LastWriteTime should not be zero") + } +} + +// TestEngines 运行所有引擎的集成测试 +func TestEngines(t *testing.T) { + tests := []engineTest{ + { + name: "MockEngine", + factory: func(t *testing.T) Engine { + return NewMockEngine() + }, + }, + // 当实现了Memory和Bolt引擎后,可以添加它们的测试 + /* + { + name: "MemoryEngine", + factory: func(t *testing.T) Engine { + config := NewMemoryEngineConfig() + engine, err := NewEngine(config) + if err != nil { + t.Fatalf("Failed to create memory engine: %v", err) + } + return engine + }, + }, + { + name: "BoltEngine", + factory: func(t *testing.T) Engine { + config := NewBoltEngineConfig("test.db") + engine, err := NewEngine(config) + if err != nil { + t.Fatalf("Failed to create bolt engine: %v", err) + } + return engine + }, + }, + */ + } + + runEngineTests(t, tests) +} diff --git a/pkg/engine/memory/memory.go b/pkg/engine/memory/memory.go new file mode 100644 index 0000000..4c74321 --- /dev/null +++ b/pkg/engine/memory/memory.go @@ -0,0 +1,394 @@ +package memory + +import ( + "context" + "fmt" + "sync" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" +) + +// MemoryEngine 实现基于内存的存储引擎 +type MemoryEngine struct { + mu sync.RWMutex + config *engine.MemoryEngineConfig + series map[string][]engine.DataPoint // seriesID -> 数据点列表 + stats engine.EngineStats + opened bool + closed bool +} + +// 确保 MemoryEngine 实现了 Engine 接口 +var _ engine.Engine = (*MemoryEngine)(nil) + +// NewMemoryEngine 创建一个新的内存引擎 +func NewMemoryEngine(config *engine.MemoryEngineConfig) (*MemoryEngine, error) { + if config == nil { + return nil, fmt.Errorf("memory engine config cannot be nil") + } + + return &MemoryEngine{ + config: config, + series: make(map[string][]engine.DataPoint), + stats: engine.EngineStats{ + LastWriteTime: time.Now(), + }, + }, nil +} + +// Open 实现 Engine 接口 +func (m *MemoryEngine) Open() error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.opened { + return fmt.Errorf("memory engine already opened") + } + + if m.closed { + return fmt.Errorf("memory engine already closed") + } + + m.opened = true + return nil +} + +// Close 实现 Engine 接口 +func (m *MemoryEngine) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + if !m.opened { + return fmt.Errorf("memory engine not opened") + } + + if m.closed { + return fmt.Errorf("memory engine already closed") + } + + // 清理资源 + m.series = nil + m.closed = true + return nil +} + +// Write 实现 Engine 接口 +func (m *MemoryEngine) Write(ctx context.Context, points []engine.DataPoint) error { + if len(points) == 0 { + return nil + } + + m.mu.Lock() + defer m.mu.Unlock() + + if !m.opened || m.closed { + return fmt.Errorf("memory engine not open") + } + + startTime := time.Now() + + // 写入数据点 + for _, point := range points { + seriesID := point.GetSeriesID() + if seriesID == "" { + m.stats.WriteErrors++ + continue + } + + // 获取或创建序列 + seriesPoints, exists := m.series[seriesID] + if !exists { + seriesPoints = make([]engine.DataPoint, 0, m.config.MaxHistoryValues) + m.series[seriesID] = seriesPoints + } + + // 添加数据点 + m.series[seriesID] = append(m.series[seriesID], point) + + // 如果超过最大历史值数量,删除最旧的数据点 + if len(m.series[seriesID]) > m.config.MaxHistoryValues { + m.series[seriesID] = m.series[seriesID][1:] + } + } + + // 更新统计信息 + m.stats.PointsCount += int64(len(points)) + m.stats.SeriesCount 
diff --git a/pkg/engine/mock_engine.go b/pkg/engine/mock_engine.go
new file mode 100644
index 0000000..412dbe9
--- /dev/null
+++ b/pkg/engine/mock_engine.go
@@ -0,0 +1,370 @@
+package engine
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "time"
+)
+
+// MockEngine is a mock engine implementation used for testing
+type MockEngine struct {
+    mu            sync.RWMutex
+    points        map[string][]DataPoint // seriesID -> list of data points
+    stats         EngineStats
+    opened        bool
+    closed        bool
+    writeError    error
+    queryError    error
+    compactError  error
+    cleanupError  error
+    writeCallback func([]DataPoint) error
+    queryCallback func(Query) (QueryResult, error)
+}
+
+// NewMockEngine creates a new mock engine
+func NewMockEngine() *MockEngine {
+    return &MockEngine{
+        points: make(map[string][]DataPoint),
+        stats: EngineStats{
+            LastWriteTime: time.Now(),
+        },
+    }
+}
+
+// SetWriteError injects an error to be returned by Write
+func (m *MockEngine) SetWriteError(err error) {
+    m.writeError = err
+}
+
+// SetQueryError injects an error to be returned by Query
+func (m *MockEngine) SetQueryError(err error) {
+    m.queryError = err
+}
+
+// SetCompactError injects an error to be returned by Compact
+func (m *MockEngine) SetCompactError(err error) {
+    m.compactError = err
+}
+
+// SetCleanupError injects an error to be returned by Cleanup
+func (m *MockEngine) SetCleanupError(err error) {
+    m.cleanupError = err
+}
+
+// SetWriteCallback sets a callback invoked on every write
+func (m *MockEngine) SetWriteCallback(callback func([]DataPoint) error) {
+    m.writeCallback = callback
+}
+
+// SetQueryCallback sets a callback invoked on every query
+func (m *MockEngine) SetQueryCallback(callback func(Query) (QueryResult, error)) {
+    m.queryCallback = callback
+}
+
+// IsOpened reports whether the engine has been opened
+func (m *MockEngine) IsOpened() bool {
+    return m.opened
+}
+
+// IsClosed reports whether the engine has been closed
+func (m *MockEngine) IsClosed() bool {
+    return m.closed
+}
+
+// GetPoints returns a copy of all stored data points
+func (m *MockEngine) GetPoints() map[string][]DataPoint {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    // Return a copy so callers cannot mutate internal state
+    result := make(map[string][]DataPoint)
+    for seriesID, points := range m.points {
+        pointsCopy := make([]DataPoint, len(points))
+        copy(pointsCopy, points)
+        result[seriesID] = pointsCopy
+    }
+
+    return result
+}
+
+// GetPointsCount returns the total number of stored data points
+func (m *MockEngine) GetPointsCount() int {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    count := 0
+    for _, points := range m.points {
+        count += len(points)
+    }
+
+    return count
+}
+
+// Open implements the Engine interface
+func (m *MockEngine) Open() error {
+    m.opened = true
+    return nil
+}
+
+// Close implements the Engine interface
+func (m *MockEngine) Close() error {
+    m.closed = true
+    return nil
+}
+
+// Write implements the Engine interface
+func (m *MockEngine) Write(ctx context.Context, points []DataPoint) error {
+    if m.writeError != nil {
+        return m.writeError
+    }
+
+    if m.writeCallback != nil {
+        if err := m.writeCallback(points); err != nil {
+            return err
+        }
+    }
+
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    for _, point := range points {
+        seriesID := point.GetSeriesID()
+        if seriesID == "" {
+            return fmt.Errorf("invalid series ID for point: %+v", point)
+        }
+
+        m.points[seriesID] = append(m.points[seriesID], point)
+    }
+
+    m.stats.PointsCount += int64(len(points))
+    m.stats.LastWriteTime = time.Now()
+    m.stats.SeriesCount = int64(len(m.points))
+
+    return nil
+}
+
+// Query implements the Engine interface
+func (m *MockEngine) Query(ctx context.Context, query Query) (QueryResult, error) {
+    if m.queryError != nil {
+        return nil, m.queryError
+    }
+
+    if m.queryCallback != nil {
+        return m.queryCallback(query)
+    }
+
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    var result QueryResult
+
+    // Dispatch on the query type
+    switch query.Type {
+    case QueryTypeLatest:
+        // Return the most recent point of each matching series
+        for seriesID, points := range m.points {
+            if len(points) == 0 {
+                continue
+            }
+
+            // Check whether the labels match
+            if !matchTags(points[0].Labels, query.Tags) {
+                continue
+            }
+
+            // Take the most recent data point
+            latest := points[len(points)-1]
+
+            result = append(result, SeriesResult{
+                SeriesID: seriesID,
+                Points:   []DataPoint{latest},
+            })
+        }
+
+    case QueryTypeRaw:
+        // Return all matching data points
+        for seriesID, points := range m.points {
+            if len(points) == 0 {
+                continue
+            }
+
+            // Check whether the labels match
+            if !matchTags(points[0].Labels, query.Tags) {
+                continue
+            }
+
+            var matchedPoints []DataPoint
+            for _, point := range points {
+                if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
+                    matchedPoints = append(matchedPoints, point)
+                }
+            }
+
+            if len(matchedPoints) > 0 {
+                // Apply the limit
+                if query.Limit > 0 && len(matchedPoints) > query.Limit {
+                    matchedPoints = matchedPoints[:query.Limit]
+                }
+
+                result = append(result, SeriesResult{
+                    SeriesID: seriesID,
+                    Points:   matchedPoints,
+                })
+            }
+        }
+
+    case QueryTypeAggregate:
+        // Return aggregated results
+        for seriesID, points := range m.points {
+            if len(points) == 0 {
+                continue
+            }
+
+            // Check whether the labels match
+            if !matchTags(points[0].Labels, query.Tags) {
+                continue
+            }
+
+            var matchedPoints []DataPoint
+            for _, point := range points {
+                if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
+                    matchedPoints = append(matchedPoints, point)
+                }
+            }
+
+            if len(matchedPoints) > 0 {
+                // Compute the aggregate value
+                aggregateValue := calculateAggregate(matchedPoints, query.AggregateType)
+
+                // Build the aggregate result point
+                aggregatePoint := DataPoint{
+                    Timestamp: query.EndTime,
+                    Value:     aggregateValue,
+                    Labels:    matchedPoints[0].Labels,
+                }
+
+                result = append(result, SeriesResult{
+                    SeriesID: seriesID,
+                    Points:   []DataPoint{aggregatePoint},
+                })
+            }
+        }
+
+    case QueryTypeValueDuration:
+        // Value-duration query. Simplified implementation; a full version
+        // would compute how long each value persisted.
+        for seriesID, points := range m.points {
+            if len(points) == 0 {
+                continue
+            }
+
+            // Check whether the labels match
+            if !matchTags(points[0].Labels, query.Tags) {
+                continue
+            }
+
+            var matchedPoints []DataPoint
+            for _, point := range points {
+                if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
+                    matchedPoints = append(matchedPoints, point)
+                }
+            }
+
+            if len(matchedPoints) > 0 {
+                result = append(result, SeriesResult{
+                    SeriesID: seriesID,
+                    Points:   matchedPoints,
+                })
+            }
+        }
+
+    default:
+        return nil, fmt.Errorf("unsupported query type: %s", query.Type)
+    }
+
+    return result, nil
+}
+
+// Compact implements the Engine interface
+func (m *MockEngine) Compact() error {
+    if m.compactError != nil {
+        return m.compactError
+    }
+
+    m.stats.CompactionCount++
+    m.stats.LastCompaction = time.Now()
+
+    return nil
+}
+
+// Cleanup implements the Engine interface
+func (m *MockEngine) Cleanup() error {
+    if m.cleanupError != nil {
+        return m.cleanupError
+    }
+
+    return nil
+}
+
+// Stats implements the Engine interface
+func (m *MockEngine) Stats() EngineStats {
+    return m.stats
+}
+
+// matchTags reports whether a data point's labels match the query tags
+func matchTags(pointTags, queryTags map[string]string) bool {
+    for k, v := range queryTags {
+        if pointTags[k] != v {
+            return false
+        }
+    }
+    return true
+}
+
+// calculateAggregate computes the aggregate value for a set of points
+func calculateAggregate(points []DataPoint, aggregateType AggregateType) float64 {
+    if len(points) == 0 {
+        return 0
+    }
+
+    switch aggregateType {
+    case AggregateTypeAvg:
+        sum := 0.0
+        for _, p := range points {
+            sum += p.Value
+        }
+        return sum / float64(len(points))
+
+    case AggregateTypeSum:
+        sum := 0.0
+        for _, p := range points {
+            sum += p.Value
+        }
+        return sum
+
+    case AggregateTypeMin:
+        min := points[0].Value
+        for _, p := range points {
+            if p.Value < min {
+                min = p.Value
+            }
+        }
+        return min
+
+    case AggregateTypeMax:
+        max := points[0].Value
+        for _, p := range points {
+            if p.Value > max {
+                max = p.Value
+            }
+        }
+        return max
+
+    case AggregateTypeCount:
+        return float64(len(points))
+
+    default:
+        return 0
+    }
+}
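Taken together, the Set* hooks are what make MockEngine useful: they let tests exercise failure paths that the real engines make hard to reach. A hypothetical test in package engine that injects a write error and then observes writes through the callback:

```go
func TestWriteFailureInjection(t *testing.T) {
    eng := NewMockEngine()
    if err := eng.Open(); err != nil {
        t.Fatalf("open: %v", err)
    }
    defer eng.Close()

    point := DataPoint{
        Timestamp: time.Now().UnixNano(),
        Value:     1.0,
        Labels:    map[string]string{"host": "server1"},
    }

    // Every Write fails until the injected error is cleared.
    eng.SetWriteError(fmt.Errorf("disk full"))
    if err := eng.Write(context.Background(), []DataPoint{point}); err == nil {
        t.Fatal("expected injected write error")
    }

    // Clear the error and count writes via the callback instead.
    eng.SetWriteError(nil)
    var seen int
    eng.SetWriteCallback(func(points []DataPoint) error {
        seen += len(points)
        return nil
    })
    if err := eng.Write(context.Background(), []DataPoint{point}); err != nil {
        t.Fatalf("write: %v", err)
    }
    if seen != 1 || eng.GetPointsCount() != 1 {
        t.Fatalf("seen=%d stored=%d, want 1 and 1", seen, eng.GetPointsCount())
    }
}
```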