From bcf7eac85fa44b0e429f4a3aec9f3e1372198e6a Mon Sep 17 00:00:00 2001 From: kingecg Date: Sun, 23 Feb 2025 13:34:39 +0800 Subject: [PATCH] fix save structure --- kvstore.go | 400 +++++++++++++++++++++++++++++++++--------------- kvstore_test.go | 17 +- 2 files changed, 283 insertions(+), 134 deletions(-) diff --git a/kvstore.go b/kvstore.go index 62d8000..ddd34e3 100644 --- a/kvstore.go +++ b/kvstore.go @@ -2,9 +2,12 @@ package gocache import ( "bytes" + "encoding/binary" "encoding/json" + "fmt" "os" "sync" + "time" ) // KVStore represents a simple in-memory key/value store. @@ -13,7 +16,11 @@ type KVStore struct { store map[string]string file string transaction *Transaction - logFile string // 添加日志文件路径 + logFile string + memoryLimit int64 + memoryUsage int64 + dirtyKeys map[string]bool // 记录发生变化的Key + bucketCount int // 哈希桶数量 } // Transaction represents an ongoing transaction @@ -22,149 +29,103 @@ type Transaction struct { } // NewKVStore creates a new instance of KVStore. -func NewKVStore(file string) *KVStore { +func NewKVStore(file string, memoryLimit int64, bucketCount int) *KVStore { store := &KVStore{ - store: make(map[string]string), - file: file, - logFile: file + ".log", + store: make(map[string]string), + file: file, + logFile: file + ".log", + memoryLimit: memoryLimit, + memoryUsage: 0, + dirtyKeys: make(map[string]bool), + bucketCount: bucketCount, } // 启动时自动恢复日志 if err := store.RecoverFromLog(); err != nil { panic(err) } + // 启动定时器,定期存储变化 + go store.periodicSave() return store } -// RecoverFromLog recovers data from log file -func (k *KVStore) RecoverFromLog() error { - k.mu.Lock() - defer k.mu.Unlock() +// periodicSave periodically saves dirty keys to file and clears the log +func (k *KVStore) periodicSave() { + ticker := time.NewTicker(5 * time.Minute) // 每5分钟保存一次 + for range ticker.C { + k.mu.Lock() + if len(k.dirtyKeys) > 0 { + // 保存变化到文件 + for key := range k.dirtyKeys { + k.saveKeyToBucket(key) + } + // 清空dirtyKeys + k.dirtyKeys = make(map[string]bool) + // 清空日志文件 + os.Truncate(k.logFile, 0) + } + k.mu.Unlock() + } +} - // 检查日志文件是否存在 - if _, err := os.Stat(k.logFile); os.IsNotExist(err) { +// saveKeyToBucket saves a key to its corresponding bucket file +func (k *KVStore) saveKeyToBucket(key string) error { + // 计算哈希值确定桶 + hash := hashKey(key) % k.bucketCount + bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash) + + file, err := os.OpenFile(bucketFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer file.Close() + + value, exists := k.store[key] + if !exists { return nil } - // 读取日志文件 - data, err := os.ReadFile(k.logFile) - if err != nil { + // 写入键长度和键 + if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil { + return err + } + if _, err := file.WriteString(key); err != nil { return err } - // 逐行处理日志 - lines := bytes.Split(data, []byte{'\n'}) - for _, line := range lines { - if len(line) == { - continue - } - var logEntry map[string]string - if err := json.Unmarshal(line, &logEntry); err != nil { - return err - } - switch logEntry["op"] { - case "put": - k.store[logEntry["key"]] = logEntry["value"] - case "delete": - delete(k.store, logEntry["key"]) - } - } - - // 清空日志文件 - return os.Truncate(k.logFile, 0) -} - -// SaveToFile saves the current store to a file. -func (k *KVStore) SaveToFile() error { - k.mu.RLock() - defer k.mu.RUnlock() - data, err := json.Marshal(k.store) - if err != nil { + // 写入值长度和值 + if err := binary.Write(file, binary.LittleEndian, int64(len(value))); err != nil { return err } - return os.WriteFile(k.file, data, 0644) -} - -// LogOperation logs a key/value operation to the log file -func (k *KVStore) LogOperation(op string, key, value string) error { - logEntry := map[string]string{ - "op": op, - "key": key, - "value": value, - } - data, err := json.Marshal(logEntry) - if err != nil { + if _, err := file.WriteString(value); err != nil { return err } - data = append(data, '\n') - f, err := os.OpenFile(k.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return err - } - defer f.Close() - _, err = f.Write(data) - return err -} - -// LoadFromFile loads the store from a file with partial loading support -func (k *KVStore) LoadFromFile(keys ...string) error { - k.mu.Lock() - defer k.mu.Unlock() - - // 先加载主数据文件 - if _, err := os.Stat(k.file); err == nil { - data, err := os.ReadFile(k.file) - if err != nil { - return err - } - - var tempStore map[string]string - if err := json.Unmarshal(data, &tempStore); err != nil { - return err - } - - // Partial loading - if len(keys) > 0 { - for _, key := range keys { - if value, exists := tempStore[key]; exists { - k.store[key] = value - } - } - } - } - - // 然后应用日志文件中的操作 - if _, err := os.Stat(k.logFile); err == nil { - data, err := os.ReadFile(k.logFile) - if err != nil { - return err - } - - lines := bytes.Split(data, []byte{'\n'}) - for _, line := range lines { - if len(line) == 0 { - continue - } - var logEntry map[string]string - if err := json.Unmarshal(line, &logEntry); err != nil { - return err - } - switch logEntry["op"] { - case "put": - k.store[logEntry["key"]] = logEntry["value"] - case "delete": - delete(k.store, logEntry["key"]) - } - } - } return nil } +// hashKey computes a hash for the key +func hashKey(key string) int { + hash := 0 + for _, char := range key { + hash = 31*hash + int(char) + } + return hash +} + // Put adds a key/value pair to the store. func (k *KVStore) Put(key, value string) { k.mu.Lock() defer k.mu.Unlock() + + // 检查内存使用量 + newMemoryUsage := k.memoryUsage + int64(len(key)+len(value)) + if newMemoryUsage > k.memoryLimit { + panic("Memory limit exceeded") + } + k.store[key] = value + k.memoryUsage = newMemoryUsage + k.dirtyKeys[key] = true go func() { k.mu.Lock() defer k.mu.Unlock() @@ -172,6 +133,23 @@ func (k *KVStore) Put(key, value string) { }() } +// Delete removes the key/value pair associated with the given key. +func (k *KVStore) Delete(key string) { + k.mu.Lock() + defer k.mu.Unlock() + + if value, exists := k.store[key]; exists { + delete(k.store, key) + k.memoryUsage -= int64(len(key) + len(value)) + k.dirtyKeys[key] = true + go func() { + k.mu.Lock() + defer k.mu.Unlock() + k.LogOperation("delete", key, "") // 异步记录操作日志 + }() + } +} + // Get retrieves the value associated with the given key. func (k *KVStore) Get(key string) (string, bool) { k.mu.RLock() @@ -186,18 +164,6 @@ func (k *KVStore) Get(key string) (string, bool) { return value, exists } -// Delete removes the key/value pair associated with the given key. -func (k *KVStore) Delete(key string) { - k.mu.Lock() - defer k.mu.Unlock() - delete(k.store, key) - go func() { - k.mu.Lock() - defer k.mu.Unlock() - k.LogOperation("delete", key, "") // 异步记录操作日志 - }() -} - // BeginTransaction starts a new transaction func (k *KVStore) BeginTransaction() { k.mu.Lock() @@ -239,3 +205,183 @@ func (k *KVStore) PutInTransaction(key, value string) { k.store[key] = value } } + +// RecoverFromLog recovers data from log file +func (k *KVStore) RecoverFromLog() error { + k.mu.Lock() + defer k.mu.Unlock() + + // 检查日志文件是否存在 + if _, err := os.Stat(k.logFile); os.IsNotExist(err) { + return nil + } + + // 读取日志文件 + data, err := os.ReadFile(k.logFile) + if err != nil { + return err + } + + // 逐行处理日志 + lines := bytes.Split(data, []byte{'\n'}) + for _, line := range lines { + if len(line) == 0 { + continue + } + var logEntry map[string]string + if err := json.Unmarshal(line, &logEntry); err != nil { + return err + } + switch logEntry["op"] { + case "put": + k.store[logEntry["key"]] = logEntry["value"] + case "delete": + delete(k.store, logEntry["key"]) + } + } + + // 清空日志文件 + return os.Truncate(k.logFile, 0) +} + +// SaveToFile saves the current store to a file using binary format. +func (k *KVStore) SaveToFile() error { + k.mu.RLock() + defer k.mu.RUnlock() + + file, err := os.Create(k.file) + if err != nil { + return err + } + defer file.Close() + + // 写入键值对数量 + if err := binary.Write(file, binary.LittleEndian, int64(len(k.store))); err != nil { + return err + } + + // 逐个写入键值对 + for key, value := range k.store { + // 写入键长度和键 + if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil { + return err + } + if _, err := file.WriteString(key); err != nil { + return err + } + + // 写入值长度和值 + if err := binary.Write(file, binary.LittleEndian, int64(len(value))); err != nil { + return err + } + if _, err := file.WriteString(value); err != nil { + return err + } + } + + return nil +} + +// LogOperation logs a key/value operation to the log file +func (k *KVStore) LogOperation(op string, key, value string) error { + logEntry := map[string]string{ + "op": op, + "key": key, + "value": value, + } + data, err := json.Marshal(logEntry) + if err != nil { + return err + } + data = append(data, '\n') + f, err := os.OpenFile(k.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + _, err = f.Write(data) + return err +} + +// LoadFromFile loads the store from a file with partial loading support +func (k *KVStore) LoadFromFile(keys ...string) error { + k.mu.Lock() + defer k.mu.Unlock() + + // 先加载主数据文件 + if _, err := os.Stat(k.file); err == nil { + file, err := os.Open(k.file) + if err != nil { + return err + } + defer file.Close() + + // 读取键值对数量 + var count int64 + if err := binary.Read(file, binary.LittleEndian, &count); err != nil { + return err + } + + // 逐个读取键值对 + for i := int64(0); i < count; i++ { + // 读取键长度和键 + var keyLen int64 + if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil { + return err + } + key := make([]byte, keyLen) + if _, err := file.Read(key); err != nil { + return err + } + + // 读取值长度和值 + var valueLen int64 + if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil { + return err + } + value := make([]byte, valueLen) + if _, err := file.Read(value); err != nil { + return err + } + + // Partial loading + if len(keys) == 0 { + k.store[string(key)] = string(value) + } else { + for _, targetKey := range keys { + if targetKey == string(key) { + k.store[string(key)] = string(value) + break + } + } + } + } + } + + // 然后应用日志文件中的操作 + if _, err := os.Stat(k.logFile); err == nil { + data, err := os.ReadFile(k.logFile) + if err != nil { + return err + } + + lines := bytes.Split(data, []byte{'\n'}) + for _, line := range lines { + if len(line) == 0 { + continue + } + var logEntry map[string]string + if err := json.Unmarshal(line, &logEntry); err != nil { + return err + } + switch logEntry["op"] { + case "put": + k.store[logEntry["key"]] = logEntry["value"] + case "delete": + delete(k.store, logEntry["key"]) + } + } + } + + return nil +} diff --git a/kvstore_test.go b/kvstore_test.go index 550604b..d706104 100644 --- a/kvstore_test.go +++ b/kvstore_test.go @@ -5,15 +5,18 @@ import ( "testing" ) +var mumlimit int64 = 1024 * 1024 * 1024 +var bucketCount = 16 + func TestNewKVStore(t *testing.T) { - store := NewKVStore("test.db") + store := NewKVStore("test.db", mumlimit, bucketCount) if store == nil { t.Error("Expected a new KVStore instance, got nil") } } func TestPutAndGet(t *testing.T) { - store := NewKVStore("test.db") + store := NewKVStore("test.db", mumlimit, bucketCount) store.Put("key1", "value1") value, exists := store.Get("key1") if !exists || value != "value1" { @@ -22,7 +25,7 @@ func TestPutAndGet(t *testing.T) { } func TestDelete(t *testing.T) { - store := NewKVStore("test.db") + store := NewKVStore("test.db", mumlimit, bucketCount) store.Put("key1", "value1") store.Delete("key1") _, exists := store.Get("key1") @@ -32,14 +35,14 @@ func TestDelete(t *testing.T) { } func TestSaveAndLoadFromFile(t *testing.T) { - store := NewKVStore("test.db") + store := NewKVStore("test.db", mumlimit, bucketCount) store.Put("key1", "value1") err := store.SaveToFile() if err != nil { t.Error("Failed to save store to file") } - newStore := NewKVStore("test.db") + newStore := NewKVStore("test.db", mumlimit, bucketCount) err = newStore.LoadFromFile() if err != nil { t.Error("Failed to load store from file") @@ -56,7 +59,7 @@ func TestSaveAndLoadFromFile(t *testing.T) { } func TestLogOperation(t *testing.T) { - store := NewKVStore("test.db") + store := NewKVStore("test.db", mumlimit, bucketCount) err := store.LogOperation("put", "key1", "value1") if err != nil { t.Error("Failed to log operation") @@ -67,7 +70,7 @@ func TestLogOperation(t *testing.T) { } func TestTransaction(t *testing.T) { - store := NewKVStore("test.db") + store := NewKVStore("test.db", mumlimit, bucketCount) store.BeginTransaction() store.PutInTransaction("key1", "value1") store.Commit()