gocache/kvstore.go

388 lines
8.3 KiB
Go

package gocache
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"sync"
"time"
)
// KVStore represents a simple in-memory key/value store.
type KVStore struct {
mu sync.RWMutex
store map[string]string
file string
transaction *Transaction
logFile string
memoryLimit int64
memoryUsage int64
dirtyKeys map[string]bool // 记录发生变化的Key
bucketCount int // 哈希桶数量
}
// Transaction represents an ongoing transaction
type Transaction struct {
store map[string]string
}
// NewKVStore creates a new instance of KVStore.
func NewKVStore(file string, memoryLimit int64, bucketCount int) *KVStore {
store := &KVStore{
store: make(map[string]string),
file: file,
logFile: file + ".log",
memoryLimit: memoryLimit,
memoryUsage: 0,
dirtyKeys: make(map[string]bool),
bucketCount: bucketCount,
}
// 启动时自动恢复日志
if err := store.RecoverFromLog(); err != nil {
panic(err)
}
// 启动定时器,定期存储变化
go store.periodicSave()
return store
}
// periodicSave periodically saves dirty keys to file and clears the log
func (k *KVStore) periodicSave() {
ticker := time.NewTicker(5 * time.Minute) // 每5分钟保存一次
for range ticker.C {
k.mu.Lock()
if len(k.dirtyKeys) > 0 {
// 保存变化到文件
for key := range k.dirtyKeys {
k.saveKeyToBucket(key)
}
// 清空dirtyKeys
k.dirtyKeys = make(map[string]bool)
// 清空日志文件
os.Truncate(k.logFile, 0)
}
k.mu.Unlock()
}
}
// saveKeyToBucket saves a key to its corresponding bucket file
func (k *KVStore) saveKeyToBucket(key string) error {
// 计算哈希值确定桶
hash := hashKey(key) % k.bucketCount
bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash)
file, err := os.OpenFile(bucketFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
value, exists := k.store[key]
if !exists {
return nil
}
// 写入键长度和键
if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil {
return err
}
if _, err := file.WriteString(key); err != nil {
return err
}
// 写入值长度和值
if err := binary.Write(file, binary.LittleEndian, int64(len(value))); err != nil {
return err
}
if _, err := file.WriteString(value); err != nil {
return err
}
return nil
}
// hashKey computes a hash for the key
func hashKey(key string) int {
hash := 0
for _, char := range key {
hash = 31*hash + int(char)
}
return hash
}
// Put adds a key/value pair to the store.
func (k *KVStore) Put(key, value string) {
k.mu.Lock()
defer k.mu.Unlock()
// 检查内存使用量
newMemoryUsage := k.memoryUsage + int64(len(key)+len(value))
if newMemoryUsage > k.memoryLimit {
panic("Memory limit exceeded")
}
k.store[key] = value
k.memoryUsage = newMemoryUsage
k.dirtyKeys[key] = true
go func() {
k.mu.Lock()
defer k.mu.Unlock()
k.LogOperation("put", key, value) // 异步记录操作日志
}()
}
// Delete removes the key/value pair associated with the given key.
func (k *KVStore) Delete(key string) {
k.mu.Lock()
defer k.mu.Unlock()
if value, exists := k.store[key]; exists {
delete(k.store, key)
k.memoryUsage -= int64(len(key) + len(value))
k.dirtyKeys[key] = true
go func() {
k.mu.Lock()
defer k.mu.Unlock()
k.LogOperation("delete", key, "") // 异步记录操作日志
}()
}
}
// Get retrieves the value associated with the given key.
func (k *KVStore) Get(key string) (string, bool) {
k.mu.RLock()
defer k.mu.RUnlock()
if k.transaction != nil {
value, exists := k.transaction.store[key]
if exists {
return value, true
}
}
value, exists := k.store[key]
return value, exists
}
// BeginTransaction starts a new transaction
func (k *KVStore) BeginTransaction() {
k.mu.Lock()
defer k.mu.Unlock()
k.transaction = &Transaction{
store: make(map[string]string),
}
}
// Commit commits the current transaction
func (k *KVStore) Commit() error {
k.mu.Lock()
defer k.mu.Unlock()
if k.transaction == nil {
return nil
}
for key, value := range k.transaction.store {
k.store[key] = value
}
k.transaction = nil
return nil
}
// Rollback rolls back the current transaction
func (k *KVStore) Rollback() {
k.mu.Lock()
defer k.mu.Unlock()
k.transaction = nil
}
// PutInTransaction puts a key/value pair in the current transaction
func (k *KVStore) PutInTransaction(key, value string) {
k.mu.Lock()
defer k.mu.Unlock()
if k.transaction != nil {
k.transaction.store[key] = value
} else {
k.store[key] = value
}
}
// RecoverFromLog recovers data from log file
func (k *KVStore) RecoverFromLog() error {
k.mu.Lock()
defer k.mu.Unlock()
// 检查日志文件是否存在
if _, err := os.Stat(k.logFile); os.IsNotExist(err) {
return nil
}
// 读取日志文件
data, err := os.ReadFile(k.logFile)
if err != nil {
return err
}
// 逐行处理日志
lines := bytes.Split(data, []byte{'\n'})
for _, line := range lines {
if len(line) == 0 {
continue
}
var logEntry map[string]string
if err := json.Unmarshal(line, &logEntry); err != nil {
return err
}
switch logEntry["op"] {
case "put":
k.store[logEntry["key"]] = logEntry["value"]
case "delete":
delete(k.store, logEntry["key"])
}
}
// 清空日志文件
return os.Truncate(k.logFile, 0)
}
// SaveToFile saves the current store to a file using binary format.
func (k *KVStore) SaveToFile() error {
k.mu.RLock()
defer k.mu.RUnlock()
file, err := os.Create(k.file)
if err != nil {
return err
}
defer file.Close()
// 写入键值对数量
if err := binary.Write(file, binary.LittleEndian, int64(len(k.store))); err != nil {
return err
}
// 逐个写入键值对
for key, value := range k.store {
// 写入键长度和键
if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil {
return err
}
if _, err := file.WriteString(key); err != nil {
return err
}
// 写入值长度和值
if err := binary.Write(file, binary.LittleEndian, int64(len(value))); err != nil {
return err
}
if _, err := file.WriteString(value); err != nil {
return err
}
}
return nil
}
// LogOperation logs a key/value operation to the log file
func (k *KVStore) LogOperation(op string, key, value string) error {
logEntry := map[string]string{
"op": op,
"key": key,
"value": value,
}
data, err := json.Marshal(logEntry)
if err != nil {
return err
}
data = append(data, '\n')
f, err := os.OpenFile(k.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(data)
return err
}
// LoadFromFile loads the store from a file with partial loading support
func (k *KVStore) LoadFromFile(keys ...string) error {
k.mu.Lock()
defer k.mu.Unlock()
// 先加载主数据文件
if _, err := os.Stat(k.file); err == nil {
file, err := os.Open(k.file)
if err != nil {
return err
}
defer file.Close()
// 读取键值对数量
var count int64
if err := binary.Read(file, binary.LittleEndian, &count); err != nil {
return err
}
// 逐个读取键值对
for i := int64(0); i < count; i++ {
// 读取键长度和键
var keyLen int64
if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil {
return err
}
key := make([]byte, keyLen)
if _, err := file.Read(key); err != nil {
return err
}
// 读取值长度和值
var valueLen int64
if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil {
return err
}
value := make([]byte, valueLen)
if _, err := file.Read(value); err != nil {
return err
}
// Partial loading
if len(keys) == 0 {
k.store[string(key)] = string(value)
} else {
for _, targetKey := range keys {
if targetKey == string(key) {
k.store[string(key)] = string(value)
break
}
}
}
}
}
// 然后应用日志文件中的操作
if _, err := os.Stat(k.logFile); err == nil {
data, err := os.ReadFile(k.logFile)
if err != nil {
return err
}
lines := bytes.Split(data, []byte{'\n'})
for _, line := range lines {
if len(line) == 0 {
continue
}
var logEntry map[string]string
if err := json.Unmarshal(line, &logEntry); err != nil {
return err
}
switch logEntry["op"] {
case "put":
k.store[logEntry["key"]] = logEntry["value"]
case "delete":
delete(k.store, logEntry["key"])
}
}
}
return nil
}