Compare commits

...

2 Commits

Author SHA1 Message Date
程广 abd8768aa4 refactory engine 2025-06-12 19:01:40 +08:00
程广 c12a5c9c0e docs(README): update project docs and restructure the README
- Rewrote README.md, improving the project introduction, feature list, and quick-start sections
- Added the storage engine design document
- Added multi-engine examples and a performance comparison demo
- Updated code samples showing how to use GoTiDB
2025-06-12 15:43:18 +08:00
22 changed files with 3831 additions and 154 deletions

7
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,7 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": []
}

277
README.md
View File

@@ -1,169 +1,160 @@
# GoTiDB - Time-Series Database
# GoTiDB - A Lightweight Time-Series Database
GoTiDB is a lightweight time-series database written in Go, built specifically for storing and querying time-series data. It supports efficient writes, queries, and real-time data push.
GoTiDB is a lightweight time-series database written in Go, designed for efficient storage and querying of time-series data. It provides a simple yet powerful API, with support for high-throughput writes and flexible queries.
## Features
## Feature Highlights
- High-performance in-memory storage engine
- WAL (write-ahead log) persistence
- REST API
- WebSocket real-time data push
- NATS messaging integration
- Prometheus metrics
- Data points with custom labels
- Ring-buffer data structure
- Multiple query types (latest value, all values, value duration)
- **Efficient storage**: a file-based storage engine optimized for time-series data
- **Flexible queries**: raw-data, latest-value, and aggregate queries
- **Label indexing**: a multi-dimensional label index for fast filtering by label
- **Time windows**: an efficient time-window index that speeds up range queries
- **Data compression**: optional compaction of old data to save storage space
- **Data retention**: automatic cleanup of expired data with a configurable retention policy
- **Concurrency-safe**: supports many concurrent read and write operations
- **Extensible**: a modular design that is easy to extend and customize
## Installation
Make sure you have Go 1.16 or later installed.
```bash
git clone git.pyer.club/kingecg/gotidb
cd gotidb
go mod download
go get git.pyer.club/kingecg/gotidb
```
## Build
## Quick Start
```bash
go build -o gotidb cmd/server/main.go
```
Here is a short example showing how to use GoTiDB:
## Run
```go
package main
```bash
./gotidb [options]
```
import (
"context"
"fmt"
"time"
### Available Options
"git.pyer.club/kingecg/gotidb/pkg/engine"
_ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎
)
- `-rest-addr`: REST API listen address (default ":8080")
- `-ws-addr`: WebSocket listen address (default ":8081")
- `-metrics-addr`: metrics listen address (default ":8082")
- `-quic-addr`: QUIC listen address (default ":8083")
- `-nats-url`: NATS server URL (default "nats://localhost:4222")
- `-persistence`: persistence type (none, wal, boltdb; default "none")
- `-persistence-dir`: persistence directory (default "./data")
- `-sync-every`: sync to disk after every N writes (default 100)
- `-config`: configuration file path (default "config.yaml")
### Persistence Options
GoTiDB supports several persistence modes:
1. **In-memory (no persistence)**: data lives only in memory and is lost when the service restarts.
- Config: `-persistence=none`
2. **WAL persistence**: uses a write-ahead log for durability and supports recovery.
- Config: `-persistence=wal -persistence-dir=./data -sync-every=100`
3. **BoltDB persistence**: uses BoltDB for higher durability and better query performance.
- Config: `-persistence=boltdb -persistence-dir=./data`
- The config file can also set `boltdb_filename` (the database file name) and `boltdb_bucket_size` (the data bucket size)
## API Usage
### REST API
#### Write Data
```bash
curl -X POST http://localhost:8080/api/v1/write \
-H "Content-Type: application/json" \
-d '{
"device_id": "device1",
"metric_code": "temperature",
"labels": {
"location": "room1"
},
"value": 25.5
}'
```
#### Batch Write
```bash
curl -X POST http://localhost:8080/api/v1/batch_write \
-H "Content-Type: application/json" \
-d '{
"points": [
{
"device_id": "device1",
"metric_code": "temperature",
"labels": {
"location": "room1"
},
"value": 25.5
},
{
"device_id": "device2",
"metric_code": "humidity",
"labels": {
"location": "room2"
},
"value": 60
func main() {
// build the engine configuration
config := &engine.FileEngineConfig{
DataDir: "/path/to/data",
SegmentSize: 1024 * 1024, // 1MB
MaxSegments: 10,
WriteBufferSize: 1000,
}
]
}'
```
#### Query Data
```bash
curl -X POST http://localhost:8080/api/v1/query \
-H "Content-Type: application/json" \
-d '{
"device_id": "device1",
"metric_code": "temperature",
"labels": {
"location": "room1"
},
"query_type": "latest"
}'
```
### WebSocket API
Connect to the WebSocket service:
```javascript
const ws = new WebSocket('ws://localhost:8081/ws');
// subscribe to a data point
ws.send(JSON.stringify({
device_id: "device1",
metric_code: "temperature",
labels: {
location: "room1"
// create the engine
e, err := engine.NewEngine(engine.EngineConfig{
Type: "file",
FileConfig: config,
})
if err != nil {
fmt.Printf("Failed to create engine: %v\n", err)
return
}
}));
// receive data updates
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Received update:', data);
};
// open the engine
if err := e.Open(); err != nil {
fmt.Printf("Failed to open engine: %v\n", err)
return
}
defer e.Close()
// write data
points := []engine.DataPoint{
{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{
"host": "server1",
"region": "us-west",
},
},
}
ctx := context.Background()
if err := e.Write(ctx, points); err != nil {
fmt.Printf("Failed to write points: %v\n", err)
return
}
// query data
query := engine.Query{
Type: engine.QueryTypeRaw,
StartTime: time.Now().Add(-time.Hour).UnixNano(),
EndTime: time.Now().UnixNano(),
Tags: map[string]string{
"host": "server1",
},
}
result, err := e.Query(ctx, query)
if err != nil {
fmt.Printf("Failed to query: %v\n", err)
return
}
// process the query results
for _, series := range result {
fmt.Printf("Series ID: %s\n", series.SeriesID)
for _, point := range series.Points {
fmt.Printf(" Timestamp: %s, Value: %f\n",
time.Unix(0, point.Timestamp).Format(time.RFC3339),
point.Value)
}
}
}
```
## Monitoring
For more examples, see the [examples](./examples) directory.
Visit `http://localhost:8082/metrics` to view the Prometheus metrics.
## Configuration Options
Available metrics:
### File Engine Configuration
- `gotidb_write_total`: total number of write operations
- `gotidb_query_total`: total number of query operations
- `gotidb_write_latency_seconds`: write latency
- `gotidb_query_latency_seconds`: query latency
- `gotidb_active_connections`: number of active connections
- `gotidb_data_points_count`: number of data points
- `gotidb_persistence_latency_seconds`: persistence latency
- `gotidb_persistence_errors_total`: total persistence errors
- `gotidb_messaging_latency_seconds`: messaging latency
- `gotidb_messaging_errors_total`: total messaging errors
- `gotidb_websocket_connections`: number of WebSocket connections
| Option | Description | Default |
|------|------|--------|
| DataDir | data storage directory | required |
| SegmentSize | segment file size limit (bytes) | 64MB |
| MaxSegments | maximum number of segment files | 100 |
| WriteBufferSize | write buffer size (data points) | 1000 |
| IndexCacheSize | index cache size (bytes) | 32MB |
| UseCompression | enable compression | false |
| CompressionLevel | compression level (0-9) | 6 |
| CompactThreshold | compaction trigger threshold (fraction of segment files) | 0.7 |
| MaxOpenFiles | maximum number of open files | 100 |
| SyncWrites | synchronous writes (safer but slower) | false |
| RetentionPeriod | data retention period | 30d |
## Performance Considerations
- **Write performance**: a write buffer with asynchronous flushing improves write throughput (see the sketch below)
- **Query performance**: label and time-window indexes speed up queries
- **Storage efficiency**: compression reduces disk usage at the cost of extra CPU
- **Memory usage**: tune the index cache size to balance memory use against query speed
- **File descriptors**: adjust the maximum number of open files to fit system limits
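For example, a write-heavy deployment might trade durability and disk space for throughput. A minimal sketch, assuming the `FileEngineConfig` fields listed in the table above (the exact field set may differ):

```go
// Hypothetical tuning for a write-heavy workload; field names follow the table above.
config := &engine.FileEngineConfig{
	DataDir:          "/var/lib/gotidb",
	SegmentSize:      64 * 1024 * 1024, // 64MB segments
	WriteBufferSize:  10000,            // larger buffer, fewer flushes
	UseCompression:   true,             // saves disk at the cost of CPU
	CompressionLevel: 6,
	SyncWrites:       false,            // asynchronous writes: faster, less durable
	MaxOpenFiles:     256,
}
```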
## Architecture
The core architecture of GoTiDB consists of:
1. **Engine interface**: the common interface all storage engines implement
2. **File engine**: a file-system based storage engine implementation
3. **Index management**: label indexes and time-window indexes
4. **Query processing**: raw, latest-value, and aggregate queries
5. **Background tasks**: data compaction and expired-data cleanup
## Contributing
Code contributions, bug reports, and suggestions are welcome! Please follow these steps:
1. Fork the project
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## License
MIT License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

View File

@@ -138,7 +138,7 @@ func main() {
quicConfig = config.QuicConfig // use the settings from the config file when present
}
quicServer, err := api.NewQUICServer(dataManager, quicConfig)
quicServer, err = api.NewQUICServer(dataManager, quicConfig)
if err != nil {
log.Printf("Failed to create QUIC server: %v", err)
log.Println("Continuing without QUIC server")

View File

@@ -0,0 +1,450 @@
# Storage Engine Design Document
## 1. Overview
The GoTiDB storage engine abstraction layer provides a unified interface so that different storage backends can be integrated seamlessly. This document describes the engine's design principles, interface definitions, and implementation suggestions.
## 2. Design Goals
- **Unified abstraction**: a consistent API that hides the implementation details of each storage engine
- **Extensibility**: new storage engines can be added without modifying core code
- **Performance**: optimized for the characteristics of time-series data
- **Configurability**: engine behavior can be tuned through configuration
## 3. Storage Engine Interface
### 3.1 Core Interface
```go
// Engine is the base interface every storage engine must implement
type Engine interface {
// lifecycle
Open() error
Close() error
// data operations
WritePoint(ctx context.Context, point DataPoint) error
WriteBatch(ctx context.Context, points []DataPoint) error
// queries
Query(ctx context.Context, query Query) (QueryResult, error)
// management
Flush() error
Compact() error
// monitoring
Stats() EngineStats
// capability discovery
Capabilities() EngineCapabilities
}
```
### 3.2 Extension Interfaces
Specific engines can implement additional interfaces to expose extra capabilities:
```go
// PersistentEngine provides persistence features
type PersistentEngine interface {
Engine
Backup(path string) error
Restore(path string) error
}
// ReplicatedEngine provides replication features
type ReplicatedEngine interface {
Engine
AddReplica(addr string) error
RemoveReplica(addr string) error
}
```
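Callers can discover these optional capabilities at runtime with a type assertion. A minimal sketch (the helper name is illustrative):

```go
// backupIfSupported performs a backup only when the engine implements PersistentEngine.
func backupIfSupported(e Engine, path string) error {
	if pe, ok := e.(PersistentEngine); ok {
		return pe.Backup(path)
	}
	return fmt.Errorf("engine does not support backups")
}
```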
## 4. Unified Query Interface
All read operations go through a unified Query interface, for flexibility and consistency:
```go
// Query defines the query parameters
type Query struct {
// query type
Type QueryType
// time range
StartTime int64
EndTime int64
// series identity
SeriesID string
DeviceID string
MetricCode string
// tag filtering
TagFilters []TagFilter
// aggregation options
Aggregation AggregationType
AggInterval time.Duration
IncludeRawData bool
// result limits
Limit int
Offset int
// other query options
Options map[string]interface{}
}
// QueryType defines the query type
type QueryType int
const (
// raw data query
QueryTypeRaw QueryType = iota
// aggregate query
QueryTypeAggregate
// latest-value query
QueryTypeLatest
// tag query
QueryTypeTags
// metadata query
QueryTypeMetadata
)
// TagFilter defines a tag filter condition
type TagFilter struct {
Key string
Operator FilterOperator
Value string
}
// FilterOperator defines a filter operator
type FilterOperator int
const (
OpEqual FilterOperator = iota
OpNotEqual
OpRegex
OpGreaterThan
OpLessThan
// more operators...
)
// AggregationType defines the aggregation type
type AggregationType int
const (
AggNone AggregationType = iota
AggSum
AggAvg
AggMin
AggMax
AggCount
// more aggregation types...
)
```
### 4.1 Query Results
```go
// QueryResult defines a query result
type QueryResult interface {
// the result type
Type() QueryType
}
// TimeSeriesResult is the result of a time-series query
type TimeSeriesResult struct {
SeriesID string
Points []DataPoint
}
// AggregateResult is the result of an aggregate query
type AggregateResult struct {
SeriesID string
Groups []AggregateGroup
}
type AggregateGroup struct {
StartTime int64
EndTime int64
Value float64
Count int
}
```
### 4.2 Query Builder
To simplify building queries, a fluent API is provided:
```go
query := NewQueryBuilder().
ForMetric("cpu.usage").
WithTimeRange(startTime, endTime).
WithTag("host", OpEqual, "server01").
WithAggregation(AggAvg, 5*time.Minute).
Build()
```
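The builder's implementation is not prescribed here; a minimal sketch of what it could look like, built on the `Query`, `TagFilter`, and aggregation types above (the `queryBuilder` type is illustrative):

```go
// queryBuilder is a hypothetical fluent builder for Query values.
type queryBuilder struct {
	q Query
}

func NewQueryBuilder() *queryBuilder { return &queryBuilder{} }

func (b *queryBuilder) ForMetric(metric string) *queryBuilder {
	b.q.MetricCode = metric
	return b
}

func (b *queryBuilder) WithTimeRange(start, end int64) *queryBuilder {
	b.q.StartTime, b.q.EndTime = start, end
	return b
}

func (b *queryBuilder) WithTag(key string, op FilterOperator, value string) *queryBuilder {
	b.q.TagFilters = append(b.q.TagFilters, TagFilter{Key: key, Operator: op, Value: value})
	return b
}

func (b *queryBuilder) WithAggregation(agg AggregationType, interval time.Duration) *queryBuilder {
	b.q.Aggregation, b.q.AggInterval = agg, interval
	return b
}

func (b *queryBuilder) Build() Query { return b.q }
```

Each setter returns the builder, so calls chain exactly as in the example above.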
## 5. Configuration Abstraction
```go
type EngineConfig interface {
// common configuration methods
WithMaxRetention(duration time.Duration) EngineConfig
WithMaxPoints(points int) EngineConfig
WithFlushInterval(interval time.Duration) EngineConfig
// accessors for engine-specific configuration
MemoryConfig() *MemoryEngineConfig
FileConfig() *FileEngineConfig
// other engines...
}
// memory-engine-specific configuration
type MemoryEngineConfig struct {
MaxPointsPerSeries int // configurable retention count, replacing the hard-coded 30
UseCompression bool
// other memory-engine parameters...
}
```
## 6. Engine Registration
```go
// EngineRegistry manages all available storage engines
type EngineRegistry struct {
engines map[string]EngineFactory
}
// EngineFactory creates a storage engine instance
type EngineFactory func(config EngineConfig) (Engine, error)
// Register registers a new engine
func (r *EngineRegistry) Register(name string, factory EngineFactory) {
r.engines[name] = factory
}
// Create creates an engine instance
func (r *EngineRegistry) Create(name string, config EngineConfig) (Engine, error) {
if factory, ok := r.engines[name]; ok {
return factory(config)
}
return nil, fmt.Errorf("unknown engine: %s", name)
}
```
## 7. Performance Optimization Suggestions
### 7.1 Write-Path Optimization
Use a write buffer to merge small writes into batches:
```go
type WriteBuffer struct {
points map[string][]DataPoint // grouped by series ID
mu sync.Mutex
maxSize int
flushCh chan struct{}
engine Engine
}
func (wb *WriteBuffer) Add(point DataPoint) {
wb.mu.Lock()
seriesID := point.SeriesID()
wb.points[seriesID] = append(wb.points[seriesID], point)
size := len(wb.points) // number of buffered series
wb.mu.Unlock()
if size >= wb.maxSize {
wb.Flush()
}
}
func (wb *WriteBuffer) Flush() {
wb.mu.Lock()
points := wb.points
wb.points = make(map[string][]DataPoint)
wb.mu.Unlock()
// flatten the per-series groups and hand them to the engine as one batch
batch := make([]DataPoint, 0, len(points))
for _, series := range points {
batch = append(batch, series...)
}
wb.engine.WriteBatch(context.Background(), batch)
}
```
### 7.2 Concurrency Control Optimization
Use sharded locks to reduce lock contention:
```go
type ShardedLock struct {
locks []sync.RWMutex
shardMask uint64
}
func NewShardedLock(shards int) *ShardedLock {
// round the shard count up to a power of two so a mask can replace modulo
shards = nextPowerOfTwo(shards)
return &ShardedLock{
locks: make([]sync.RWMutex, shards),
shardMask: uint64(shards - 1),
}
}
// nextPowerOfTwo rounds n up to the next power of two
func nextPowerOfTwo(n int) int {
p := 1
for p < n {
p <<= 1
}
return p
}
func (sl *ShardedLock) getLockForKey(key string) *sync.RWMutex {
h := fnv.New64()
h.Write([]byte(key))
hashVal := h.Sum64()
return &sl.locks[hashVal&sl.shardMask]
}
func (sl *ShardedLock) Lock(key string) {
sl.getLockForKey(key).Lock()
}
func (sl *ShardedLock) Unlock(key string) {
sl.getLockForKey(key).Unlock()
}
```
### 7.3 Memory Optimization
Store time-series data compactly:
```go
// compact storage of timestamps and values
type CompactTimeSeriesBlock struct {
baseTime int64
deltaEncode []byte // delta-encoded timestamps
values []byte // packed values
}
func NewCompactBlock(baseTime int64, capacity int) *CompactTimeSeriesBlock {
return &CompactTimeSeriesBlock{
baseTime: baseTime,
deltaEncode: make([]byte, 0, capacity*binary.MaxVarintLen64),
values: make([]byte, 0, capacity*8), // assumes float64 values
}
}
func (b *CompactTimeSeriesBlock) AddPoint(timestamp int64, value float64) {
// store the time delta
delta := timestamp - b.baseTime
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(buf, delta)
b.deltaEncode = append(b.deltaEncode, buf[:n]...)
// store the value
bits := math.Float64bits(value)
buf = make([]byte, 8)
binary.LittleEndian.PutUint64(buf, bits)
b.values = append(b.values, buf...)
}
```
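Reading a block back is the mirror image of `AddPoint`; a minimal decoding sketch under the same layout assumptions (varint deltas against `baseTime`, 8-byte little-endian float64 values):

```go
// Points decodes the block back into parallel timestamp and value slices.
func (b *CompactTimeSeriesBlock) Points() ([]int64, []float64, error) {
	var timestamps []int64
	buf := b.deltaEncode
	for len(buf) > 0 {
		delta, n := binary.Varint(buf)
		if n <= 0 {
			return nil, nil, fmt.Errorf("corrupt delta encoding")
		}
		timestamps = append(timestamps, b.baseTime+delta)
		buf = buf[n:]
	}
	values := make([]float64, 0, len(b.values)/8)
	for i := 0; i+8 <= len(b.values); i += 8 {
		bits := binary.LittleEndian.Uint64(b.values[i : i+8])
		values = append(values, math.Float64frombits(bits))
	}
	return timestamps, values, nil
}
```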
### 7.4 Query Optimization
Index data by time range:
```go
type TimeRangeIndex struct {
// start position of each time window
windows []timeWindow
blockSize int64 // window size, e.g. one hour
}
type timeWindow struct {
startTime int64
endTime int64
offset int // offset into the data block
}
func (idx *TimeRangeIndex) FindBlocks(start, end int64) []int {
var result []int
for i, window := range idx.windows {
if window.endTime >= start && window.startTime <= end {
result = append(result, i)
}
}
return result
}
```
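When the windows are kept sorted by start time and do not overlap, the linear scan above can be replaced with a binary search for the first candidate window. A sketch under that assumption, using `sort.Search` from the standard library:

```go
// FindBlocksSorted assumes idx.windows is sorted by startTime with non-overlapping windows.
// Requires the standard-library "sort" package.
func (idx *TimeRangeIndex) FindBlocksSorted(start, end int64) []int {
	// locate the first window that could intersect [start, end]
	i := sort.Search(len(idx.windows), func(i int) bool {
		return idx.windows[i].endTime >= start
	})
	var result []int
	for ; i < len(idx.windows) && idx.windows[i].startTime <= end; i++ {
		result = append(result, i)
	}
	return result
}
```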
## 8. Implementation Roadmap
1. **Define the core interfaces**
- Implement the Engine interface
- Define the Query and QueryResult structures
2. **Refactor the existing engines**
- Adapt the memory engine to the new interface
- Make MaxPointsPerSeries configurable
3. **Implement the query builder**
- Provide the fluent query-building API
4. **Add the performance optimizations**
- Implement the write buffer
- Add sharded locks
- Optimize memory usage
5. **Implement engine registration**
- Create the EngineRegistry
- Support dynamic engine selection
6. **Add monitoring and statistics**
- Implement the Stats interface
- Collect performance metrics
## 9. Usage Example
```go
// create an engine
registry := NewEngineRegistry()
registry.Register("memory", NewMemoryEngine)
registry.Register("file", NewFileEngine)
config := NewEngineConfig().
WithMaxRetention(24 * time.Hour).
WithMaxPoints(1000)
engine, err := registry.Create("memory", config)
if err != nil {
log.Fatal(err)
}
// write data
point := DataPoint{
DeviceID: "device1",
MetricCode: "temperature",
Labels: map[string]string{"location": "room1"},
Value: 25.5,
Timestamp: time.Now().UnixNano(),
}
err = engine.WritePoint(context.Background(), point)
// query data
query := NewQueryBuilder().
ForMetric("temperature").
WithTimeRange(startTime, endTime).
WithTag("location", OpEqual, "room1").
Build()
result, err := engine.Query(context.Background(), query)
if err != nil {
log.Fatal(err)
}
// process the results
if tsResult, ok := result.(*TimeSeriesResult); ok {
for _, point := range tsResult.Points {
fmt.Printf("Time: %v, Value: %v\n",
time.Unix(0, point.Timestamp), point.Value)
}
}
```

View File

@@ -0,0 +1,402 @@
# WriteBufferHandler Interface Implementation Design
This document describes how the Bolt and Memory engines implement the `WriteBufferHandler` interface to improve the time-series database's write performance.
## Background
In the current implementation, the `WriteBufferHandler` interface and the `WriteBuffer` struct are already defined, but the Bolt and Memory engines do not yet implement the interface. Implementing it lets them use buffering to speed up writes, especially under high concurrency.
## Design Goals
1. Implement the `WriteBufferHandler` interface: the `WriteToBuffer`, `FlushBuffer`, and `ValidatePoint` methods
2. Optimize batch-write performance and reduce I/O operations
3. Ensure data consistency and proper error handling
4. Support highly concurrent writes (>500K ops/sec)
## 1. Bolt Engine Implementation
Bolt is a file-backed key-value store that manages writes through transactions. The plan for implementing `WriteBufferHandler` in the Bolt engine:
```go
// pkg/engine/bolt/bolt_write.go
package bolt
import (
"encoding/binary"
"encoding/json"
"fmt"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
bolt "go.etcd.io/bbolt"
)
// ensure BoltEngine implements the WriteBufferHandler interface
var _ engine.WriteBufferHandler = (*BoltEngine)(nil)
// writeCache temporarily holds data points until the transaction commits
type writeCache struct {
points map[string][]engine.DataPoint // data points grouped by series ID
mu sync.Mutex
}
// initWriteCache initializes the write cache
func (b *BoltEngine) initWriteCache() {
b.writeCache = &writeCache{
points: make(map[string][]engine.DataPoint),
}
}
// WriteToBuffer implements the WriteBufferHandler interface.
// It adds the data point to the temporary cache without touching the database.
func (b *BoltEngine) WriteToBuffer(point engine.DataPoint) error {
// check the engine state
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
// validate the data point
if err := b.validateDataPoint(point); err != nil {
return err
}
// compute the series ID
seriesID := point.GetSeriesID()
// append to the temporary cache
b.writeCache.mu.Lock()
defer b.writeCache.mu.Unlock()
if b.writeCache.points == nil {
b.writeCache.points = make(map[string][]engine.DataPoint)
}
b.writeCache.points[seriesID] = append(b.writeCache.points[seriesID], point)
return nil
}
// FlushBuffer implements the WriteBufferHandler interface.
// It writes the cached data points to the database.
func (b *BoltEngine) FlushBuffer() error {
// check the engine state
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
// take and reset the temporary cache
b.writeCache.mu.Lock()
points := b.writeCache.points
b.writeCache.points = make(map[string][]engine.DataPoint)
b.writeCache.mu.Unlock()
// nothing to do if the cache is empty
if len(points) == 0 {
return nil
}
// write everything in a single transaction
return b.db.Update(func(tx *bolt.Tx) error {
// get or create the index bucket
indexBucket, err := tx.CreateBucketIfNotExists([]byte(indexBucketName))
if err != nil {
return fmt.Errorf("failed to create index bucket: %v", err)
}
// process the points series by series
for seriesID, seriesPoints := range points {
// get or create the series bucket
bucketName := seriesBucketPrefix + seriesID
bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return fmt.Errorf("failed to create series bucket: %v", err)
}
// update the index
if err := indexBucket.Put([]byte(seriesID), []byte{1}); err != nil {
return fmt.Errorf("failed to update index: %v", err)
}
// write the data points
for _, point := range seriesPoints {
// serialize the data point
data, err := json.Marshal(point)
if err != nil {
return fmt.Errorf("failed to marshal data point: %v", err)
}
// use the timestamp as the key
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(point.Timestamp))
// write the record
if err := bucket.Put(key, data); err != nil {
return fmt.Errorf("failed to write data point: %v", err)
}
// update statistics
b.stats.PointsCount++
}
}
// update statistics
b.stats.LastWriteTime = time.Now()
return nil
})
}
// ValidatePoint implements the WriteBufferHandler interface.
// It checks whether a data point can be written without actually writing it.
func (b *BoltEngine) ValidatePoint(point engine.DataPoint) error {
// check the engine state
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
// validate the data point
return b.validateDataPoint(point)
}
// validateDataPoint validates a data point
func (b *BoltEngine) validateDataPoint(point engine.DataPoint) error {
// the timestamp must be positive
if point.Timestamp <= 0 {
return fmt.Errorf("invalid timestamp: %d", point.Timestamp)
}
// at least one label is required
if len(point.Labels) == 0 {
return fmt.Errorf("data point must have at least one label")
}
return nil
}
```
## 2. Memory Engine Implementation
The Memory engine stores data in memory, typically in maps guarded by locks. The plan for implementing `WriteBufferHandler` in the Memory engine:
```go
// pkg/engine/memory/memory_write.go
package memory
import (
"fmt"
"math"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
)
// ensure MemoryEngine implements the WriteBufferHandler interface
var _ engine.WriteBufferHandler = (*MemoryEngine)(nil)
// memWriteCache is the temporary write cache
type memWriteCache struct {
points map[string][]engine.DataPoint
mu sync.Mutex
}
// initWriteCache initializes the write cache
func (m *MemoryEngine) initWriteCache() {
m.writeCache = &memWriteCache{
points: make(map[string][]engine.DataPoint),
}
}
// WriteToBuffer implements the WriteBufferHandler interface.
// It adds the data point to the temporary cache.
func (m *MemoryEngine) WriteToBuffer(point engine.DataPoint) error {
// check the engine state
if !m.opened || m.closed {
return fmt.Errorf("memory engine not open")
}
// validate the data point
if err := m.validateDataPoint(point); err != nil {
return err
}
// compute the series ID
seriesID := point.GetSeriesID()
// append to the temporary cache
m.writeCache.mu.Lock()
defer m.writeCache.mu.Unlock()
if m.writeCache.points == nil {
m.writeCache.points = make(map[string][]engine.DataPoint)
}
m.writeCache.points[seriesID] = append(m.writeCache.points[seriesID], point)
return nil
}
// FlushBuffer implements the WriteBufferHandler interface.
// It moves the cached data points into the in-memory store.
func (m *MemoryEngine) FlushBuffer() error {
// check the engine state
if !m.opened || m.closed {
return fmt.Errorf("memory engine not open")
}
// take and reset the temporary cache
m.writeCache.mu.Lock()
points := m.writeCache.points
m.writeCache.points = make(map[string][]engine.DataPoint)
m.writeCache.mu.Unlock()
// nothing to do if the cache is empty
if len(points) == 0 {
return nil
}
// write the data
m.mu.Lock()
defer m.mu.Unlock()
for seriesID, seriesPoints := range points {
// get or create the series
series, exists := m.series[seriesID]
if !exists {
series = &memorySeries{
id: seriesID,
points: make(map[int64]engine.DataPoint),
}
m.series[seriesID] = series
m.stats.SeriesCount++
}
// write the data points
for _, point := range seriesPoints {
// implement the memory engine's ring-buffer overwrite behavior:
// once a series reaches its point limit, drop the oldest point
if m.maxPointsPerSeries > 0 && len(series.points) >= m.maxPointsPerSeries {
// find the oldest timestamp
var oldestTimestamp int64 = math.MaxInt64
for ts := range series.points {
if ts < oldestTimestamp {
oldestTimestamp = ts
}
}
// delete the oldest point
delete(series.points, oldestTimestamp)
}
// add the new point
series.points[point.Timestamp] = point
m.stats.PointsCount++
}
}
// update statistics
m.stats.LastWriteTime = time.Now()
return nil
}
// ValidatePoint implements the WriteBufferHandler interface.
// It checks whether a data point can be written without actually writing it.
func (m *MemoryEngine) ValidatePoint(point engine.DataPoint) error {
// check the engine state
if !m.opened || m.closed {
return fmt.Errorf("memory engine not open")
}
// validate the data point
return m.validateDataPoint(point)
}
// validateDataPoint validates a data point
func (m *MemoryEngine) validateDataPoint(point engine.DataPoint) error {
// the timestamp must be positive
if point.Timestamp <= 0 {
return fmt.Errorf("invalid timestamp: %d", point.Timestamp)
}
// at least one label is required
if len(point.Labels) == 0 {
return fmt.Errorf("data point must have at least one label")
}
return nil
}
```
## 3. Integration into the Engine's Write Method
Finally, the engine's `Write` method is modified to use a `WriteBuffer` for batch writes:
```go
// Bolt engine shown as an example
func (b *BoltEngine) Write(ctx context.Context, points []DataPoint) error {
// check the engine state
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
// create the write buffer
// the buffer size can be tuned based on performance testing
buffer := engine.NewWriteBuffer(b, 1000)
// write the data points
for _, point := range points {
if err := buffer.Write(point); err != nil {
return fmt.Errorf("failed to write data point: %v", err)
}
}
// flush the buffer so all buffered data is written
if err := buffer.Flush(); err != nil {
return fmt.Errorf("failed to flush buffer: %v", err)
}
return nil
}
```
## 4. Initialization and Cleanup
The write cache is initialized and released in the engine's `Open` and `Close` methods:
```go
// Bolt engine shown as an example
func (b *BoltEngine) Open() error {
// existing open logic...
// initialize the write cache
b.initWriteCache()
return nil
}
func (b *BoltEngine) Close() error {
// existing close logic...
// release the write cache
b.writeCache = nil
return nil
}
```
## 5. Performance Tuning Suggestions
1. **Batch size tuning**: adjust the `WriteBuffer` size to match the actual workload
2. **Concurrency control**: use fine-grained locks to reduce contention
3. **Memory management**: for the Memory engine, implement expiry and cleanup policies for data points
4. **Monitoring**: add buffer metrics (hit rate, average batch size, and so on), as sketched below
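For the monitoring point above, a sketch of simple buffer counters (the `bufferMetrics` type is illustrative; uses the standard-library `sync/atomic` package):

```go
// bufferMetrics tracks basic write-buffer behavior.
type bufferMetrics struct {
	flushes int64 // number of flushes
	points  int64 // total points flushed
}

func (m *bufferMetrics) observeFlush(batchSize int) {
	atomic.AddInt64(&m.flushes, 1)
	atomic.AddInt64(&m.points, int64(batchSize))
}

// AverageBatchSize reports the mean number of points per flush.
func (m *bufferMetrics) AverageBatchSize() float64 {
	f := atomic.LoadInt64(&m.flushes)
	if f == 0 {
		return 0
	}
	return float64(atomic.LoadInt64(&m.points)) / float64(f)
}
```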
## 6. Implementation Notes
1. **Error handling**: make sure resources are cleaned up properly on failure
2. **Transaction management**: for the Bolt engine, ensure transactions commit or roll back correctly
3. **Concurrency safety**: all operations must be thread-safe
4. **Memory leaks**

View File

@@ -1,7 +1,7 @@
Summary of time-series database requirements and recommended libraries
I. Summary of core requirements
1. Data storage logic
Each data point keeps only its most recent 30 changed values (ring-buffer overwrite)
Each data point keeps only its most recent 30 changed values (ring-buffer overwrite, implemented only in the memory engine; see the sketch below)
Track exactly how long the current value has persisted (record the timestamp at which the value last changed)
2. Read/write performance requirements
High-speed in-memory processing: microsecond write latency, high concurrency (>500K ops/sec)
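A minimal sketch of the ring-buffer overwrite described above (illustrative only; the memory engine's actual structure may differ):

```go
// ringBuffer keeps the most recent n values, overwriting the oldest.
type ringBuffer struct {
	times  []int64
	values []float64
	next   int
	full   bool // set once the buffer has wrapped at least once
}

func newRingBuffer(n int) *ringBuffer {
	return &ringBuffer{times: make([]int64, n), values: make([]float64, n)}
}

func (r *ringBuffer) Add(ts int64, v float64) {
	r.times[r.next], r.values[r.next] = ts, v
	r.next = (r.next + 1) % len(r.values)
	if r.next == 0 {
		r.full = true
	}
}
```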

2
go.mod
View File

@@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.10.0
)
require go.etcd.io/bbolt v1.4.1 // indirect
require go.etcd.io/bbolt v1.4.1
require (
github.com/beorn7/perks v1.0.1 // indirect

7
go.sum
View File

@@ -108,8 +108,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
@@ -128,15 +127,11 @@ golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=

98
pkg/engine/bolt/README.md Normal file
View File

@@ -0,0 +1,98 @@
# Bolt Storage Engine
## Design Principles
1. **File size limit**: each source file stays under 300 lines to keep the code maintainable and readable.
2. **File layout**:
- `bolt.go` - main types and basic methods
- `bolt_query.go` - query implementation
- `bolt_utils.go` - utility and helper functions
- `bolt_maintenance.go` - maintenance methods
3. **Separation of concerns**:
- each file has a clear responsibility
- related functionality lives in the same file
- no circular dependencies between files
## Main Features
1. **Basic operations**
- engine creation and initialization
- opening and closing the database
- writing data points
2. **Queries**
- latest-value queries
- raw data queries
- aggregate queries
- value-duration queries
3. **Maintenance**
- data compaction
- empty-series cleanup
- statistics management
## Data Layout
1. **Bucket structure**
- `series_*` - series data
- `meta` - metadata
- `index` - series index
2. **Key design**
- series data: the timestamp (8 bytes) is the key (see the example below)
- metadata: predefined keys (such as "stats")
- index: the series ID is both key and value
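For example, the series-data key is simply the big-endian encoding of the timestamp, which keeps entries within a bucket sorted chronologically (this mirrors the key construction in `bolt.go`; the helper name is illustrative):

```go
// timestampKey encodes a nanosecond timestamp as an 8-byte big-endian key.
func timestampKey(ts int64) []byte {
	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, uint64(ts))
	return key
}
```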
## Performance Considerations
1. **Concurrency control**
- read-write locks guard concurrent access
- fine-grained transaction management
2. **Memory management**
- objects and buffers are reused
- batch operations are optimized
3. **Disk optimization**
- periodic compaction
- space reclamation
## Usage Example
```go
// create an engine instance
config := &engine.BoltEngineConfig{
Path: "/path/to/data.db",
}
eng, err := NewBoltEngine(config)
if err != nil {
log.Fatal(err)
}
// open the engine
if err := eng.Open(); err != nil {
log.Fatal(err)
}
defer eng.Close()
// write data
points := []engine.DataPoint{
{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"sensor": "temp"},
},
}
if err := eng.Write(context.Background(), points); err != nil {
log.Fatal(err)
}
```
## Notes
1. When adding new features, keep files within the size limit
2. Use consistent error handling and logging
3. Run maintenance (compaction and cleanup) regularly
4. Monitor the engine statistics to tune performance

228
pkg/engine/bolt/bolt.go Normal file
View File

@@ -0,0 +1,228 @@
package bolt
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
bolt "go.etcd.io/bbolt"
)
const (
// bucket names
seriesBucketPrefix = "series_" // prefix for series data buckets
metaBucketName = "meta" // metadata bucket
indexBucketName = "index" // index bucket
// metadata keys
statsKey = "stats" // statistics key
)
// BoltEngine implements a BoltDB-backed storage engine
type BoltEngine struct {
mu sync.RWMutex
config *engine.BoltEngineConfig
db *bolt.DB
stats engine.EngineStats
opened bool
closed bool
}
// ensure BoltEngine implements the Engine interface
var _ engine.Engine = (*BoltEngine)(nil)
// NewBoltEngine creates a new Bolt engine
func NewBoltEngine(config *engine.BoltEngineConfig) (engine.Engine, error) {
if config == nil {
return nil, fmt.Errorf("bolt engine config cannot be nil")
}
// make sure the data directory exists
if err := os.MkdirAll(filepath.Dir(config.Path), 0755); err != nil {
return nil, fmt.Errorf("failed to create data directory: %v", err)
}
return &BoltEngine{
config: config,
stats: engine.EngineStats{
LastWriteTime: time.Now(),
},
}, nil
}
// Open implements the Engine interface
func (b *BoltEngine) Open() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.opened {
return fmt.Errorf("bolt engine already opened")
}
if b.closed {
return fmt.Errorf("bolt engine already closed")
}
// open the database
db, err := bolt.Open(b.config.Path, 0600, &bolt.Options{
Timeout: 1 * time.Second,
})
if err != nil {
return fmt.Errorf("failed to open bolt database: %v", err)
}
// create the required buckets
err = db.Update(func(tx *bolt.Tx) error {
// create the metadata bucket
if _, err := tx.CreateBucketIfNotExists([]byte(metaBucketName)); err != nil {
return fmt.Errorf("failed to create meta bucket: %v", err)
}
// create the index bucket
if _, err := tx.CreateBucketIfNotExists([]byte(indexBucketName)); err != nil {
return fmt.Errorf("failed to create index bucket: %v", err)
}
return nil
})
if err != nil {
db.Close()
return fmt.Errorf("failed to create buckets: %v", err)
}
b.db = db
b.opened = true
// load statistics
if err := b.loadStats(); err != nil {
return fmt.Errorf("failed to load stats: %v", err)
}
return nil
}
// Close implements the Engine interface
func (b *BoltEngine) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
if !b.opened {
return fmt.Errorf("bolt engine not opened")
}
if b.closed {
return fmt.Errorf("bolt engine already closed")
}
// save statistics
if err := b.saveStats(); err != nil {
return fmt.Errorf("failed to save stats: %v", err)
}
// close the database
if err := b.db.Close(); err != nil {
return fmt.Errorf("failed to close bolt database: %v", err)
}
b.closed = true
return nil
}
// Write implements the Engine interface
func (b *BoltEngine) Write(ctx context.Context, points []engine.DataPoint) error {
if len(points) == 0 {
return nil
}
b.mu.Lock()
defer b.mu.Unlock()
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
startTime := time.Now()
// write the data points
err := b.db.Update(func(tx *bolt.Tx) error {
for _, point := range points {
seriesID := point.GetSeriesID()
if seriesID == "" {
b.stats.WriteErrors++
continue
}
// get or create the series bucket
bucketName := seriesBucketPrefix + seriesID
bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return fmt.Errorf("failed to create series bucket: %v", err)
}
// serialize the data point
value, err := json.Marshal(point)
if err != nil {
return fmt.Errorf("failed to marshal data point: %v", err)
}
// use the timestamp as the key
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(point.Timestamp))
// write the data point
if err := bucket.Put(key, value); err != nil {
return fmt.Errorf("failed to write data point: %v", err)
}
// update the index
indexBucket := tx.Bucket([]byte(indexBucketName))
if err := indexBucket.Put([]byte(seriesID), []byte(seriesID)); err != nil {
return fmt.Errorf("failed to update index: %v", err)
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to write points: %v", err)
}
// update statistics
b.stats.PointsCount += int64(len(points))
b.stats.LastWriteTime = time.Now()
b.stats.WriteLatency = time.Since(startTime) / time.Duration(len(points))
// update the series count
if err := b.updateSeriesCount(); err != nil {
return fmt.Errorf("failed to update series count: %v", err)
}
return nil
}
// Stats implements the Engine interface
func (b *BoltEngine) Stats() engine.EngineStats {
b.mu.RLock()
defer b.mu.RUnlock()
stats := b.stats
// include the database size
if b.db != nil {
stats.DiskUsage = b.db.Stats().TxStats.PageCount * int64(b.db.Info().PageSize)
}
return stats
}
// register the Bolt engine factory
func init() {
engine.RegisterBoltEngine(NewBoltEngine)
}

View File

@@ -0,0 +1,80 @@
package bolt
import (
"fmt"
"time"
bolt "go.etcd.io/bbolt"
)
// Compact implements the Engine interface
func (b *BoltEngine) Compact() error {
b.mu.Lock()
defer b.mu.Unlock()
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
// sync the database to disk (bbolt has no in-place compaction)
if err := b.db.Sync(); err != nil {
return fmt.Errorf("failed to sync database: %v", err)
}
// update statistics
b.stats.CompactionCount++
b.stats.LastCompaction = time.Now()
return nil
}
// Cleanup implements the Engine interface
func (b *BoltEngine) Cleanup() error {
b.mu.Lock()
defer b.mu.Unlock()
if !b.opened || b.closed {
return fmt.Errorf("bolt engine not open")
}
// remove empty series
err := b.db.Update(func(tx *bolt.Tx) error {
indexBucket := tx.Bucket([]byte(indexBucketName))
if indexBucket == nil {
return fmt.Errorf("index bucket not found")
}
return indexBucket.ForEach(func(k, v []byte) error {
seriesID := string(k)
bucket := tx.Bucket([]byte(seriesBucketPrefix + seriesID))
if bucket == nil {
return nil
}
// delete the series if it is empty
if bucket.Stats().KeyN == 0 {
if err := tx.DeleteBucket([]byte(seriesBucketPrefix + seriesID)); err != nil {
return fmt.Errorf("failed to delete empty series bucket: %v", err)
}
// remove it from the index
if err := indexBucket.Delete(k); err != nil {
return fmt.Errorf("failed to delete series from index: %v", err)
}
}
return nil
})
})
if err != nil {
return fmt.Errorf("failed to cleanup: %v", err)
}
// update the series count
if err := b.updateSeriesCount(); err != nil {
return fmt.Errorf("failed to update series count: %v", err)
}
return nil
}

View File

@@ -0,0 +1,203 @@
package bolt
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
bolt "go.etcd.io/bbolt"
)
// Query implements the Engine interface
func (b *BoltEngine) Query(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
b.mu.RLock()
defer b.mu.RUnlock()
if !b.opened || b.closed {
return nil, fmt.Errorf("bolt engine not open")
}
startTime := time.Now()
var result engine.QueryResult
err := b.db.View(func(tx *bolt.Tx) error {
// iterate over all series
indexBucket := tx.Bucket([]byte(indexBucketName))
if indexBucket == nil {
return fmt.Errorf("index bucket not found")
}
return indexBucket.ForEach(func(k, v []byte) error {
seriesID := string(k)
bucket := tx.Bucket([]byte(seriesBucketPrefix + seriesID))
if bucket == nil {
return nil
}
// dispatch on the query type
var seriesResult *engine.SeriesResult
var err error
switch query.Type {
case engine.QueryTypeLatest:
seriesResult, err = b.queryLatest(bucket, seriesID, query)
case engine.QueryTypeRaw:
seriesResult, err = b.queryRaw(bucket, seriesID, query)
case engine.QueryTypeAggregate:
seriesResult, err = b.queryAggregate(bucket, seriesID, query)
case engine.QueryTypeValueDuration:
seriesResult, err = b.queryValueDuration(bucket, seriesID, query)
default:
return fmt.Errorf("unsupported query type: %s", query.Type)
}
if err != nil {
return err
}
if seriesResult != nil {
result = append(result, *seriesResult)
}
return nil
})
})
if err != nil {
b.stats.QueryErrors++
return nil, fmt.Errorf("failed to execute query: %v", err)
}
// update statistics
b.stats.QueryLatency = time.Since(startTime)
return result, nil
}
// queryLatest executes a latest-value query
func (b *BoltEngine) queryLatest(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) {
cursor := bucket.Cursor()
k, v := cursor.Last()
if k == nil {
return nil, nil
}
var point engine.DataPoint
if err := json.Unmarshal(v, &point); err != nil {
return nil, fmt.Errorf("failed to unmarshal data point: %v", err)
}
// check whether the labels match
if !matchTags(point.Labels, query.Tags) {
return nil, nil
}
return &engine.SeriesResult{
SeriesID: seriesID,
Points: []engine.DataPoint{point},
}, nil
}
// queryRaw executes a raw data query
func (b *BoltEngine) queryRaw(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) {
var points []engine.DataPoint
cursor := bucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
timestamp := int64(binary.BigEndian.Uint64(k))
if timestamp < query.StartTime {
continue
}
if timestamp > query.EndTime {
break
}
var point engine.DataPoint
if err := json.Unmarshal(v, &point); err != nil {
return nil, fmt.Errorf("failed to unmarshal data point: %v", err)
}
if len(points) == 0 {
// check whether the labels match
if !matchTags(point.Labels, query.Tags) {
return nil, nil
}
}
points = append(points, point)
// apply the limit
if query.Limit > 0 && len(points) >= query.Limit {
break
}
}
if len(points) == 0 {
return nil, nil
}
return &engine.SeriesResult{
SeriesID: seriesID,
Points: points,
}, nil
}
// queryAggregate executes an aggregate query
func (b *BoltEngine) queryAggregate(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) {
var points []engine.DataPoint
var firstPoint engine.DataPoint
cursor := bucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
timestamp := int64(binary.BigEndian.Uint64(k))
if timestamp < query.StartTime {
continue
}
if timestamp > query.EndTime {
break
}
var point engine.DataPoint
if err := json.Unmarshal(v, &point); err != nil {
return nil, fmt.Errorf("failed to unmarshal data point: %v", err)
}
if len(points) == 0 {
firstPoint = point
// check whether the labels match
if !matchTags(point.Labels, query.Tags) {
return nil, nil
}
}
points = append(points, point)
}
if len(points) == 0 {
return nil, nil
}
// compute the aggregate value
aggregateValue := calculateAggregate(points, query.AggregateType)
// build the aggregate result point
aggregatePoint := engine.DataPoint{
Timestamp: query.EndTime,
Value: aggregateValue,
Labels: firstPoint.Labels,
}
return &engine.SeriesResult{
SeriesID: seriesID,
Points: []engine.DataPoint{aggregatePoint},
}, nil
}
// queryValueDuration executes a value-duration query
func (b *BoltEngine) queryValueDuration(bucket *bolt.Bucket, seriesID string, query engine.Query) (*engine.SeriesResult, error) {
// simplified implementation; a complete version would compute how long each value persisted
return b.queryRaw(bucket, seriesID, query)
}

View File

@@ -0,0 +1,131 @@
package bolt
import (
"encoding/json"
"fmt"
"git.pyer.club/kingecg/gotidb/pkg/engine"
bolt "go.etcd.io/bbolt"
)
// loadStats loads statistics from the database
func (b *BoltEngine) loadStats() error {
return b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(metaBucketName))
if bucket == nil {
return fmt.Errorf("meta bucket not found")
}
statsData := bucket.Get([]byte(statsKey))
if statsData == nil {
// no saved statistics; use the defaults
return nil
}
if err := json.Unmarshal(statsData, &b.stats); err != nil {
return fmt.Errorf("failed to unmarshal stats: %v", err)
}
return nil
})
}
// saveStats persists statistics to the database
func (b *BoltEngine) saveStats() error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(metaBucketName))
if bucket == nil {
return fmt.Errorf("meta bucket not found")
}
statsData, err := json.Marshal(b.stats)
if err != nil {
return fmt.Errorf("failed to marshal stats: %v", err)
}
if err := bucket.Put([]byte(statsKey), statsData); err != nil {
return fmt.Errorf("failed to save stats: %v", err)
}
return nil
})
}
// updateSeriesCount refreshes the series count statistic
func (b *BoltEngine) updateSeriesCount() error {
var count int64
err := b.db.View(func(tx *bolt.Tx) error {
indexBucket := tx.Bucket([]byte(indexBucketName))
if indexBucket == nil {
return fmt.Errorf("index bucket not found")
}
count = int64(indexBucket.Stats().KeyN)
return nil
})
if err != nil {
return err
}
b.stats.SeriesCount = count
return nil
}
// matchTags reports whether a point's labels match the query labels
func matchTags(pointTags, queryTags map[string]string) bool {
for k, v := range queryTags {
if pointTags[k] != v {
return false
}
}
return true
}
// calculateAggregate computes the aggregate value
func calculateAggregate(points []engine.DataPoint, aggregateType engine.AggregateType) float64 {
if len(points) == 0 {
return 0
}
switch aggregateType {
case engine.AggregateTypeAvg:
sum := 0.0
for _, p := range points {
sum += p.Value
}
return sum / float64(len(points))
case engine.AggregateTypeSum:
sum := 0.0
for _, p := range points {
sum += p.Value
}
return sum
case engine.AggregateTypeMin:
min := points[0].Value
for _, p := range points {
if p.Value < min {
min = p.Value
}
}
return min
case engine.AggregateTypeMax:
max := points[0].Value
for _, p := range points {
if p.Value > max {
max = p.Value
}
}
return max
case engine.AggregateTypeCount:
return float64(len(points))
default:
return 0
}
}

306
pkg/engine/buffer_test.go Normal file
View File

@@ -0,0 +1,306 @@
package engine
import (
"errors"
"testing"
"time"
)
// errorBufferHandler is a WriteBufferHandler implementation that returns errors on demand
type errorBufferHandler struct {
writeError bool
flushError bool
writeCount int
flushCount int
lastWritten DataPoint
}
func (h *errorBufferHandler) WriteToBuffer(point DataPoint) error {
h.writeCount++
h.lastWritten = point
if h.writeError {
return errors.New("write error")
}
return nil
}
func (h *errorBufferHandler) ValidatePoint(point DataPoint) error {
// return an error when the write-error flag is set
if h.writeError {
return errors.New("simulated write error")
}
return nil
}
func (h *errorBufferHandler) FlushBuffer() error {
h.flushCount++
if h.flushError {
return errors.New("flush error")
}
return nil
}
func TestWriteBuffer_Creation(t *testing.T) {
handler := &mockBufferHandler{}
// normal creation
buffer := NewWriteBuffer(handler, 10)
if buffer == nil {
t.Fatal("NewWriteBuffer() returned nil")
}
if buffer.size != 10 {
t.Errorf("Buffer size = %d, want 10", buffer.size)
}
// zero-size buffer
buffer = NewWriteBuffer(handler, 0)
if buffer.size != 0 {
t.Errorf("Buffer size = %d, want 0", buffer.size)
}
// a negative buffer size is stored as-is; writes then bypass the buffer
buffer = NewWriteBuffer(handler, -1)
if buffer.size != -1 {
t.Errorf("Buffer size = %d, want -1", buffer.size)
}
}
func TestWriteBuffer_SingleWrite(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 5)
// write a single data point
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "single"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
// check the buffer state
if len(buffer.buffer) != 1 {
t.Errorf("Buffer length = %d, want 1", len(buffer.buffer))
}
// check the handler state (no points should have been written through yet)
if len(handler.points) != 0 {
t.Errorf("Handler points = %d, want 0", len(handler.points))
}
}
func TestWriteBuffer_MultipleWrites(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 5)
// write several points without exceeding the buffer size
for i := 0; i < 3; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "multiple"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
}
// check the buffer state
if len(buffer.buffer) != 3 {
t.Errorf("Buffer length = %d, want 3", len(buffer.buffer))
}
// check the handler state (no points should have been written through yet)
if len(handler.points) != 0 {
t.Errorf("Handler points = %d, want 0", len(handler.points))
}
}
func TestWriteBuffer_AutoFlush(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 3)
// write enough points to trigger an automatic flush
for i := 0; i < 3; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "autoflush"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
}
// the buffer should have been flushed
if len(buffer.buffer) != 0 {
t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
}
// the handler should now hold 3 flushed points
if len(handler.toflush) != 3 {
t.Errorf("Handler points = %d, want 3", len(handler.toflush))
}
// write one more data point
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "after_flush"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
// check the buffer state
if len(buffer.buffer) != 1 {
t.Errorf("Buffer length = %d, want 1", len(buffer.buffer))
}
// 检查处理器状态仍然应该有3个数据点
if len(handler.points) != 3 {
t.Errorf("Handler points = %d, want 3", len(handler.points))
}
}
func TestWriteBuffer_ManualFlush(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 10)
// 写入几个数据点
for i := 0; i < 5; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "manual_flush"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
}
// check the buffer state
if len(buffer.buffer) != 5 {
t.Errorf("Buffer length = %d, want 5", len(buffer.buffer))
}
// flush manually
if err := buffer.Flush(); err != nil {
t.Errorf("Flush() error = %v", err)
}
// the buffer should have been flushed
if len(buffer.buffer) != 0 {
t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
}
// 检查处理器状态应该有5个数据点
if len(handler.points) != 5 {
t.Errorf("Handler points = %d, want 5", len(handler.points))
}
// flushing again should be a no-op
if err := buffer.Flush(); err != nil {
t.Errorf("Flush() error = %v", err)
}
// 检查处理器状态仍然应该有5个数据点
if len(handler.points) != 5 {
t.Errorf("Handler points = %d, want 5", len(handler.points))
}
}
func TestWriteBuffer_ErrorHandling(t *testing.T) {
// write-error case
writeErrorHandler := &errorBufferHandler{
writeError: true,
}
buffer := NewWriteBuffer(writeErrorHandler, 3)
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "error"},
}
// the write should return an error
err := buffer.Write(point)
if err == nil {
t.Error("Write() did not return error")
}
// validation fails before anything reaches the buffer, so no write occurs
if writeErrorHandler.writeCount != 0 {
t.Errorf("Handler write count = %d, want 0", writeErrorHandler.writeCount)
}
// flush-error case
flushErrorHandler := &errorBufferHandler{
flushError: true,
}
buffer = NewWriteBuffer(flushErrorHandler, 3)
// write a few data points
for i := 0; i < 3; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "flush_error"},
}
// the last write triggers a flush, which should fail
err := buffer.Write(point)
if i == 2 && err == nil {
t.Error("Write() did not return error on flush")
}
}
// check the handler state
if flushErrorHandler.writeCount != 3 {
t.Errorf("Handler write count = %d, want 3", flushErrorHandler.writeCount)
}
if flushErrorHandler.flushCount != 1 {
t.Errorf("Handler flush count = %d, want 1", flushErrorHandler.flushCount)
}
}
func TestWriteBuffer_ZeroSize(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 0)
// write a data point (it should flush immediately)
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "zero_size"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
// check the buffer state
if len(buffer.buffer) != 0 {
t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
}
// 检查处理器状态应该有1个数据点
if len(handler.toflush) != 1 {
t.Errorf("Handler points = %d, want 1", len(handler.points))
}
}

141
pkg/engine/config.go Normal file
View File

@@ -0,0 +1,141 @@
package engine
import "time"
// EngineConfig is the configuration interface for storage engines
type EngineConfig interface {
// Type returns the engine type
Type() string
}
// BaseConfig holds settings shared by all engines
type BaseConfig struct {
// data older than the retention period is cleaned up
RetentionPeriod time.Duration
// when enabled, every write is persisted immediately
SyncWrites bool
// write buffer size; 0 disables buffering
WriteBufferSize int
// enable compaction
EnableCompaction bool
// compaction interval
CompactionInterval time.Duration
}
// MemoryEngineConfig configures the memory engine
type MemoryEngineConfig struct {
BaseConfig
// number of historical values kept per data point (ring buffer size)
MaxHistoryValues int
// maximum number of data points; writes beyond it are rejected, 0 means unlimited
MaxPoints int
// when enabled, data is periodically persisted to disk
EnablePersistence bool
// persistence file path (only used when EnablePersistence is true)
PersistencePath string
// persistence interval (only used when EnablePersistence is true)
PersistenceInterval time.Duration
}
// Type implements the EngineConfig interface
func (c *MemoryEngineConfig) Type() string {
return "memory"
}
// MemoryConfig returns the memory engine configuration
func (c *MemoryEngineConfig) MemoryConfig() *MemoryEngineConfig {
return c
}
// BoltConfig returns the Bolt engine configuration
func (c *MemoryEngineConfig) BoltConfig() *BoltEngineConfig {
return nil
}
// BoltEngineConfig configures the Bolt engine
type BoltEngineConfig struct {
BaseConfig
// database file path
Path string
// bucket name
BucketName string
// enable the write-ahead log
EnableWAL bool
// WAL directory (only used when EnableWAL is true)
WALDir string
// enable batched writes
EnableBatch bool
// batch size (only used when EnableBatch is true)
BatchSize int
// batch timeout (only used when EnableBatch is true)
BatchTimeout time.Duration
}
// Type implements the EngineConfig interface
func (c *BoltEngineConfig) Type() string {
return "bolt"
}
// MemoryConfig returns the memory engine configuration
func (c *BoltEngineConfig) MemoryConfig() *MemoryEngineConfig {
return nil
}
// BoltConfig returns the Bolt engine configuration
func (c *BoltEngineConfig) BoltConfig() *BoltEngineConfig {
return c
}
// NewMemoryEngineConfig creates a default memory engine configuration
func NewMemoryEngineConfig() *MemoryEngineConfig {
return &MemoryEngineConfig{
BaseConfig: BaseConfig{
RetentionPeriod: 24 * time.Hour, // keep 24 hours by default
SyncWrites: false,
WriteBufferSize: 1000,
EnableCompaction: true,
CompactionInterval: 1 * time.Hour,
},
MaxHistoryValues: 30, // keep 30 historical values by default
MaxPoints: 0, // unlimited by default
EnablePersistence: false,
PersistencePath: "",
PersistenceInterval: 10 * time.Minute,
}
}
// NewBoltEngineConfig creates a default Bolt engine configuration
func NewBoltEngineConfig(path string) *BoltEngineConfig {
return &BoltEngineConfig{
BaseConfig: BaseConfig{
RetentionPeriod: 24 * time.Hour, // keep 24 hours by default
SyncWrites: false,
WriteBufferSize: 1000,
EnableCompaction: true,
CompactionInterval: 1 * time.Hour,
},
Path: path,
BucketName: "gotidb",
EnableWAL: true,
WALDir: path + "_wal",
EnableBatch: true,
BatchSize: 1000,
BatchTimeout: 1 * time.Second,
}
}

209
pkg/engine/engine.go Normal file
View File

@@ -0,0 +1,209 @@
package engine
import (
"context"
"sort"
"strings"
"time"
)
// Engine defines the common interface for storage engines
type Engine interface {
// lifecycle management
Open() error
Close() error
// data operations
Write(ctx context.Context, points []DataPoint) error
Query(ctx context.Context, query Query) (QueryResult, error)
// management
Compact() error
Cleanup() error
// monitoring and statistics
Stats() EngineStats
}
// DataPoint represents a single time-series data point
type DataPoint struct {
Timestamp int64
Value float64
Labels map[string]string
Metadata map[string]interface{} // optional metadata
}
// GetSeriesID derives the series ID from the labels
func (p DataPoint) GetSeriesID() string {
// return a default ID when there are no labels
if len(p.Labels) == 0 {
return "default_series"
}
// collect the label keys
keys := make([]string, 0, len(p.Labels))
for k := range p.Labels {
keys = append(keys, k)
}
// sort the keys lexicographically
sort.Strings(keys)
// build the series ID
var sb strings.Builder
for i, k := range keys {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(p.Labels[k])
}
return sb.String()
}
// QueryType defines the query type
type QueryType string
const (
QueryTypeRaw QueryType = "raw" // raw data query
QueryTypeLatest QueryType = "latest" // latest-value query
QueryTypeAggregate QueryType = "aggregate" // aggregate query
QueryTypeValueDuration QueryType = "value_duration" // value-duration query
)
// AggregateType defines the aggregation type
type AggregateType string
const (
AggregateTypeAvg AggregateType = "avg"
AggregateTypeSum AggregateType = "sum"
AggregateTypeMin AggregateType = "min"
AggregateTypeMax AggregateType = "max"
AggregateTypeCount AggregateType = "count"
)
// Query represents a query request
type Query struct {
Type QueryType
StartTime int64
EndTime int64
Tags map[string]string
Limit int
AggregateType AggregateType // used by aggregate queries
}
// QueryResult represents a query result
type QueryResult []SeriesResult
// SeriesResult is the result for a single series
type SeriesResult struct {
SeriesID string
Points []DataPoint
}
// EngineStats holds storage engine statistics
type EngineStats struct {
// basic statistics
PointsCount int64 // total number of data points
SeriesCount int64 // total number of series
LastWriteTime time.Time // time of the last write
// performance metrics
WriteLatency time.Duration // average write latency
QueryLatency time.Duration // average query latency
// resource usage
MemoryUsage int64 // memory usage in bytes
DiskUsage int64 // disk usage in bytes
// engine-specific metrics
CompactionCount int64 // number of compactions
LastCompaction time.Time // time of the last compaction
WriteErrors int64 // number of write errors
QueryErrors int64 // number of query errors
}
// WriteBufferHandler is the interface for write buffer handlers
type WriteBufferHandler interface {
WriteToBuffer(point DataPoint) error
FlushBuffer() error
ValidatePoint(point DataPoint) error // validates a point without writing it
}
// WriteBuffer implements a write buffer
type WriteBuffer struct {
handler WriteBufferHandler
buffer []DataPoint
size int
}
// NewWriteBuffer creates a new write buffer
func NewWriteBuffer(handler WriteBufferHandler, size int) *WriteBuffer {
bufferCap := size
if bufferCap < 0 {
bufferCap = 0
}
return &WriteBuffer{
handler: handler,
buffer: make([]DataPoint, 0, bufferCap),
size: size,
}
}
// Write adds a data point to the buffer
func (b *WriteBuffer) Write(point DataPoint) error {
// with a non-positive buffer size, write through immediately
if b.size <= 0 {
if err := b.handler.WriteToBuffer(point); err != nil {
return err
}
return b.handler.FlushBuffer()
}
// validate the point first without writing it
if err := b.handler.ValidatePoint(point); err != nil {
return err
}
// flush first if the buffer is already full
if len(b.buffer) >= b.size {
if err := b.Flush(); err != nil {
return err
}
}
// append to the buffer
b.buffer = append(b.buffer, point)
// flush immediately once the buffer is full
if len(b.buffer) >= b.size {
return b.Flush()
}
return nil
}
// Flush flushes the buffer
func (b *WriteBuffer) Flush() error {
if len(b.buffer) == 0 {
return nil
}
// write out the batch
for _, point := range b.buffer {
if err := b.handler.WriteToBuffer(point); err != nil {
// clear the buffer even on error to avoid reprocessing bad data
b.buffer = b.buffer[:0]
// still call the handler's flush, but report the write error first
_ = b.handler.FlushBuffer()
return err
}
}
// reset the buffer
b.buffer = b.buffer[:0]
// invoke the handler's flush
return b.handler.FlushBuffer()
}

283
pkg/engine/engine_test.go Normal file
View File

@@ -0,0 +1,283 @@
package engine
import (
"context"
"testing"
"time"
)
// mockEngine is a mock engine implementation for tests
type mockEngine struct {
points []DataPoint
stats EngineStats
opened bool
closed bool
}
func (m *mockEngine) Open() error {
m.opened = true
return nil
}
func (m *mockEngine) Close() error {
m.closed = true
return nil
}
func (m *mockEngine) Write(ctx context.Context, points []DataPoint) error {
m.points = append(m.points, points...)
m.stats.PointsCount += int64(len(points))
m.stats.LastWriteTime = time.Now()
return nil
}
func (m *mockEngine) Query(ctx context.Context, query Query) (QueryResult, error) {
var result QueryResult
if len(m.points) == 0 {
return result, nil
}
// return different results depending on the query type
switch query.Type {
case QueryTypeLatest:
// return the latest data point
latest := m.points[len(m.points)-1]
result = append(result, SeriesResult{
SeriesID: latest.GetSeriesID(),
Points: []DataPoint{latest},
})
case QueryTypeRaw:
// return all matching data points
var matchedPoints []DataPoint
for _, point := range m.points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
// check whether the labels match
if lmatchTags(point.Labels, query.Tags) {
matchedPoints = append(matchedPoints, point)
}
}
}
if len(matchedPoints) > 0 {
result = append(result, SeriesResult{
SeriesID: matchedPoints[0].GetSeriesID(),
Points: matchedPoints,
})
}
}
return result, nil
}
func (m *mockEngine) Compact() error {
m.stats.CompactionCount++
m.stats.LastCompaction = time.Now()
return nil
}
func (m *mockEngine) Cleanup() error {
return nil
}
func (m *mockEngine) Stats() EngineStats {
return m.stats
}
// lmatchTags reports whether a point's labels match the query labels
func lmatchTags(pointTags, queryTags map[string]string) bool {
for k, v := range queryTags {
if pointTags[k] != v {
return false
}
}
return true
}
func TestDataPoint_GetSeriesID(t *testing.T) {
tests := []struct {
name string
point DataPoint
labels map[string]string
}{
{
name: "empty labels",
point: DataPoint{
Labels: map[string]string{},
},
},
{
name: "single label",
point: DataPoint{
Labels: map[string]string{"host": "server1"},
},
},
{
name: "multiple labels",
point: DataPoint{
Labels: map[string]string{
"host": "server1",
"region": "us-west",
"service": "api",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
id := tt.point.GetSeriesID()
if id == "" {
t.Error("GetSeriesID() returned empty string")
}
// identical labels must produce the same ID
id2 := tt.point.GetSeriesID()
if id != id2 {
t.Errorf("GetSeriesID() not consistent: got %v and %v", id, id2)
}
// a new data point with the same labels
point2 := DataPoint{Labels: tt.point.Labels}
id3 := point2.GetSeriesID()
if id != id3 {
t.Errorf("GetSeriesID() not consistent across instances: got %v and %v", id, id3)
}
})
}
}
func TestWriteBuffer(t *testing.T) {
// create a mock WriteBufferHandler
handler := &mockBufferHandler{
toflush: []DataPoint{},
points: make([]DataPoint, 0),
}
// 创建WriteBuffer设置缓冲区大小为2
buffer := NewWriteBuffer(handler, 2)
// write a single data point
point1 := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 1.0,
Labels: map[string]string{"test": "1"},
}
if err := buffer.Write(point1); err != nil {
t.Errorf("Write() error = %v", err)
}
if len(handler.points) != 0 {
t.Errorf("Buffer flushed too early, got %d points, want 0", len(handler.points))
}
// the second write should trigger a flush
point2 := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 2.0,
Labels: map[string]string{"test": "2"},
}
if err := buffer.Write(point2); err != nil {
t.Errorf("Write() error = %v", err)
}
if len(handler.toflush) != 2 {
t.Errorf("Buffer not flushed when full, got %d points, want 2", len(handler.points))
}
// manual flush
point3 := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 3.0,
Labels: map[string]string{"test": "3"},
}
if err := buffer.Write(point3); err != nil {
t.Errorf("Write() error = %v", err)
}
if err := buffer.Flush(); err != nil {
t.Errorf("Flush() error = %v", err)
}
if len(handler.toflush) != 3 {
t.Errorf("Manual flush failed, got %d points, want 3", len(handler.points))
}
}
// mockBufferHandler is a WriteBufferHandler implementation for tests
type mockBufferHandler struct {
toflush []DataPoint
points []DataPoint
}
func (h *mockBufferHandler) WriteToBuffer(point DataPoint) error {
h.points = append(h.points, point)
return nil
}
func (h *mockBufferHandler) FlushBuffer() error {
h.toflush = append(h.toflush, h.points...)
h.points = []DataPoint{}
return nil
}
func (h *mockBufferHandler) ValidatePoint(point DataPoint) error {
return nil
}
func TestEngineFactory(t *testing.T) {
// register the mock engines
RegisterMemoryEngine(func(config *MemoryEngineConfig) (Engine, error) {
return &mockEngine{}, nil
})
RegisterBoltEngine(func(config *BoltEngineConfig) (Engine, error) {
return &mockEngine{}, nil
})
tests := []struct {
name string
config EngineConfig
wantErr bool
}{
{
name: "memory engine",
config: NewMemoryEngineConfig(),
wantErr: false,
},
{
name: "bolt engine",
config: NewBoltEngineConfig("test.db"),
wantErr: false,
},
{
name: "invalid config type",
config: &struct{ EngineConfig }{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
engine, err := NewEngine(tt.config)
if (err != nil) != tt.wantErr {
t.Errorf("NewEngine() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr && engine == nil {
t.Error("NewEngine() returned nil engine")
}
})
}
}
func TestEngineConfig(t *testing.T) {
// memory engine configuration
memConfig := NewMemoryEngineConfig()
if memConfig.Type() != "memory" {
t.Errorf("MemoryEngineConfig.Type() = %v, want memory", memConfig.Type())
}
if memConfig.MaxHistoryValues != 30 {
t.Errorf("MemoryEngineConfig.MaxHistoryValues = %v, want 30", memConfig.MaxHistoryValues)
}
// Bolt engine configuration
boltConfig := NewBoltEngineConfig("test.db")
if boltConfig.Type() != "bolt" {
t.Errorf("BoltEngineConfig.Type() = %v, want bolt", boltConfig.Type())
}
if boltConfig.Path != "test.db" {
t.Errorf("BoltEngineConfig.Path = %v, want test.db", boltConfig.Path)
}
}

67
pkg/engine/factory.go Normal file
View File

@ -0,0 +1,67 @@
package engine
import (
"fmt"
)
// NewEngine creates a storage engine of the given type
func NewEngine(config EngineConfig) (eng Engine, err error) {
if config == nil {
return nil, fmt.Errorf("engine config cannot be nil")
}
engineType := ""
defer func() {
if r := recover(); r != nil {
// if calling Type() panics, recover and return an error instead
fmt.Printf("Recovered from panic in NewEngine: %v\n", r)
eng = nil
err = fmt.Errorf("engine type %s not supported", engineType)
}
}()
// obtain the engine type; a panic in Type() is recovered above
engineType = config.Type()
switch engineType {
case "memory":
// check the concrete config type
memConfig, ok := config.(*MemoryEngineConfig)
if !ok {
return nil, fmt.Errorf("invalid config type for memory engine: %T", config)
}
return newMemoryEngine(memConfig)
case "bolt":
// Ensure the config has the expected concrete type
boltConfig, ok := config.(*BoltEngineConfig)
if !ok {
return nil, fmt.Errorf("invalid config type for bolt engine: %T", config)
}
return newBoltEngine(boltConfig)
default:
return nil, fmt.Errorf("unsupported engine type")
}
}
// newMemoryEngine creates a memory engine.
// The real implementation is provided by the memory package; this is only a placeholder.
var newMemoryEngine = func(config *MemoryEngineConfig) (Engine, error) {
return nil, fmt.Errorf("memory engine not implemented")
}
// newBoltEngine creates a Bolt engine.
// The real implementation is provided by the bolt package; this is only a placeholder.
var newBoltEngine = func(config *BoltEngineConfig) (Engine, error) {
return nil, fmt.Errorf("bolt engine not implemented")
}
// RegisterMemoryEngine registers the memory engine constructor
func RegisterMemoryEngine(creator func(config *MemoryEngineConfig) (Engine, error)) {
newMemoryEngine = creator
}
// RegisterBoltEngine registers the Bolt engine constructor
func RegisterBoltEngine(creator func(config *BoltEngineConfig) (Engine, error)) {
newBoltEngine = creator
}
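The indirection above lets each storage package self-register without import cycles: the engine package never imports its implementations. A minimal usage sketch, assuming the memory package's `init` registration shown later in this diff:

```go
package main

import (
	"fmt"

	"git.pyer.club/kingecg/gotidb/pkg/engine"
	_ "git.pyer.club/kingecg/gotidb/pkg/engine/memory" // init() calls RegisterMemoryEngine
)

func main() {
	// NewMemoryEngineConfig reports Type() == "memory", so NewEngine
	// dispatches to the constructor registered by the memory package.
	eng, err := engine.NewEngine(engine.NewMemoryEngineConfig())
	if err != nil {
		fmt.Println("create engine:", err)
		return
	}
	if err := eng.Open(); err != nil {
		fmt.Println("open engine:", err)
		return
	}
	defer eng.Close()
}
```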

View File

@ -0,0 +1,322 @@
package engine
import (
"context"
"testing"
"time"
)
// engineFactory is a function type that creates an engine instance for a test
type engineFactory func(t *testing.T) Engine
// engineTest describes one engine test case
type engineTest struct {
name string
factory engineFactory
}
// runEngineTests runs the full suite against each engine
func runEngineTests(t *testing.T, tests []engineTest) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
engine := tt.factory(t)
// Test basic operations
t.Run("Basic Operations", func(t *testing.T) {
testBasicOperations(t, engine)
})
// Test query operations
t.Run("Query Operations", func(t *testing.T) {
testQueryOperations(t, engine)
})
// Test compaction and cleanup
t.Run("Maintenance Operations", func(t *testing.T) {
testMaintenanceOperations(t, engine)
})
// Test stats collection
t.Run("Stats Operations", func(t *testing.T) {
testStatsOperations(t, engine)
})
})
}
}
// testBasicOperations exercises the basic CRUD operations
func testBasicOperations(t *testing.T, engine Engine) {
ctx := context.Background()
// Test opening the engine
if err := engine.Open(); err != nil {
t.Fatalf("Failed to open engine: %v", err)
}
// Prepare test data
points := []DataPoint{
{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{
"host": "server1",
"service": "api",
},
},
{
Timestamp: time.Now().UnixNano(),
Value: 43.0,
Labels: map[string]string{
"host": "server2",
"service": "api",
},
},
}
// Test writing
if err := engine.Write(ctx, points); err != nil {
t.Fatalf("Failed to write points: %v", err)
}
// Test querying
query := Query{
Type: QueryTypeRaw,
StartTime: time.Now().Add(-1 * time.Hour).UnixNano(),
EndTime: time.Now().Add(1 * time.Hour).UnixNano(),
Tags: map[string]string{
"service": "api",
},
}
result, err := engine.Query(ctx, query)
if err != nil {
t.Fatalf("Failed to query points: %v", err)
}
// Validate the query result
if len(result) == 0 {
t.Error("Query returned no results")
}
// Test closing the engine
if err := engine.Close(); err != nil {
t.Fatalf("Failed to close engine: %v", err)
}
}
// testQueryOperations exercises the different query types
func testQueryOperations(t *testing.T, engine Engine) {
ctx := context.Background()
// Open the engine
if err := engine.Open(); err != nil {
t.Fatalf("Failed to open engine: %v", err)
}
defer engine.Close()
// Prepare test data
now := time.Now()
points := []DataPoint{
{
Timestamp: now.Add(-2 * time.Hour).UnixNano(),
Value: 10.0,
Labels: map[string]string{
"host": "server1",
"app": "test",
},
},
{
Timestamp: now.Add(-1 * time.Hour).UnixNano(),
Value: 20.0,
Labels: map[string]string{
"host": "server1",
"app": "test",
},
},
{
Timestamp: now.UnixNano(),
Value: 30.0,
Labels: map[string]string{
"host": "server1",
"app": "test",
},
},
}
// Write the test data
if err := engine.Write(ctx, points); err != nil {
t.Fatalf("Failed to write points: %v", err)
}
// Test the latest-value query
t.Run("Latest Query", func(t *testing.T) {
query := Query{
Type: QueryTypeLatest,
Tags: map[string]string{
"host": "server1",
"app": "test",
},
}
result, err := engine.Query(ctx, query)
if err != nil {
t.Fatalf("Failed to query latest points: %v", err)
}
if len(result) != 1 {
t.Errorf("Expected 1 series result, got %d", len(result))
}
if len(result[0].Points) != 1 {
t.Errorf("Expected 1 point, got %d", len(result[0].Points))
}
if result[0].Points[0].Value != 30.0 {
t.Errorf("Expected latest value 30.0, got %f", result[0].Points[0].Value)
}
})
// Test the aggregate query
t.Run("Aggregate Query", func(t *testing.T) {
query := Query{
Type: QueryTypeAggregate,
StartTime: now.Add(-3 * time.Hour).UnixNano(),
EndTime: now.Add(1 * time.Hour).UnixNano(),
Tags: map[string]string{"app": "test"},
AggregateType: AggregateTypeAvg,
}
result, err := engine.Query(ctx, query)
if err != nil {
t.Fatalf("Failed to query aggregate: %v", err)
}
if len(result) != 1 {
t.Errorf("Expected 1 series result, got %d", len(result))
}
// Verify the average
expectedAvg := 20.0 // (10 + 20 + 30) / 3
if len(result[0].Points) > 0 && result[0].Points[0].Value != expectedAvg {
t.Errorf("Expected average value %f, got %f", expectedAvg, result[0].Points[0].Value)
}
})
// Test the raw-data query
t.Run("Raw Query", func(t *testing.T) {
query := Query{
Type: QueryTypeRaw,
StartTime: now.Add(-3 * time.Hour).UnixNano(),
EndTime: now.Add(1 * time.Hour).UnixNano(),
Tags: map[string]string{"app": "test"},
Limit: 10,
}
result, err := engine.Query(ctx, query)
if err != nil {
t.Fatalf("Failed to query raw data: %v", err)
}
if len(result) != 1 {
t.Errorf("Expected 1 series result, got %d", len(result))
}
if len(result[0].Points) != 3 {
t.Errorf("Expected 3 points, got %d", len(result[0].Points))
}
})
}
// testMaintenanceOperations exercises compaction and cleanup
func testMaintenanceOperations(t *testing.T, engine Engine) {
// Open the engine
if err := engine.Open(); err != nil {
t.Fatalf("Failed to open engine: %v", err)
}
defer engine.Close()
// Test compaction
if err := engine.Compact(); err != nil {
t.Errorf("Failed to compact: %v", err)
}
// Test cleanup
if err := engine.Cleanup(); err != nil {
t.Errorf("Failed to cleanup: %v", err)
}
}
// testStatsOperations exercises stats collection
func testStatsOperations(t *testing.T, engine Engine) {
ctx := context.Background()
// Open the engine
if err := engine.Open(); err != nil {
t.Fatalf("Failed to open engine: %v", err)
}
defer engine.Close()
beforeStats := engine.Stats()
// Write some data
points := []DataPoint{
{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "stats"},
},
}
if err := engine.Write(ctx, points); err != nil {
t.Fatalf("Failed to write points: %v", err)
}
// Fetch the stats
stats := engine.Stats()
// Validate the stats
if got := stats.PointsCount - beforeStats.PointsCount; got != 1 {
t.Errorf("Expected points count delta 1, got %d", got)
}
if stats.LastWriteTime.IsZero() {
t.Error("LastWriteTime should not be zero")
}
}
// TestEngines runs the integration tests for all engines
func TestEngines(t *testing.T) {
tests := []engineTest{
{
name: "MockEngine",
factory: func(t *testing.T) Engine {
return NewMockEngine()
},
},
// Once the Memory and Bolt engines are implemented, their tests can be enabled here
/*
{
name: "MemoryEngine",
factory: func(t *testing.T) Engine {
config := NewMemoryEngineConfig()
engine, err := NewEngine(config)
if err != nil {
t.Fatalf("Failed to create memory engine: %v", err)
}
return engine
},
},
{
name: "BoltEngine",
factory: func(t *testing.T) Engine {
config := NewBoltEngineConfig("test.db")
engine, err := NewEngine(config)
if err != nil {
t.Fatalf("Failed to create bolt engine: %v", err)
}
return engine
},
},
*/
}
runEngineTests(t, tests)
}

394
pkg/engine/memory/memory.go Normal file
View File

@ -0,0 +1,394 @@
package memory
import (
"context"
"fmt"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
)
// MemoryEngine implements an in-memory storage engine
type MemoryEngine struct {
mu sync.RWMutex
config *engine.MemoryEngineConfig
series map[string][]engine.DataPoint // seriesID -> list of data points
stats engine.EngineStats
opened bool
closed bool
}
// Ensure MemoryEngine implements the Engine interface
var _ engine.Engine = (*MemoryEngine)(nil)
// NewMemoryEngine creates a new memory engine
func NewMemoryEngine(config *engine.MemoryEngineConfig) (*MemoryEngine, error) {
if config == nil {
return nil, fmt.Errorf("memory engine config cannot be nil")
}
return &MemoryEngine{
config: config,
series: make(map[string][]engine.DataPoint),
stats: engine.EngineStats{
LastWriteTime: time.Now(),
},
}, nil
}
// Open implements the Engine interface
func (m *MemoryEngine) Open() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.opened {
return fmt.Errorf("memory engine already opened")
}
if m.closed {
return fmt.Errorf("memory engine already closed")
}
m.opened = true
return nil
}
// Close implements the Engine interface
func (m *MemoryEngine) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
if !m.opened {
return fmt.Errorf("memory engine not opened")
}
if m.closed {
return fmt.Errorf("memory engine already closed")
}
// Release resources
m.series = nil
m.closed = true
return nil
}
// Write implements the Engine interface
func (m *MemoryEngine) Write(ctx context.Context, points []engine.DataPoint) error {
if len(points) == 0 {
return nil
}
m.mu.Lock()
defer m.mu.Unlock()
if !m.opened || m.closed {
return fmt.Errorf("memory engine not open")
}
startTime := time.Now()
// Write the data points
for _, point := range points {
seriesID := point.GetSeriesID()
if seriesID == "" {
m.stats.WriteErrors++
continue
}
// Get or create the series
seriesPoints, exists := m.series[seriesID]
if !exists {
seriesPoints = make([]engine.DataPoint, 0, m.config.MaxHistoryValues)
m.series[seriesID] = seriesPoints
}
// Append the data point
m.series[seriesID] = append(m.series[seriesID], point)
// If the series exceeds the history limit, drop the oldest point
if len(m.series[seriesID]) > m.config.MaxHistoryValues {
m.series[seriesID] = m.series[seriesID][1:]
}
}
// Update the stats
m.stats.PointsCount += int64(len(points))
m.stats.SeriesCount = int64(len(m.series))
m.stats.LastWriteTime = time.Now()
m.stats.WriteLatency = time.Since(startTime) / time.Duration(len(points))
return nil
}
// Query implements the Engine interface
func (m *MemoryEngine) Query(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if !m.opened || m.closed {
return nil, fmt.Errorf("memory engine not open")
}
startTime := time.Now()
var result engine.QueryResult
// Dispatch on the query type
switch query.Type {
case engine.QueryTypeLatest:
result = m.queryLatest(query)
case engine.QueryTypeRaw:
result = m.queryRaw(query)
case engine.QueryTypeAggregate:
result = m.queryAggregate(query)
case engine.QueryTypeValueDuration:
result = m.queryValueDuration(query)
default:
m.stats.QueryErrors++
return nil, fmt.Errorf("unsupported query type: %s", query.Type)
}
// Update the stats
m.stats.QueryLatency = time.Since(startTime)
return result, nil
}
// queryLatest runs a latest-value query
func (m *MemoryEngine) queryLatest(query engine.Query) engine.QueryResult {
var result engine.QueryResult
for seriesID, points := range m.series {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
// Take the most recent data point
latest := points[len(points)-1]
result = append(result, engine.SeriesResult{
SeriesID: seriesID,
Points: []engine.DataPoint{latest},
})
}
return result
}
// queryRaw runs a raw-data query
func (m *MemoryEngine) queryRaw(query engine.Query) engine.QueryResult {
var result engine.QueryResult
for seriesID, points := range m.series {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
var matchedPoints []engine.DataPoint
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
matchedPoints = append(matchedPoints, point)
}
}
if len(matchedPoints) > 0 {
// Apply the limit
if query.Limit > 0 && len(matchedPoints) > query.Limit {
matchedPoints = matchedPoints[len(matchedPoints)-query.Limit:]
}
result = append(result, engine.SeriesResult{
SeriesID: seriesID,
Points: matchedPoints,
})
}
}
return result
}
// queryAggregate runs an aggregate query
func (m *MemoryEngine) queryAggregate(query engine.Query) engine.QueryResult {
var result engine.QueryResult
for seriesID, points := range m.series {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
var matchedPoints []engine.DataPoint
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
matchedPoints = append(matchedPoints, point)
}
}
if len(matchedPoints) > 0 {
// Compute the aggregate value
aggregateValue := calculateAggregate(matchedPoints, query.AggregateType)
// Build the aggregate result point
aggregatePoint := engine.DataPoint{
Timestamp: query.EndTime,
Value: aggregateValue,
Labels: matchedPoints[0].Labels,
}
result = append(result, engine.SeriesResult{
SeriesID: seriesID,
Points: []engine.DataPoint{aggregatePoint},
})
}
}
return result
}
// queryValueDuration runs a value-duration query
func (m *MemoryEngine) queryValueDuration(query engine.Query) engine.QueryResult {
// Simplified implementation; a real one would compute how long each value persisted
return m.queryRaw(query)
}
// Compact implements the Engine interface
func (m *MemoryEngine) Compact() error {
m.mu.Lock()
defer m.mu.Unlock()
if !m.opened || m.closed {
return fmt.Errorf("memory engine not open")
}
// Compaction for the memory engine: trim points beyond the history limit
for seriesID, points := range m.series {
if len(points) > m.config.MaxHistoryValues {
m.series[seriesID] = points[len(points)-m.config.MaxHistoryValues:]
}
}
// Update the stats
m.stats.CompactionCount++
m.stats.LastCompaction = time.Now()
return nil
}
// Cleanup implements the Engine interface
func (m *MemoryEngine) Cleanup() error {
m.mu.Lock()
defer m.mu.Unlock()
if !m.opened || m.closed {
return fmt.Errorf("memory engine not open")
}
// Remove empty series
for seriesID, points := range m.series {
if len(points) == 0 {
delete(m.series, seriesID)
}
}
// Update the stats
m.stats.SeriesCount = int64(len(m.series))
return nil
}
// Stats implements the Engine interface
func (m *MemoryEngine) Stats() engine.EngineStats {
m.mu.RLock()
defer m.mu.RUnlock()
// Estimate memory usage (rough approximation)
var memoryUsage int64
for _, points := range m.series {
// Approximate memory occupied by each data point
const pointSize = 8 + 8 + 64 // timestamp + value + estimated label size
memoryUsage += int64(len(points) * pointSize)
}
stats := m.stats
stats.MemoryUsage = memoryUsage
return stats
}
// matchTags reports whether a point's labels match the query tags
func matchTags(pointTags, queryTags map[string]string) bool {
for k, v := range queryTags {
if pointTags[k] != v {
return false
}
}
return true
}
// calculateAggregate computes the aggregate value
func calculateAggregate(points []engine.DataPoint, aggregateType engine.AggregateType) float64 {
if len(points) == 0 {
return 0
}
switch aggregateType {
case engine.AggregateTypeAvg:
sum := 0.0
for _, p := range points {
sum += p.Value
}
return sum / float64(len(points))
case engine.AggregateTypeSum:
sum := 0.0
for _, p := range points {
sum += p.Value
}
return sum
case engine.AggregateTypeMin:
min := points[0].Value
for _, p := range points {
if p.Value < min {
min = p.Value
}
}
return min
case engine.AggregateTypeMax:
max := points[0].Value
for _, p := range points {
if p.Value > max {
max = p.Value
}
}
return max
case engine.AggregateTypeCount:
return float64(len(points))
default:
return 0
}
}
// Register the memory engine constructor
func init() {
engine.RegisterMemoryEngine(func(config *engine.MemoryEngineConfig) (engine.Engine, error) {
return NewMemoryEngine(config)
})
}
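A short sketch of the bounded-history behavior this engine implements: with the default `MaxHistoryValues` of 30, a series written with 35 points retains only the newest 30. The timestamps are illustrative, and the sketch assumes series identity is derived from the labels:

```go
package main

import (
	"context"
	"fmt"

	"git.pyer.club/kingecg/gotidb/pkg/engine"
	_ "git.pyer.club/kingecg/gotidb/pkg/engine/memory"
)

func main() {
	eng, _ := engine.NewEngine(engine.NewMemoryEngineConfig())
	_ = eng.Open()
	defer eng.Close()

	// Write 35 points to a single series.
	var points []engine.DataPoint
	for i := 0; i < 35; i++ {
		points = append(points, engine.DataPoint{
			Timestamp: int64(i),
			Value:     float64(i),
			Labels:    map[string]string{"host": "server1"},
		})
	}
	_ = eng.Write(context.Background(), points)

	// A raw query over the full range sees only the newest 30 points.
	res, _ := eng.Query(context.Background(), engine.Query{
		Type:      engine.QueryTypeRaw,
		StartTime: 0,
		EndTime:   100,
		Tags:      map[string]string{"host": "server1"},
	})
	if len(res) > 0 {
		fmt.Println(len(res[0].Points)) // 30: the oldest 5 were trimmed
	}
}
```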

370
pkg/engine/mock_engine.go Normal file
View File

@ -0,0 +1,370 @@
package engine
import (
"context"
"fmt"
"sync"
"time"
)
// MockEngine is a mock engine implementation for testing
type MockEngine struct {
mu sync.RWMutex
points map[string][]DataPoint // seriesID -> list of data points
stats EngineStats
opened bool
closed bool
writeError error
queryError error
compactError error
cleanupError error
writeCallback func([]DataPoint) error
queryCallback func(Query) (QueryResult, error)
}
// NewMockEngine creates a new mock engine
func NewMockEngine() *MockEngine {
return &MockEngine{
points: make(map[string][]DataPoint),
stats: EngineStats{
LastWriteTime: time.Now(),
},
}
}
// SetWriteError sets the error returned by write operations
func (m *MockEngine) SetWriteError(err error) {
m.writeError = err
}
// SetQueryError sets the error returned by query operations
func (m *MockEngine) SetQueryError(err error) {
m.queryError = err
}
// SetCompactError sets the error returned by compaction
func (m *MockEngine) SetCompactError(err error) {
m.compactError = err
}
// SetCleanupError sets the error returned by cleanup
func (m *MockEngine) SetCleanupError(err error) {
m.cleanupError = err
}
// SetWriteCallback sets the write callback
func (m *MockEngine) SetWriteCallback(callback func([]DataPoint) error) {
m.writeCallback = callback
}
// SetQueryCallback sets the query callback
func (m *MockEngine) SetQueryCallback(callback func(Query) (QueryResult, error)) {
m.queryCallback = callback
}
// IsOpened reports whether the engine has been opened
func (m *MockEngine) IsOpened() bool {
return m.opened
}
// IsClosed reports whether the engine has been closed
func (m *MockEngine) IsClosed() bool {
return m.closed
}
// GetPoints returns all data points
func (m *MockEngine) GetPoints() map[string][]DataPoint {
m.mu.RLock()
defer m.mu.RUnlock()
// Return a copy
result := make(map[string][]DataPoint)
for seriesID, points := range m.points {
pointsCopy := make([]DataPoint, len(points))
copy(pointsCopy, points)
result[seriesID] = pointsCopy
}
return result
}
// GetPointsCount returns the total number of data points
func (m *MockEngine) GetPointsCount() int {
m.mu.RLock()
defer m.mu.RUnlock()
count := 0
for _, points := range m.points {
count += len(points)
}
return count
}
// Open implements the Engine interface
func (m *MockEngine) Open() error {
m.opened = true
return nil
}
// Close implements the Engine interface
func (m *MockEngine) Close() error {
m.closed = true
return nil
}
// Write implements the Engine interface
func (m *MockEngine) Write(ctx context.Context, points []DataPoint) error {
if m.writeError != nil {
return m.writeError
}
if m.writeCallback != nil {
if err := m.writeCallback(points); err != nil {
return err
}
}
m.mu.Lock()
defer m.mu.Unlock()
for _, point := range points {
seriesID := point.GetSeriesID()
if seriesID == "" {
return fmt.Errorf("invalid series ID for point: %+v", point)
}
m.points[seriesID] = append(m.points[seriesID], point)
}
m.stats.PointsCount += int64(len(points))
m.stats.LastWriteTime = time.Now()
m.stats.SeriesCount = int64(len(m.points))
return nil
}
// Query implements the Engine interface
func (m *MockEngine) Query(ctx context.Context, query Query) (QueryResult, error) {
if m.queryError != nil {
return nil, m.queryError
}
if m.queryCallback != nil {
return m.queryCallback(query)
}
m.mu.RLock()
defer m.mu.RUnlock()
var result QueryResult
// Dispatch on the query type
switch query.Type {
case QueryTypeLatest:
// Return the latest data point of each series
for seriesID, points := range m.points {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
// Take the most recent data point
latest := points[len(points)-1]
result = append(result, SeriesResult{
SeriesID: seriesID,
Points: []DataPoint{latest},
})
}
case QueryTypeRaw:
// Return all matching data points
for seriesID, points := range m.points {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
var matchedPoints []DataPoint
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
matchedPoints = append(matchedPoints, point)
}
}
if len(matchedPoints) > 0 {
// Apply the limit
if query.Limit > 0 && len(matchedPoints) > query.Limit {
matchedPoints = matchedPoints[:query.Limit]
}
result = append(result, SeriesResult{
SeriesID: seriesID,
Points: matchedPoints,
})
}
}
case QueryTypeAggregate:
// Return the aggregate result
for seriesID, points := range m.points {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
var matchedPoints []DataPoint
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
matchedPoints = append(matchedPoints, point)
}
}
if len(matchedPoints) > 0 {
// Compute the aggregate value
aggregateValue := calculateAggregate(matchedPoints, query.AggregateType)
// Build the aggregate result point
aggregatePoint := DataPoint{
Timestamp: query.EndTime,
Value: aggregateValue,
Labels: matchedPoints[0].Labels,
}
result = append(result, SeriesResult{
SeriesID: seriesID,
Points: []DataPoint{aggregatePoint},
})
}
}
case QueryTypeValueDuration:
// Return value-duration query results.
// Simplified implementation; a real one would compute how long each value persisted.
for seriesID, points := range m.points {
if len(points) == 0 {
continue
}
// Check whether the labels match
if !matchTags(points[0].Labels, query.Tags) {
continue
}
var matchedPoints []DataPoint
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
matchedPoints = append(matchedPoints, point)
}
}
if len(matchedPoints) > 0 {
result = append(result, SeriesResult{
SeriesID: seriesID,
Points: matchedPoints,
})
}
}
default:
return nil, fmt.Errorf("unsupported query type: %s", query.Type)
}
return result, nil
}
// Compact implements the Engine interface
func (m *MockEngine) Compact() error {
if m.compactError != nil {
return m.compactError
}
m.stats.CompactionCount++
m.stats.LastCompaction = time.Now()
return nil
}
// Cleanup implements the Engine interface
func (m *MockEngine) Cleanup() error {
if m.cleanupError != nil {
return m.cleanupError
}
return nil
}
// Stats implements the Engine interface
func (m *MockEngine) Stats() EngineStats {
m.mu.RLock()
defer m.mu.RUnlock()
return m.stats
}
// matchTags reports whether a point's labels match the query tags
func matchTags(pointTags, queryTags map[string]string) bool {
for k, v := range queryTags {
if pointTags[k] != v {
return false
}
}
return true
}
// calculateAggregate computes the aggregate value
func calculateAggregate(points []DataPoint, aggregateType AggregateType) float64 {
if len(points) == 0 {
return 0
}
switch aggregateType {
case AggregateTypeAvg:
sum := 0.0
for _, p := range points {
sum += p.Value
}
return sum / float64(len(points))
case AggregateTypeSum:
sum := 0.0
for _, p := range points {
sum += p.Value
}
return sum
case AggregateTypeMin:
min := points[0].Value
for _, p := range points {
if p.Value < min {
min = p.Value
}
}
return min
case AggregateTypeMax:
max := points[0].Value
for _, p := range points {
if p.Value > max {
max = p.Value
}
}
return max
case AggregateTypeCount:
return float64(len(points))
default:
return 0
}
}
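The error-injection setters above are what make this mock useful for failure-path tests. A sketch of the intended usage (this would live in a `_test.go` file in the same package):

```go
func TestWriteFailurePropagates(t *testing.T) {
	mock := NewMockEngine()
	if err := mock.Open(); err != nil {
		t.Fatalf("Failed to open engine: %v", err)
	}
	defer mock.Close()

	// Inject a write error; Write should return it unchanged.
	injected := fmt.Errorf("disk full")
	mock.SetWriteError(injected)

	err := mock.Write(context.Background(), []DataPoint{{
		Timestamp: time.Now().UnixNano(),
		Value:     1.0,
		Labels:    map[string]string{"test": "fault"},
	}})
	if err != injected {
		t.Errorf("Write() error = %v, want %v", err, injected)
	}
}
```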

View File

@ -15,7 +15,7 @@ import (
const (
// PersistenceTypeBoltDB BoltDB持久化类型
PersistenceTypeBoltDB PersistenceType = "boltdb"
// PersistenceTypeBoltDB PersistenceType = "boltdb"
// 默认bucket名称
devicesBucketName = "devices"
@ -296,7 +296,7 @@ func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]mod
// ReadDuration 读取指定时间范围内的数据
func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
deviceKey := id.String()
// deviceKey := id.String()
// Read all data from the database
values, err := e.Read(ctx, id)