633 lines
14 KiB
Go
633 lines
14 KiB
Go
package gocache
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// KVStore represents a simple in-memory key/value store.
|
|
type KVStore struct {
|
|
mu sync.RWMutex
|
|
store map[string]interface{} // 修改为interface{}以支持多种类型
|
|
file string
|
|
transaction *Transaction
|
|
logFile string
|
|
memoryLimit int64
|
|
memoryUsage int64
|
|
dirtyKeys map[string]bool
|
|
bucketCount int
|
|
index map[string]int64 // 添加索引字段
|
|
wg sync.WaitGroup // 添加WaitGroup以等待异步操作
|
|
}
|
|
|
|
// Transaction buffers the writes made while a transaction is open.
type Transaction struct {
	// store holds the key/value pairs written inside the transaction.
	store map[string]interface{}
}
|
|
|
|
// NewKVStore creates a new instance of KVStore.
|
|
func NewKVStore(file string, memoryLimit int64, bucketCount int) *KVStore {
|
|
store := &KVStore{
|
|
store: make(map[string]interface{}),
|
|
file: file,
|
|
logFile: file + ".log",
|
|
memoryLimit: memoryLimit,
|
|
memoryUsage: 0,
|
|
dirtyKeys: make(map[string]bool),
|
|
bucketCount: bucketCount,
|
|
index: make(map[string]int64), // 初始化索引
|
|
}
|
|
// 启动时自动恢复日志
|
|
if err := store.RecoverFromLog(); err != nil {
|
|
panic(err)
|
|
}
|
|
// 启动定时器,定期存储变化
|
|
go store.periodicSave()
|
|
return store
|
|
}
|
|
|
|
// periodicSave periodically saves dirty keys to file and clears the log
|
|
func (k *KVStore) periodicSave() {
|
|
ticker := time.NewTicker(5 * time.Minute) // 每5分钟保存一次
|
|
for range ticker.C {
|
|
k.mu.Lock()
|
|
if len(k.dirtyKeys) > 0 {
|
|
// 保存变化到文件
|
|
for key := range k.dirtyKeys {
|
|
k.saveKeyToBucket(key)
|
|
}
|
|
// 清空dirtyKeys
|
|
k.dirtyKeys = make(map[string]bool)
|
|
// 清空日志文件
|
|
os.Truncate(k.logFile, 0)
|
|
}
|
|
k.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (k *KVStore) getBucketNo(key string) int {
|
|
return hashKey(key) % k.bucketCount
|
|
}
|
|
|
|
// saveKeyToBucket saves a key to its corresponding bucket file
|
|
func (k *KVStore) saveKeyToBucket(key string) error {
|
|
// 计算哈希值确定桶
|
|
hash := k.getBucketNo(key)
|
|
bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash)
|
|
|
|
// 创建索引文件
|
|
indexFile := fmt.Sprintf("%s.index%d", k.file, hash)
|
|
indexData := make(map[string]int64)
|
|
|
|
// 读取整个桶文件,过滤掉旧键值对
|
|
var entries [][]byte
|
|
if _, err := os.Stat(bucketFile); err == nil {
|
|
file, err := os.Open(bucketFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
// 读取索引文件
|
|
if _, err := os.Stat(indexFile); err == nil {
|
|
indexFileData, err := os.ReadFile(indexFile)
|
|
if err == nil {
|
|
json.Unmarshal(indexFileData, &indexData)
|
|
}
|
|
}
|
|
|
|
for {
|
|
// 读取键长度
|
|
var keyLen int64
|
|
if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil {
|
|
break
|
|
}
|
|
|
|
// 读取键
|
|
keyBytes := make([]byte, keyLen)
|
|
if _, err := file.Read(keyBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 读取值长度
|
|
var valueLen int64
|
|
if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 读取值
|
|
valueBytes := make([]byte, valueLen)
|
|
if _, err := file.Read(valueBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 如果键不等于当前键,保留该记录
|
|
if string(keyBytes) != key {
|
|
entry := new(bytes.Buffer)
|
|
if err := binary.Write(entry, binary.LittleEndian, keyLen); err != nil {
|
|
return err
|
|
}
|
|
entry.Write(keyBytes)
|
|
if err := binary.Write(entry, binary.LittleEndian, valueLen); err != nil {
|
|
return err
|
|
}
|
|
entry.Write(valueBytes)
|
|
entries = append(entries, entry.Bytes())
|
|
}
|
|
}
|
|
}
|
|
|
|
// 重新写入所有保留的记录和新记录
|
|
file, err := os.Create(bucketFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
// 更新索引
|
|
offset, _ := file.Seek(0, 1)
|
|
indexData[key] = offset
|
|
|
|
// 写入保留的记录
|
|
for _, entry := range entries {
|
|
if _, err := file.Write(entry); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// 写入新记录
|
|
value, exists := k.store[key]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
// 序列化值
|
|
valueBytes, _ := json.Marshal(value)
|
|
// switch v := value.(type) {
|
|
// case string:
|
|
// valueBytes = []byte(v)
|
|
// case int:
|
|
// valueBytes = []byte(fmt.Sprintf("%d", v))
|
|
// case bool:
|
|
// valueBytes = []byte(fmt.Sprintf("%t", v))
|
|
// case []string, []int, []bool:
|
|
// valueBytes, _ = json.Marshal(v)
|
|
// default:
|
|
// return fmt.Errorf("unsupported value type")
|
|
// }
|
|
|
|
// 写入键长度和键
|
|
if err := binary.Write(file, binary.LittleEndian, int64(len(key))); err != nil {
|
|
return err
|
|
}
|
|
if _, err := file.WriteString(key); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 写入值长度和值
|
|
if err := binary.Write(file, binary.LittleEndian, int64(len(valueBytes))); err != nil {
|
|
return err
|
|
}
|
|
if _, err := file.Write(valueBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 保存索引文件
|
|
indexDataBytes, _ := json.Marshal(indexData)
|
|
os.WriteFile(indexFile, indexDataBytes, 0644)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (k *KVStore) getKeyFromBucket(key string) (interface{}, bool) {
|
|
// 计算哈希值确定桶
|
|
hash := k.getBucketNo(key)
|
|
bucketFile := fmt.Sprintf("%s.bucket%d", k.file, hash)
|
|
bucketIndexFile := fmt.Sprintf("%s.index%d", k.file, hash)
|
|
// 读取索引文件
|
|
indexData := make(map[string]int64)
|
|
if _, err := os.Stat(bucketIndexFile); err == nil {
|
|
indexFileData, err := os.ReadFile(bucketIndexFile)
|
|
if err == nil {
|
|
json.Unmarshal(indexFileData, &indexData)
|
|
}
|
|
}
|
|
// 检查索引中是否
|
|
if offset, exists := indexData[key]; exists {
|
|
file, err := os.Open(bucketFile)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
defer file.Close()
|
|
// 定位到偏移量
|
|
_, err = file.Seek(offset, 0)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
// 读取键长度
|
|
var keyLen int64
|
|
if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil {
|
|
return nil, false
|
|
}
|
|
// 读取键
|
|
keyBytes := make([]byte, keyLen)
|
|
if _, err := file.Read(keyBytes); err != nil {
|
|
return nil, false
|
|
}
|
|
// 读取值长度
|
|
var valueLen int64
|
|
if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil {
|
|
return nil, false
|
|
}
|
|
// 读取值
|
|
valueBytes := make([]byte, valueLen)
|
|
if _, err := file.Read(valueBytes); err != nil {
|
|
return nil, false
|
|
}
|
|
// 反序列化值
|
|
var value interface{}
|
|
if err := json.Unmarshal(valueBytes, &value); err != nil {
|
|
return nil, false
|
|
}
|
|
return value, true
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// hashKey folds the runes of key into an int using the polynomial
// h = 31*h + rune, starting from zero. The empty string hashes to 0.
func hashKey(key string) int {
	var h int
	for _, r := range key {
		h = h*31 + int(r)
	}
	return h
}
|
|
|
|
// Flush saves dirty keys to file and clears the memory
|
|
func (k *KVStore) Flush() error {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
if len(k.dirtyKeys) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// 保存变化到文件
|
|
for key := range k.dirtyKeys {
|
|
if err := k.saveKeyToBucket(key); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// 清空dirtyKeys
|
|
k.dirtyKeys = make(map[string]bool)
|
|
// 清空日志文件
|
|
if err := os.Truncate(k.logFile, 0); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 清空内存
|
|
k.store = make(map[string]interface{})
|
|
k.memoryUsage = 0
|
|
|
|
return nil
|
|
}
|
|
|
|
func (k *KVStore) valueSize(value interface{}) int64 {
|
|
// 计算值的内存占用
|
|
var valueSize int64
|
|
switch v := value.(type) {
|
|
case string:
|
|
valueSize = int64(len(v))
|
|
case int:
|
|
valueSize = 8 // int在64位系统上通常占8字节
|
|
case bool:
|
|
valueSize = 1 // bool占1字节
|
|
case []string:
|
|
for _, s := range v {
|
|
valueSize += int64(len(s))
|
|
}
|
|
case []int:
|
|
valueSize = int64(len(v)) * 8
|
|
case []bool:
|
|
valueSize = int64(len(v))
|
|
}
|
|
return valueSize
|
|
}
|
|
|
|
// Put adds a key/value pair to the store.
|
|
func (k *KVStore) Put(key string, value interface{}) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
// 检查值类型
|
|
switch value.(type) {
|
|
case string, int, bool, []string, []int, []bool:
|
|
// 有效类型,继续
|
|
default:
|
|
panic("unsupported value type")
|
|
}
|
|
|
|
// 如果处于事务中,不检查内存限制
|
|
if k.transaction != nil {
|
|
k.transaction.store[key] = value
|
|
return
|
|
}
|
|
|
|
// 计算值的内存占用
|
|
valueSize := k.valueSize(value)
|
|
|
|
// 检查内存使用量
|
|
newMemoryUsage := k.memoryUsage + int64(len(key)) + valueSize
|
|
if newMemoryUsage > k.memoryLimit {
|
|
if err := k.Flush(); err != nil {
|
|
panic("Failed to flush store")
|
|
}
|
|
newMemoryUsage = int64(len(key)) + valueSize
|
|
}
|
|
|
|
k.store[key] = value
|
|
k.memoryUsage = newMemoryUsage
|
|
k.dirtyKeys[key] = true
|
|
k.wg.Add(1) // 增加WaitGroup计数器
|
|
go func() {
|
|
defer k.wg.Done() // 减少WaitGroup计数器
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
k.LogOperation("put", key, value) // 异步记录操作日志
|
|
}()
|
|
}
|
|
|
|
// Delete removes the key/value pair associated with the given key.
|
|
func (k *KVStore) Delete(key string) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
if value, exists := k.store[key]; exists {
|
|
delete(k.store, key)
|
|
valueSize := k.valueSize(value)
|
|
k.memoryUsage -= int64(len(key)) + valueSize
|
|
k.dirtyKeys[key] = true
|
|
k.wg.Add(1) // 增加WaitGroup计数器
|
|
go func() {
|
|
defer k.wg.Done() // 减少WaitGroup计数器
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
k.LogOperation("delete", key, "") // 异步记录操作日志
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Get retrieves the value associated with the given key.
|
|
func (k *KVStore) Get(key string) (interface{}, bool) {
|
|
k.mu.RLock()
|
|
defer k.mu.RUnlock()
|
|
if k.transaction != nil {
|
|
value, exists := k.transaction.store[key]
|
|
if exists {
|
|
return value, true
|
|
}
|
|
}
|
|
value, exists := k.store[key]
|
|
if !exists {
|
|
value, exists = k.getKeyFromBucket(key)
|
|
if exists {
|
|
k.store[key] = value
|
|
} else {
|
|
return nil, false
|
|
}
|
|
}
|
|
return value, exists
|
|
}
|
|
|
|
// BeginTransaction starts a new transaction
|
|
func (k *KVStore) BeginTransaction() {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
k.transaction = &Transaction{
|
|
store: make(map[string]interface{}), // 修改为interface{}以支持多种类型
|
|
}
|
|
}
|
|
|
|
// PutInTransaction puts a key/value pair in the current transaction
|
|
func (k *KVStore) PutInTransaction(key string, value interface{}) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
if k.transaction != nil {
|
|
// 检查值类型
|
|
switch value.(type) {
|
|
case string, int, bool, []string, []int, []bool:
|
|
k.transaction.store[key] = value
|
|
default:
|
|
panic("unsupported value type")
|
|
}
|
|
} else {
|
|
k.store[key] = value
|
|
}
|
|
}
|
|
|
|
// Commit commits the current transaction
|
|
func (k *KVStore) Commit() error {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
if k.transaction == nil {
|
|
return nil
|
|
}
|
|
|
|
// 计算事务中键值对的内存使用量
|
|
var transactionMemoryUsage int64
|
|
for key, value := range k.transaction.store {
|
|
var valueSize int64
|
|
switch v := value.(type) {
|
|
case string:
|
|
valueSize = int64(len(v))
|
|
case int:
|
|
valueSize = 8 // int在64位系统上通常占8字节
|
|
case bool:
|
|
valueSize = 1 // bool占1字节
|
|
case []string:
|
|
for _, s := range v {
|
|
valueSize += int64(len(s))
|
|
}
|
|
case []int:
|
|
valueSize = int64(len(v)) * 8
|
|
case []bool:
|
|
valueSize = int64(len(v))
|
|
}
|
|
transactionMemoryUsage += int64(len(key)) + valueSize
|
|
}
|
|
|
|
// 检查内存使用量
|
|
newMemoryUsage := k.memoryUsage + transactionMemoryUsage
|
|
if newMemoryUsage > k.memoryLimit {
|
|
if err := k.Flush(); err != nil {
|
|
return err
|
|
}
|
|
newMemoryUsage = transactionMemoryUsage
|
|
}
|
|
|
|
// 提交事务
|
|
for key, value := range k.transaction.store {
|
|
k.store[key] = value
|
|
k.dirtyKeys[key] = true
|
|
}
|
|
k.memoryUsage = newMemoryUsage
|
|
k.transaction = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
// Rollback rolls back the current transaction
|
|
func (k *KVStore) Rollback() {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
k.transaction = nil
|
|
}
|
|
|
|
// RecoverFromLog recovers data from log file
|
|
func (k *KVStore) RecoverFromLog() error {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
// 检查日志文件是否存在
|
|
if _, err := os.Stat(k.logFile); os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
|
|
// 读取日志文件
|
|
data, err := os.ReadFile(k.logFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 逐行处理日志
|
|
lines := bytes.Split(data, []byte{'\n'})
|
|
for _, line := range lines {
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var logEntry map[string]interface{}
|
|
if err := json.Unmarshal(line, &logEntry); err != nil {
|
|
return err
|
|
}
|
|
switch logEntry["op"] {
|
|
case "put":
|
|
k.store[logEntry["key"].(string)] = logEntry["value"]
|
|
case "delete":
|
|
delete(k.store, logEntry["key"].(string))
|
|
}
|
|
}
|
|
|
|
// 清空日志文件
|
|
return os.Truncate(k.logFile, 0)
|
|
}
|
|
|
|
// LogOperation logs a key/value operation to the log file
|
|
func (k *KVStore) LogOperation(op string, key, value interface{}) error {
|
|
logEntry := map[string]interface{}{
|
|
"op": op,
|
|
"key": key,
|
|
"value": value,
|
|
}
|
|
data, err := json.Marshal(logEntry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
data = append(data, '\n')
|
|
f, err := os.OpenFile(k.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
_, err = f.Write(data)
|
|
return err
|
|
}
|
|
|
|
// LoadFromFile loads the store from a file with partial loading support
|
|
func (k *KVStore) LoadFromFile(keys ...string) error {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
// 先加载主数据文件
|
|
if _, err := os.Stat(k.file); err == nil {
|
|
file, err := os.Open(k.file)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
// 读取键值对数量
|
|
var count int64
|
|
if err := binary.Read(file, binary.LittleEndian, &count); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 逐个读取键值对
|
|
for i := int64(0); i < count; i++ {
|
|
// 读取键长度和键
|
|
var keyLen int64
|
|
if err := binary.Read(file, binary.LittleEndian, &keyLen); err != nil {
|
|
return err
|
|
}
|
|
key := make([]byte, keyLen)
|
|
if _, err := file.Read(key); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 读取值长度和值
|
|
var valueLen int64
|
|
if err := binary.Read(file, binary.LittleEndian, &valueLen); err != nil {
|
|
return err
|
|
}
|
|
value := make([]byte, valueLen)
|
|
if _, err := file.Read(value); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Partial loading
|
|
if len(keys) == 0 {
|
|
k.store[string(key)] = string(value)
|
|
} else {
|
|
for _, targetKey := range keys {
|
|
if targetKey == string(key) {
|
|
k.store[string(key)] = string(value)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 然后应用日志文件中的操作
|
|
if _, err := os.Stat(k.logFile); err == nil {
|
|
data, err := os.ReadFile(k.logFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
lines := bytes.Split(data, []byte{'\n'})
|
|
for _, line := range lines {
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var logEntry map[string]string
|
|
if err := json.Unmarshal(line, &logEntry); err != nil {
|
|
return err
|
|
}
|
|
switch logEntry["op"] {
|
|
case "put":
|
|
k.store[logEntry["key"]] = logEntry["value"]
|
|
case "delete":
|
|
delete(k.store, logEntry["key"])
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|