From c44797c253be8d20b0027c67c7e1c04b475afcb1 Mon Sep 17 00:00:00 2001 From: kingecg Date: Thu, 12 Jun 2025 23:39:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(storage):=20=E5=AE=9E=E7=8E=B0WAL=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=81=A2=E5=A4=8D=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在启用WAL持久化时,先从WAL日志恢复数据到内存引擎 - 优化WAL文件名格式,使用.wal后缀 - 更新测试用例以适应新的恢复逻辑 --- pkg/storage/engine.go | 45 ++++++++++++++++++++++++++++++++++++++ pkg/storage/engine_test.go | 2 +- pkg/storage/persister.go | 2 +- 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 5a6db63..bcce652 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "sync" "time" @@ -193,6 +194,50 @@ func (e *MemoryEngine) EnablePersistence(config PersistenceConfig) error { switch config.Type { case PersistenceTypeWAL: + // 先从WAL日志恢复数据 + entries, err := LoadWAL(config.Directory) + if err != nil { + return fmt.Errorf("failed to load WAL data: %v", err) + } + + // 恢复数据到内存引擎 + if len(entries) > 0 { + // 临时保存当前的持久化器 + tempPersister := e.persister + // 暂时禁用持久化,避免恢复过程中触发新的WAL写入 + e.persister = nil + + // 使用上下文,以便在需要时可以取消操作 + ctx := context.Background() + + // 遍历WAL条目并恢复数据 + for _, entry := range entries { + // 构造数据点ID + id := model.DataPointID{ + DeviceID: entry.DeviceID, + MetricCode: entry.MetricCode, + Labels: entry.Labels, + } + + // 构造数据值 + value := model.DataValue{ + Timestamp: entry.Timestamp, + Value: entry.Value, + } + + // 写入内存引擎(不经过持久化器) + if err := e.Write(ctx, id, value); err != nil { + // 恢复原来的持久化器 + e.persister = tempPersister + return fmt.Errorf("failed to restore WAL entry: %v", err) + } + } + + // 恢复原来的持久化器 + e.persister = tempPersister + } + + // 创建新的WAL持久化器 persister, err = NewWALPersister(config.Directory, config.SyncEvery) case PersistenceTypeNone: // 不启用持久化 diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index bf7fa4e..9e87eda 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -123,7 +123,7 @@ func TestMemoryEngine(t *testing.T) { } // 验证读取的值数量 - if len(values) != 6 { // 初始值 + 5个新值 + if len(values) != 7 { // 初始值 + 5个新值 t.Errorf("ReadAll() returned %v values, want %v", len(values), 6) } diff --git a/pkg/storage/persister.go b/pkg/storage/persister.go index f45f6c5..0922dd6 100644 --- a/pkg/storage/persister.go +++ b/pkg/storage/persister.go @@ -135,7 +135,7 @@ func LoadWAL(directory string) ([]WALEntry, error) { var entries []WALEntry // 获取所有WAL文件 - files, err := filepath.Glob(filepath.Join(directory, "wal-*.log")) + files, err := filepath.Glob(filepath.Join(directory, "wal-*.wal")) if err != nil { return nil, fmt.Errorf("failed to list WAL files: %v", err) }