package gocache import ( "bytes" "encoding/binary" "encoding/json" "fmt" "os" "sync" "time" ) // KVStore represents a simple in-memory key/value store. type KVStore struct { mu sync.RWMutex store map[string]string file string transaction *Transaction logFile string memoryLimit int64 memoryUsage int64 dirtyKeys map[string]bool // 记录发生变化的Key bucketCount int // 哈希桶数量 } // Transaction represents an ongoing transaction type Transaction struct { store map[string]string } // NewKVStore creates a new instance of KVStore. func NewKVStore(file string, memoryLimit int64, bucketCount int) *KVStore { store := &KVStore{ store: make(map[string]string), file: file, logFile: file + ".log", memoryLimit: memoryLimit, memoryUsage: 0, dirtyKeys: make(map[string]bool), bucketCount: bucketCount, } // 启动时自动恢复日志 if err := store.RecoverFromLog(); err != nil { panic(err) } // 启动定时器,定期存储变化 go store.periodicSave() return store } // periodicSave periodically saves dirty keys to file and clears the log func (k *KVStore) periodicSave() { ticker := time.NewTicker(5 * time.Minute) // 每5分钟保存一次 for range ticker.C { k.mu.Lock() if len(k.dirtyKeys) > 0 { // 保存变化到文件 for key := range k.dirtyKeys { k.saveKeyToBucket(key) } // 清空dirtyKeys k.dirtyKeys = make(map[string]bool) // 清空日志文件 os.Truncate(k.logFile, 0) } k.mu.Unlock() } } // saveKeyToBucket saves a key to its corresponding bucket file func (k *KVStore) saveKeyToBucket(key string) error { // 计算哈希值确定桶 hash := hashKey(key) % k.bucketCount bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash) file, err := os.OpenFile(bucketFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } defer file.Close() value, exists := k.store[key] if !exists { return nil } // 写入键长度和键 if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil { return err } if _, err := file.WriteString(key); err != nil { return err } // 写入值长度和值 if err := binary.Write(file, binary.LittleEndian, int64(len(value))); err != nil { return err } if _, err := file.WriteString(value); err != nil { return err } return nil } // hashKey computes a hash for the key func hashKey(key string) int { hash := 0 for _, char := range key { hash = 31*hash + int(char) } return hash } // Flush saves dirty keys to file and clears the memory func (k *KVStore) Flush() error { k.mu.Lock() defer k.mu.Unlock() if len(k.dirtyKeys) == 0 { return nil } // 保存变化到文件 for key := range k.dirtyKeys { if err := k.saveKeyToBucket(key); err != nil { return err } } // 清空dirtyKeys k.dirtyKeys = make(map[string]bool) // 清空日志文件 if err := os.Truncate(k.logFile, 0); err != nil { return err } // 清空内存 k.store = make(map[string]string) k.memoryUsage = 0 return nil } // Put adds a key/value pair to the store. func (k *KVStore) Put(key, value string) { k.mu.Lock() defer k.mu.Unlock() // 如果处于事务中,不检查内存限制 if k.transaction != nil { k.transaction.store[key] = value return } // 检查内存使用量 newMemoryUsage := k.memoryUsage + int64(len(key)+len(value)) if newMemoryUsage > k.memoryLimit { if err := k.Flush(); err != nil { panic("Failed to flush store") } newMemoryUsage = int64(len(key) + len(value)) } k.store[key] = value k.memoryUsage = newMemoryUsage k.dirtyKeys[key] = true go func() { k.mu.Lock() defer k.mu.Unlock() k.LogOperation("put", key, value) // 异步记录操作日志 }() } // Delete removes the key/value pair associated with the given key. func (k *KVStore) Delete(key string) { k.mu.Lock() defer k.mu.Unlock() if value, exists := k.store[key]; exists { delete(k.store, key) k.memoryUsage -= int64(len(key) + len(value)) k.dirtyKeys[key] = true go func() { k.mu.Lock() defer k.mu.Unlock() k.LogOperation("delete", key, "") // 异步记录操作日志 }() } } // Get retrieves the value associated with the given key. func (k *KVStore) Get(key string) (string, bool) { k.mu.RLock() defer k.mu.RUnlock() if k.transaction != nil { value, exists := k.transaction.store[key] if exists { return value, true } } value, exists := k.store[key] return value, exists } // BeginTransaction starts a new transaction func (k *KVStore) BeginTransaction() { k.mu.Lock() defer k.mu.Unlock() k.transaction = &Transaction{ store: make(map[string]string), } } // Commit commits the current transaction func (k *KVStore) Commit() error { k.mu.Lock() defer k.mu.Unlock() if k.transaction == nil { return nil } // 计算事务中键值对的内存使用量 var transactionMemoryUsage int64 for key, value := range k.transaction.store { transactionMemoryUsage += int64(len(key) + len(value)) } // 检查内存使用量 newMemoryUsage := k.memoryUsage + transactionMemoryUsage if newMemoryUsage > k.memoryLimit { if err := k.Flush(); err != nil { return err } newMemoryUsage = transactionMemoryUsage } // 提交事务 for key, value := range k.transaction.store { k.store[key] = value k.dirtyKeys[key] = true } k.memoryUsage = newMemoryUsage k.transaction = nil return nil } // Rollback rolls back the current transaction func (k *KVStore) Rollback() { k.mu.Lock() defer k.mu.Unlock() k.transaction = nil } // PutInTransaction puts a key/value pair in the current transaction func (k *KVStore) PutInTransaction(key, value string) { k.mu.Lock() defer k.mu.Unlock() if k.transaction != nil { k.transaction.store[key] = value } else { k.store[key] = value } } // RecoverFromLog recovers data from log file func (k *KVStore) RecoverFromLog() error { k.mu.Lock() defer k.mu.Unlock() // 检查日志文件是否存在 if _, err := os.Stat(k.logFile); os.IsNotExist(err) { return nil } // 读取日志文件 data, err := os.ReadFile(k.logFile) if err != nil { return err } // 逐行处理日志 lines := bytes.Split(data, []byte{'\n'}) for _, line := range lines { if len(line) == 0 { continue } var logEntry map[string]string if err := json.Unmarshal(line, &logEntry); err != nil { return err } switch logEntry["op"] { case "put": k.store[logEntry["key"]] = logEntry["value"] case "delete": delete(k.store, logEntry["key"]) } } // 清空日志文件 return os.Truncate(k.logFile, 0) } // SaveToFile saves the current store to a file using binary format. func (k *KVStore) SaveToFile() error { k.mu.RLock() defer k.mu.RUnlock() file, err := os.Create(k.file) if err != nil { return err } defer file.Close() // 写入键值对数量 if err := binary.Write(file, binary.LittleEndian, int64(len(k.store))); err != nil { return err } // 逐个写入键值对 for key, value := range k.store { // 写入键长度和键 if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil { return err } if _, err := file.WriteString(key); err != nil { return err } // 写入值长度和值 if err := binary.Write(file, binary.LittleEndian, int64(len(value))); err != nil { return err } if _, err := file.WriteString(value); err != nil { return err } } return nil } // LogOperation logs a key/value operation to the log file func (k *KVStore) LogOperation(op string, key, value string) error { logEntry := map[string]string{ "op": op, "key": key, "value": value, } data, err := json.Marshal(logEntry) if err != nil { return err } data = append(data, '\n') f, err := os.OpenFile(k.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } defer f.Close() _, err = f.Write(data) return err } // LoadFromFile loads the store from a file with partial loading support func (k *KVStore) LoadFromFile(keys ...string) error { k.mu.Lock() defer k.mu.Unlock() // 先加载主数据文件 if _, err := os.Stat(k.file); err == nil { file, err := os.Open(k.file) if err != nil { return err } defer file.Close() // 读取键值对数量 var count int64 if err := binary.Read(file, binary.LittleEndian, &count); err != nil { return err } // 逐个读取键值对 for i := int64(0); i < count; i++ { // 读取键长度和键 var keyLen int64 if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil { return err } key := make([]byte, keyLen) if _, err := file.Read(key); err != nil { return err } // 读取值长度和值 var valueLen int64 if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil { return err } value := make([]byte, valueLen) if _, err := file.Read(value); err != nil { return err } // Partial loading if len(keys) == 0 { k.store[string(key)] = string(value) } else { for _, targetKey := range keys { if targetKey == string(key) { k.store[string(key)] = string(value) break } } } } } // 然后应用日志文件中的操作 if _, err := os.Stat(k.logFile); err == nil { data, err := os.ReadFile(k.logFile) if err != nil { return err } lines := bytes.Split(data, []byte{'\n'}) for _, line := range lines { if len(line) == 0 { continue } var logEntry map[string]string if err := json.Unmarshal(line, &logEntry); err != nil { return err } switch logEntry["op"] { case "put": k.store[logEntry["key"]] = logEntry["value"] case "delete": delete(k.store, logEntry["key"]) } } } return nil }