package gocache import ( "bytes" "encoding/binary" "encoding/json" "fmt" "os" "sync" "time" ) // KVStore represents a simple in-memory key/value store. type KVStore struct { mu sync.RWMutex store map[string]interface{} // 修改为interface{}以支持多种类型 file string transaction *Transaction logFile string memoryLimit int64 memoryUsage int64 dirtyKeys map[string]bool bucketCount int index map[string]int64 // 添加索引字段 wg sync.WaitGroup // 添加WaitGroup以等待异步操作 } // Transaction represents an ongoing transaction type Transaction struct { store map[string]interface{} } // NewKVStore creates a new instance of KVStore. func NewKVStore(file string, memoryLimit int64, bucketCount int) *KVStore { store := &KVStore{ store: make(map[string]interface{}), file: file, logFile: file + ".log", memoryLimit: memoryLimit, memoryUsage: 0, dirtyKeys: make(map[string]bool), bucketCount: bucketCount, index: make(map[string]int64), // 初始化索引 } // 启动时自动恢复日志 if err := store.RecoverFromLog(); err != nil { panic(err) } // 启动定时器,定期存储变化 go store.periodicSave() return store } // periodicSave periodically saves dirty keys to file and clears the log func (k *KVStore) periodicSave() { ticker := time.NewTicker(5 * time.Minute) // 每5分钟保存一次 for range ticker.C { k.mu.Lock() if len(k.dirtyKeys) > 0 { // 保存变化到文件 for key := range k.dirtyKeys { k.saveKeyToBucket(key) } // 清空dirtyKeys k.dirtyKeys = make(map[string]bool) // 清空日志文件 os.Truncate(k.logFile, 0) } k.mu.Unlock() } } func (k *KVStore) getBucketNo(key string) int { return hashKey(key) % k.bucketCount } // saveKeyToBucket saves a key to its corresponding bucket file func (k *KVStore) saveKeyToBucket(key string) error { // 计算哈希值确定桶 hash := k.getBucketNo(key) bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash) // 创建索引文件 indexFile := fmt.Sprintf("%s.index%d", k.file, hash) indexData := make(map[string]int64) // 读取整个桶文件,过滤掉旧键值对 var entries [][]byte if _, err := os.Stat(bucketFile); err == nil { file, err := os.Open(bucketFile) if err != nil { return err } defer file.Close() // 读取索引文件 if _, err := os.Stat(indexFile); err == nil { indexFileData, err := os.ReadFile(indexFile) if err == nil { json.Unmarshal(indexFileData, &indexData) } } for { // 读取键长度 var keyLen int64 if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil { break } // 读取键 keyBytes := make([]byte, keyLen) if _, err := file.Read(keyBytes); err != nil { return err } // 读取值长度 var valueLen int64 if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil { return err } // 读取值 valueBytes := make([]byte, valueLen) if _, err := file.Read(valueBytes); err != nil { return err } // 如果键不等于当前键,保留该记录 if string(keyBytes) != key { entry := new(bytes.Buffer) if err := binary.Write(entry, binary.LittleEndian, keyLen); err != nil { return err } entry.Write(keyBytes) if err := binary.Write(entry, binary.LittleEndian, valueLen); err != nil { return err } entry.Write(valueBytes) entries = append(entries, entry.Bytes()) } } } // 重新写入所有保留的记录和新记录 file, err := os.Create(bucketFile) if err != nil { return err } defer file.Close() // 更新索引 offset, _ := file.Seek(0, 1) indexData[key] = offset // 写入保留的记录 for _, entry := range entries { if _, err := file.Write(entry); err != nil { return err } } // 写入新记录 value, exists := k.store[key] if !exists { return nil } // 序列化值 valueBytes, _ := json.Marshal(value) // switch v := value.(type) { // case string: // valueBytes = []byte(v) // case int: // valueBytes = []byte(fmt.Sprintf("%d", v)) // case bool: // valueBytes = []byte(fmt.Sprintf("%t", v)) // case []string, []int, []bool: // valueBytes, _ = json.Marshal(v) // default: // return fmt.Errorf("unsupported value type") // } // 写入键长度和键 if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil { return err } if _, err := file.WriteString(key); err != nil { return err } // 写入值长度和值 if err := binary.Write(file, binary.LittleEndian, int64(len(valueBytes))); err != nil { return err } if _, err := file.Write(valueBytes); err != nil { return err } // 保存索引文件 indexDataBytes, _ := json.Marshal(indexData) os.WriteFile(indexFile, indexDataBytes, 0644) return nil } func (k *KVStore) getKeyFromBucket(key string) (interface{}, bool) { // 计算哈希值确定桶 hash := k.getBucketNo(key) bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash) bucketIndexFile := fmt.Sprintf("%s.index%d", k.file, hash) // 读取索引文件 indexData := make(map[string]int64) if _, err := os.Stat(bucketIndexFile); err == nil { indexFileData, err := os.ReadFile(bucketIndexFile) if err == nil { json.Unmarshal(indexFileData, &indexData) } } // 检查索引中是否 if offset, exists := indexData[key]; exists { file, err := os.Open(bucketFile) if err != nil { return nil, false } defer file.Close() // 定位到偏移量 _, err = file.Seek(offset, 0) if err != nil { return nil, false } // 读取键长度 var keyLen int64 if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil { return nil, false } // 读取键 keyBytes := make([]byte, keyLen) if _, err := file.Read(keyBytes); err != nil { return nil, false } // 读取值长度 var valueLen int64 if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil { return nil, false } // 读取值 valueBytes := make([]byte, valueLen) if _, err := file.Read(valueBytes); err != nil { return nil, false } // 反序列化值 var value interface{} if err := json.Unmarshal(valueBytes, &value); err != nil { return nil, false } return value, true } return nil, false } // hashKey computes a hash for the key func hashKey(key string) int { hash := 0 for _, char := range key { hash = 31*hash + int(char) } return hash } // Flush saves dirty keys to file and clears the memory func (k *KVStore) Flush() error { k.mu.Lock() defer k.mu.Unlock() if len(k.dirtyKeys) == 0 { return nil } // 保存变化到文件 for key := range k.dirtyKeys { if err := k.saveKeyToBucket(key); err != nil { return err } } // 清空dirtyKeys k.dirtyKeys = make(map[string]bool) // 清空日志文件 if err := os.Truncate(k.logFile, 0); err != nil { return err } // 清空内存 k.store = make(map[string]interface{}) k.memoryUsage = 0 return nil } func (k *KVStore) valueSize(value interface{}) int64 { // 计算值的内存占用 var valueSize int64 switch v := value.(type) { case string: valueSize = int64(len(v)) case int: valueSize = 8 // int在64位系统上通常占8字节 case bool: valueSize = 1 // bool占1字节 case []string: for _, s := range v { valueSize += int64(len(s)) } case []int: valueSize = int64(len(v)) * 8 case []bool: valueSize = int64(len(v)) } return valueSize } // Put adds a key/value pair to the store. func (k *KVStore) Put(key string, value interface{}) { k.mu.Lock() defer k.mu.Unlock() // 检查值类型 switch value.(type) { case string, int, bool, []string, []int, []bool: // 有效类型,继续 default: panic("unsupported value type") } // 如果处于事务中,不检查内存限制 if k.transaction != nil { k.transaction.store[key] = value return } // 计算值的内存占用 valueSize := k.valueSize(value) // 检查内存使用量 newMemoryUsage := k.memoryUsage + int64(len(key)) + valueSize if newMemoryUsage > k.memoryLimit { if err := k.Flush(); err != nil { panic("Failed to flush store") } newMemoryUsage = int64(len(key)) + valueSize } k.store[key] = value k.memoryUsage = newMemoryUsage k.dirtyKeys[key] = true k.wg.Add(1) // 增加WaitGroup计数器 go func() { defer k.wg.Done() // 减少WaitGroup计数器 k.mu.Lock() defer k.mu.Unlock() k.LogOperation("put", key, value) // 异步记录操作日志 }() } // Delete removes the key/value pair associated with the given key. func (k *KVStore) Delete(key string) { k.mu.Lock() defer k.mu.Unlock() if value, exists := k.store[key]; exists { delete(k.store, key) valueSize := k.valueSize(value) k.memoryUsage -= int64(len(key)) + valueSize k.dirtyKeys[key] = true k.wg.Add(1) // 增加WaitGroup计数器 go func() { defer k.wg.Done() // 减少WaitGroup计数器 k.mu.Lock() defer k.mu.Unlock() k.LogOperation("delete", key, "") // 异步记录操作日志 }() } } // Get retrieves the value associated with the given key. func (k *KVStore) Get(key string) (interface{}, bool) { k.mu.RLock() defer k.mu.RUnlock() if k.transaction != nil { value, exists := k.transaction.store[key] if exists { return value, true } } value, exists := k.store[key] if !exists { value, exists = k.getKeyFromBucket(key) if exists { k.store[key] = value } else { return nil, false } } return value, exists } // BeginTransaction starts a new transaction func (k *KVStore) BeginTransaction() { k.mu.Lock() defer k.mu.Unlock() k.transaction = &Transaction{ store: make(map[string]interface{}), // 修改为interface{}以支持多种类型 } } // PutInTransaction puts a key/value pair in the current transaction func (k *KVStore) PutInTransaction(key string, value interface{}) { k.mu.Lock() defer k.mu.Unlock() if k.transaction != nil { // 检查值类型 switch value.(type) { case string, int, bool, []string, []int, []bool: k.transaction.store[key] = value default: panic("unsupported value type") } } else { k.store[key] = value } } // Commit commits the current transaction func (k *KVStore) Commit() error { k.mu.Lock() defer k.mu.Unlock() if k.transaction == nil { return nil } // 计算事务中键值对的内存使用量 var transactionMemoryUsage int64 for key, value := range k.transaction.store { var valueSize int64 switch v := value.(type) { case string: valueSize = int64(len(v)) case int: valueSize = 8 // int在64位系统上通常占8字节 case bool: valueSize = 1 // bool占1字节 case []string: for _, s := range v { valueSize += int64(len(s)) } case []int: valueSize = int64(len(v)) * 8 case []bool: valueSize = int64(len(v)) } transactionMemoryUsage += int64(len(key)) + valueSize } // 检查内存使用量 newMemoryUsage := k.memoryUsage + transactionMemoryUsage if newMemoryUsage > k.memoryLimit { if err := k.Flush(); err != nil { return err } newMemoryUsage = transactionMemoryUsage } // 提交事务 for key, value := range k.transaction.store { k.store[key] = value k.dirtyKeys[key] = true } k.memoryUsage = newMemoryUsage k.transaction = nil return nil } // Rollback rolls back the current transaction func (k *KVStore) Rollback() { k.mu.Lock() defer k.mu.Unlock() k.transaction = nil } // RecoverFromLog recovers data from log file func (k *KVStore) RecoverFromLog() error { k.mu.Lock() defer k.mu.Unlock() // 检查日志文件是否存在 if _, err := os.Stat(k.logFile); os.IsNotExist(err) { return nil } // 读取日志文件 data, err := os.ReadFile(k.logFile) if err != nil { return err } // 逐行处理日志 lines := bytes.Split(data, []byte{'\n'}) for _, line := range lines { if len(line) == 0 { continue } var logEntry map[string]interface{} if err := json.Unmarshal(line, &logEntry); err != nil { return err } switch logEntry["op"] { case "put": k.store[logEntry["key"].(string)] = logEntry["value"] case "delete": delete(k.store, logEntry["key"].(string)) } } // 清空日志文件 return os.Truncate(k.logFile, 0) } // LogOperation logs a key/value operation to the log file func (k *KVStore) LogOperation(op string, key, value interface{}) error { logEntry := map[string]interface{}{ "op": op, "key": key, "value": value, } data, err := json.Marshal(logEntry) if err != nil { return err } data = append(data, '\n') f, err := os.OpenFile(k.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } defer f.Close() _, err = f.Write(data) return err } // LoadFromFile loads the store from a file with partial loading support func (k *KVStore) LoadFromFile(keys ...string) error { k.mu.Lock() defer k.mu.Unlock() // 先加载主数据文件 if _, err := os.Stat(k.file); err == nil { file, err := os.Open(k.file) if err != nil { return err } defer file.Close() // 读取键值对数量 var count int64 if err := binary.Read(file, binary.LittleEndian, &count); err != nil { return err } // 逐个读取键值对 for i := int64(0); i < count; i++ { // 读取键长度和键 var keyLen int64 if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil { return err } key := make([]byte, keyLen) if _, err := file.Read(key); err != nil { return err } // 读取值长度和值 var valueLen int64 if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil { return err } value := make([]byte, valueLen) if _, err := file.Read(value); err != nil { return err } // Partial loading if len(keys) == 0 { k.store[string(key)] = string(value) } else { for _, targetKey := range keys { if targetKey == string(key) { k.store[string(key)] = string(value) break } } } } } // 然后应用日志文件中的操作 if _, err := os.Stat(k.logFile); err == nil { data, err := os.ReadFile(k.logFile) if err != nil { return err } lines := bytes.Split(data, []byte{'\n'}) for _, line := range lines { if len(line) == 0 { continue } var logEntry map[string]string if err := json.Unmarshal(line, &logEntry); err != nil { return err } switch logEntry["op"] { case "put": k.store[logEntry["key"]] = logEntry["value"] case "delete": delete(k.store, logEntry["key"]) } } } return nil }