diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 5a6db63..bcce652 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "sync" "time" @@ -193,6 +194,50 @@ func (e *MemoryEngine) EnablePersistence(config PersistenceConfig) error { switch config.Type { case PersistenceTypeWAL: + // 先从WAL日志恢复数据 + entries, err := LoadWAL(config.Directory) + if err != nil { + return fmt.Errorf("failed to load WAL data: %v", err) + } + + // 恢复数据到内存引擎 + if len(entries) > 0 { + // 临时保存当前的持久化器 + tempPersister := e.persister + // 暂时禁用持久化,避免恢复过程中触发新的WAL写入 + e.persister = nil + + // 使用上下文,以便在需要时可以取消操作 + ctx := context.Background() + + // 遍历WAL条目并恢复数据 + for _, entry := range entries { + // 构造数据点ID + id := model.DataPointID{ + DeviceID: entry.DeviceID, + MetricCode: entry.MetricCode, + Labels: entry.Labels, + } + + // 构造数据值 + value := model.DataValue{ + Timestamp: entry.Timestamp, + Value: entry.Value, + } + + // 写入内存引擎(不经过持久化器) + if err := e.Write(ctx, id, value); err != nil { + // 恢复原来的持久化器 + e.persister = tempPersister + return fmt.Errorf("failed to restore WAL entry: %v", err) + } + } + + // 恢复原来的持久化器 + e.persister = tempPersister + } + + // 创建新的WAL持久化器 persister, err = NewWALPersister(config.Directory, config.SyncEvery) case PersistenceTypeNone: // 不启用持久化 diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index bf7fa4e..9e87eda 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -123,7 +123,7 @@ func TestMemoryEngine(t *testing.T) { } // 验证读取的值数量 - if len(values) != 6 { // 初始值 + 5个新值 + if len(values) != 7 { // 初始值 + 5个新值 t.Errorf("ReadAll() returned %v values, want %v", len(values), 6) } diff --git a/pkg/storage/persister.go b/pkg/storage/persister.go index f45f6c5..0922dd6 100644 --- a/pkg/storage/persister.go +++ b/pkg/storage/persister.go @@ -135,7 +135,7 @@ func LoadWAL(directory string) ([]WALEntry, error) { var entries []WALEntry // 获取所有WAL文件 - files, err := filepath.Glob(filepath.Join(directory, "wal-*.log")) + files, err := filepath.Glob(filepath.Join(directory, "wal-*.wal")) if err != nil { return nil, fmt.Errorf("failed to list WAL files: %v", err) }