package storage import ( "bufio" "encoding/json" "fmt" "os" "path/filepath" "sync" "time" "git.pyer.club/kingecg/gotidb/pkg/model" ) // Persister 持久化接口 type Persister interface { // Write 写入数据 Write(id model.DataPointID, value model.DataValue) error // Close 关闭持久化器 Close() error } // WALEntry WAL日志条目 type WALEntry struct { Timestamp time.Time `json:"timestamp"` DeviceID string `json:"device_id"` MetricCode string `json:"metric_code"` Labels map[string]string `json:"labels"` Value interface{} `json:"value"` } // WALPersister WAL持久化器 type WALPersister struct { directory string // WAL日志目录 file *os.File // 当前WAL文件 writer *bufio.Writer mutex sync.Mutex syncEvery int // 每写入多少条数据同步一次 count int // 当前写入计数 } // NewWALPersister 创建一个新的WAL持久化器 func NewWALPersister(directory string, syncEvery int) (*WALPersister, error) { // 确保目录存在 if err := os.MkdirAll(directory, 0755); err != nil { return nil, fmt.Errorf("failed to create WAL directory: %v", err) } // 创建新的WAL文件 filename := filepath.Join(directory, fmt.Sprintf("wal-%d.wal", time.Now().UnixNano())) file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, fmt.Errorf("failed to create WAL file: %v", err) } return &WALPersister{ directory: directory, file: file, writer: bufio.NewWriter(file), syncEvery: syncEvery, }, nil } // Write 写入数据到WAL日志 func (w *WALPersister) Write(id model.DataPointID, value model.DataValue) error { w.mutex.Lock() defer w.mutex.Unlock() // 创建WAL条目 entry := WALEntry{ Timestamp: value.Timestamp, DeviceID: id.DeviceID, MetricCode: id.MetricCode, Labels: id.Labels, Value: value.Value, } // 序列化为JSON data, err := json.Marshal(entry) if err != nil { return fmt.Errorf("failed to marshal WAL entry: %v", err) } // 写入数据 if _, err := w.writer.Write(data); err != nil { return fmt.Errorf("failed to write WAL entry: %v", err) } if err := w.writer.WriteByte('\n'); err != nil { return fmt.Errorf("failed to write newline: %v", err) } // 增加计数 w.count++ // 检查是否需要同步 if w.count >= w.syncEvery { if err := w.sync(); err != nil { return fmt.Errorf("failed to sync WAL: %v", err) } w.count = 0 } return nil } // sync 同步WAL日志到磁盘 func (w *WALPersister) sync() error { if err := w.writer.Flush(); err != nil { return err } return w.file.Sync() } // Close 关闭WAL持久化器 func (w *WALPersister) Close() error { w.mutex.Lock() defer w.mutex.Unlock() // 刷新缓冲区 if err := w.writer.Flush(); err != nil { return fmt.Errorf("failed to flush WAL buffer: %v", err) } // 同步到磁盘 if err := w.file.Sync(); err != nil { return fmt.Errorf("failed to sync WAL file: %v", err) } // 关闭文件 return w.file.Close() } // LoadWAL 从WAL日志恢复数据 func LoadWAL(directory string) ([]WALEntry, error) { var entries []WALEntry // 获取所有WAL文件 files, err := filepath.Glob(filepath.Join(directory, "wal-*.wal")) if err != nil { return nil, fmt.Errorf("failed to list WAL files: %v", err) } // 按文件名排序(文件名包含时间戳) // TODO: 实现文件排序 // 读取每个文件 for _, file := range files { f, err := os.Open(file) if err != nil { return nil, fmt.Errorf("failed to open WAL file %s: %v", file, err) } defer f.Close() scanner := bufio.NewScanner(f) for scanner.Scan() { var entry WALEntry if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { return nil, fmt.Errorf("failed to unmarshal WAL entry: %v", err) } entries = append(entries, entry) } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("failed to scan WAL file %s: %v", file, err) } } return entries, nil }