Compare commits
No commits in common. "a07d62577c211958ebcea441ab96511b49c3b7a1" and "857cad7feac27db04cb70d688e11f419878ce2a0" have entirely different histories.
a07d62577c
...
857cad7fea
|
@ -1,131 +0,0 @@
|
||||||
package api
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
import (
|
|
||||||
// "fmt"
|
|
||||||
|
|
||||||
// "go.mongodb.org/mongo-driver/bson"
|
|
||||||
"git.pyer.club/kingecg/godocdb/document"
|
|
||||||
"git.pyer.club/kingecg/godocdb/index"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Collection MongoDB兼容的集合结构
|
|
||||||
type Collection struct {
|
|
||||||
name string
|
|
||||||
indexStore *index.IndexStore
|
|
||||||
documentStore *document.DocumentStore
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
indexStores = make(map[string]*index.IndexStore)
|
|
||||||
indexStoreMu sync.RWMutex
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewCollection 创建新的集合实例
|
|
||||||
func NewCollection(name string, storagePath string) (*Collection, error) {
|
|
||||||
ds, err := document.NewDocumentStore(storagePath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
is, err := getIndexStore(storagePath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Collection{
|
|
||||||
name: name,
|
|
||||||
documentStore: ds,
|
|
||||||
indexStore: is,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// InsertOne 插入单个文档
|
|
||||||
func (coll *Collection) InsertOne(doc interface{}) error {
|
|
||||||
// 自动生成文档ID
|
|
||||||
docID := generateID()
|
|
||||||
|
|
||||||
// 将collection信息传递给文档管理层和索引管理层
|
|
||||||
if err := coll.documentStore.StoreDocument(coll.name, docID, doc); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建相关索引(需要实现generateID)
|
|
||||||
return coll.createRelatedIndexes(doc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find 查询文档
|
|
||||||
func (coll *Collection) Find(filter interface{}) (*Cursor, error) {
|
|
||||||
// 使用collection信息进行索引查询
|
|
||||||
fieldValue := extractFilterValue(filter) // 需要实现过滤器解析
|
|
||||||
docIDs, err := coll.indexStore.GetIndexedDocuments(coll.name, "default_index", fieldValue)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取实际文档数据
|
|
||||||
docs := make([]interface{}, 0, len(docIDs))
|
|
||||||
for _, id := range docIDs {
|
|
||||||
var doc interface{}
|
|
||||||
if err := coll.documentStore.GetDocument(coll.name, id, &doc); err == nil {
|
|
||||||
docs = append(docs, doc)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return newCursor(docs), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 以下为简化实现,需要补充完整
|
|
||||||
|
|
||||||
func (coll *Collection) createRelatedIndexes(doc interface{}) error {
|
|
||||||
// 实现自动创建相关索引(根据注解或配置)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getIndexStore(path string) (*index.IndexStore, error) {
|
|
||||||
indexStoreMu.RLock()
|
|
||||||
if is, ok := indexStores[path]; ok {
|
|
||||||
indexStoreMu.RUnlock()
|
|
||||||
return is, nil
|
|
||||||
}
|
|
||||||
indexStoreMu.RUnlock()
|
|
||||||
|
|
||||||
indexStoreMu.Lock()
|
|
||||||
defer indexStoreMu.Unlock()
|
|
||||||
|
|
||||||
// Double-check in case it was added before acquiring the lock
|
|
||||||
if is, ok := indexStores[path]; ok {
|
|
||||||
return is, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
is, err := index.NewIndexStore(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
indexStores[path] = is
|
|
||||||
return is, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateID() string {
|
|
||||||
// 生成唯一文档ID
|
|
||||||
return "doc_123456"
|
|
||||||
}
|
|
||||||
|
|
||||||
func extractFilterValue(filter interface{}) interface{} {
|
|
||||||
// 解析过滤条件获取字段值
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cursor 游标用于遍历查询结果
|
|
||||||
type Cursor struct {
|
|
||||||
// ... 实现细节 ...
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCursor(docs []interface{}) *Cursor {
|
|
||||||
// 创建新的游标实例
|
|
||||||
return &Cursor{}
|
|
||||||
}
|
|
|
@ -2,225 +2,52 @@ package document
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.pyer.club/kingecg/godocdb/index"
|
|
||||||
"git.pyer.club/kingecg/godocdb/storage"
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
"git.pyer.club/kingecg/godocdb/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DocumentStore 管理带命名空间的文档存储
|
// DocumentStore 管理文档的存储和检索
|
||||||
type DocumentStore struct {
|
type DocumentStore struct {
|
||||||
storage *storage.LevelDBStorage
|
storage *storage.LevelDBStorage
|
||||||
indexStore *index.IndexStore // 添加索引存储依赖
|
|
||||||
mu sync.RWMutex // 保护并发访问
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDocumentStore 创建新的文档存储实例
|
// NewDocumentStore 创建新的文档存储实例
|
||||||
func NewDocumentStore(path string) (*DocumentStore, error) {
|
func NewDocumentStore(path string) (*DocumentStore, error) {
|
||||||
// 确保目录存在
|
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create storage directory: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
storage, err := storage.NewLevelDBStorage(path)
|
storage, err := storage.NewLevelDBStorage(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create LevelDB storage: %v", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return &DocumentStore{storage: storage}, nil
|
||||||
// 创建关联的索引存储
|
|
||||||
is, err := index.NewIndexStore(path)
|
|
||||||
if err != nil {
|
|
||||||
storage.Close()
|
|
||||||
return nil, fmt.Errorf("failed to create index store: %v", err)
|
|
||||||
}
|
|
||||||
if errCreate := is.CreateIndex("default_index", index.NonUnique, []string{"_id"}, []index.IndexSortOrder{index.Ascending}); errCreate != nil {
|
|
||||||
return nil, errCreate
|
|
||||||
}
|
|
||||||
|
|
||||||
return &DocumentStore{
|
|
||||||
storage: storage,
|
|
||||||
indexStore: is,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreDocument 带collection标识的文档存储
|
// StoreDocument 存储文档
|
||||||
func (ds *DocumentStore) StoreDocument(collection string, id string, doc interface{}) error {
|
func (ds *DocumentStore) StoreDocument(id string, doc interface{}) error {
|
||||||
// 类型断言
|
// 使用BSON序列化文档
|
||||||
docMap, ok := doc.(map[string]interface{})
|
data, err := bson.Marshal(doc)
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("document must be a map[string]interface{}")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 自动生成文档ID
|
|
||||||
if id == "" {
|
|
||||||
// 使用文档自带的_id字段
|
|
||||||
if docID, exists := docMap["_id"].(primitive.ObjectID); exists {
|
|
||||||
id = docID.Hex()
|
|
||||||
} else {
|
|
||||||
// 自动生成新ID
|
|
||||||
newID := primitive.NewObjectID()
|
|
||||||
docMap["_id"] = newID
|
|
||||||
id = newID.Hex()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 序列化文档
|
|
||||||
data, err := bson.Marshal(docMap)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal document: %v", err)
|
return fmt.Errorf("failed to marshal document: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 存储文档到LevelDB
|
// 存储文档(格式: collection:id → BSON数据)
|
||||||
key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
|
key := []byte(fmt.Sprintf("documents:%s", id))
|
||||||
if err := ds.storage.Put(key, data); err != nil {
|
return ds.storage.Put(key, data)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取相关索引
|
|
||||||
relatedIndexes, err := getRelatedIndexes(docMap, collection, ds.indexStore)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新所有相关索引
|
|
||||||
for _, indexName := range relatedIndexes {
|
|
||||||
metadata, err := ds.indexStore.GetIndexMetadata(indexName)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, field := range metadata.KeyFields {
|
|
||||||
fieldValue, exists := extractFieldValue(docMap, field)
|
|
||||||
if !exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新索引并处理错误
|
|
||||||
if err := ds.indexStore.UpdateIndex(collection, indexName, fieldValue, id); err != nil {
|
|
||||||
// 如果是唯一索引冲突,返回错误
|
|
||||||
if strings.Contains(err.Error(), "duplicate key error") {
|
|
||||||
return fmt.Errorf("unique index constraint violated for %s.%s", collection, field)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建默认索引(如果不存在)
|
|
||||||
if _, err := ds.indexStore.GetIndexMetadata("default_index"); err != nil {
|
|
||||||
// 如果索引不存在,创建默认索引
|
|
||||||
if errCreate := ds.indexStore.CreateIndex("default_index", index.NonUnique, []string{"_id"}, nil); errCreate != nil {
|
|
||||||
return errCreate
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDocument 带collection标识的文档获取
|
// GetDocument 获取文档
|
||||||
func (ds *DocumentStore) GetDocument(collection string, id string, doc interface{}) error {
|
func (ds *DocumentStore) GetDocument(id string, result interface{}) error {
|
||||||
// 在文档键中加入collection标识
|
key := []byte(fmt.Sprintf("documents:%s", id))
|
||||||
key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
|
data, err := ds.storage.Get(key)
|
||||||
|
|
||||||
// 从LevelDB获取数据
|
|
||||||
rawData, err := ds.storage.Get(key)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("document not found: %v", err)
|
return fmt.Errorf("document not found: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 反序列化BSON数据
|
// 使用BSON反序列化文档
|
||||||
if err := bson.Unmarshal(rawData, doc); err != nil {
|
return bson.Unmarshal(data, result)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRelatedIndexes 获取文档关联的索引
|
// DeleteDocument 删除文档
|
||||||
func getRelatedIndexes(doc map[string]interface{}, coll string, is *index.IndexStore) ([]string, error) {
|
func (ds *DocumentStore) DeleteDocument(id string) error {
|
||||||
// 获取所有索引元数据
|
key := []byte(fmt.Sprintf("documents:%s", id))
|
||||||
metadataList, err := is.GetAllIndexMetadata(coll)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var related []string
|
|
||||||
for _, md := range metadataList {
|
|
||||||
// 检查文档是否包含索引字段
|
|
||||||
for _, field := range md.KeyFields {
|
|
||||||
if _, exists := doc[field]; exists {
|
|
||||||
related = append(related, md.Name)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果没有匹配的索引,尝试获取默认索引
|
|
||||||
if len(related) == 0 {
|
|
||||||
_, err := is.GetIndexMetadata("default_index")
|
|
||||||
if err == nil {
|
|
||||||
related = append(related, "default_index")
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("no suitable index found for collection %s", coll)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return related, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExtractFieldValue 提取文档字段值
|
|
||||||
func extractFieldValue(doc map[string]interface{}, field string) (interface{}, bool) {
|
|
||||||
value, exists := doc[field]
|
|
||||||
return value, exists
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteDocument 删除文档并更新索引
|
|
||||||
func (ds *DocumentStore) DeleteDocument(collection string, id string) error {
|
|
||||||
ds.mu.Lock()
|
|
||||||
defer ds.mu.Unlock()
|
|
||||||
|
|
||||||
// 构建文档键
|
|
||||||
key := []byte(fmt.Sprintf("documents:%s:%s", collection, id))
|
|
||||||
|
|
||||||
// 获取文档内容以获取相关索引
|
|
||||||
rawData, err := ds.storage.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("document not found: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析文档
|
|
||||||
var docMap map[string]interface{}
|
|
||||||
if err := bson.Unmarshal(rawData, &docMap); err != nil {
|
|
||||||
return fmt.Errorf("failed to unmarshal document: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取相关索引
|
|
||||||
relatedIndexes, err := getRelatedIndexes(docMap, collection, ds.indexStore)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除相关索引条目
|
|
||||||
for _, indexName := range relatedIndexes {
|
|
||||||
metadata, err := ds.indexStore.GetIndexMetadata(indexName)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, field := range metadata.KeyFields {
|
|
||||||
fieldValue, exists := extractFieldValue(docMap, field)
|
|
||||||
if !exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 从索引中删除
|
|
||||||
_ = ds.indexStore.DeleteIndex(collection, indexName, fieldValue, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 最后删除文档
|
|
||||||
return ds.storage.Delete(key)
|
return ds.storage.Delete(key)
|
||||||
}
|
}
|
|
@ -26,33 +26,37 @@ func TestDocumentStore(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer ds.storage.Close()
|
defer ds.storage.Close()
|
||||||
|
|
||||||
// 测试文档ID和数据
|
// 测试文档ID
|
||||||
docID := "doc123"
|
testID := "001"
|
||||||
doc := map[string]interface{}{
|
|
||||||
"_id": docID,
|
// 测试文档内容
|
||||||
"name": "Test Document",
|
testDoc := TestDoc{
|
||||||
|
ID: testID,
|
||||||
|
Name: "Alice",
|
||||||
|
Age: 30,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试基本操作(使用默认collection)
|
// 测试存储和获取
|
||||||
collection := "test_collection"
|
if err := ds.StoreDocument(testID, testDoc); err != nil {
|
||||||
if err := ds.StoreDocument(collection, docID, doc); err != nil {
|
|
||||||
t.Errorf("StoreDocument failed: %v", err)
|
t.Errorf("StoreDocument failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 验证存储结果
|
var result TestDoc
|
||||||
var result map[string]interface{}
|
if err := ds.GetDocument(testID, &result); err != nil {
|
||||||
if err := ds.GetDocument(collection, docID, &result); err != nil {
|
|
||||||
t.Errorf("GetDocument failed: %v", err)
|
t.Errorf("GetDocument failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 验证删除功能
|
if result.Name != testDoc.Name || result.Age != testDoc.Age {
|
||||||
if err := ds.DeleteDocument(collection, docID); err != nil {
|
t.Errorf("Retrieved document mismatch: got %+v want %+v", result, testDoc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试删除功能
|
||||||
|
if err := ds.DeleteDocument(testID); err != nil {
|
||||||
t.Errorf("DeleteDocument failed: %v", err)
|
t.Errorf("DeleteDocument failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 删除后验证
|
if err := ds.GetDocument(testID, &result); err == nil {
|
||||||
if err := ds.GetDocument(collection, docID, &result); err == nil {
|
t.Errorf("Expected error after DeleteDocument")
|
||||||
t.Error("Expected error after DeleteDocument")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,20 +66,13 @@ func TestErrorHandling(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
ds, _ := NewDocumentStore(dir)
|
ds, _ := NewDocumentStore(dir)
|
||||||
|
|
||||||
// 测试存储非map文档
|
|
||||||
invalidDoc := struct {
|
invalidDoc := struct {
|
||||||
F func()
|
F func()
|
||||||
}{}
|
}{}
|
||||||
|
|
||||||
if err := ds.StoreDocument("invalid", "doc1", invalidDoc); err == nil {
|
if err := ds.StoreDocument("invalid", invalidDoc); err == nil {
|
||||||
t.Error("Expected error for invalid document")
|
t.Error("Expected error for invalid document")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试存储nil文档
|
|
||||||
if err := ds.StoreDocument("invalid", "doc2", nil); err == nil {
|
|
||||||
t.Error("Expected error for nil document")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrentAccess(t *testing.T) {
|
func TestConcurrentAccess(t *testing.T) {
|
||||||
|
@ -87,22 +84,17 @@ func TestConcurrentAccess(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
key := "concurrent_test"
|
key := "concurrent_test"
|
||||||
|
|
||||||
ds, err := NewDocumentStore(dir)
|
ds, _ := NewDocumentStore(dir)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create document store: %v", err)
|
|
||||||
}
|
|
||||||
defer ds.storage.Close()
|
|
||||||
|
|
||||||
wg.Add(numGoroutines)
|
wg.Add(numGoroutines)
|
||||||
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
for i := 0; i < numGoroutines; i++ {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
testDoc := map[string]interface{}{
|
testDoc := TestDoc{
|
||||||
"_id": key,
|
ID: key,
|
||||||
"name": fmt.Sprintf("Test%d", i),
|
Name: fmt.Sprintf("Test%d", i),
|
||||||
}
|
}
|
||||||
err := ds.StoreDocument(key, fmt.Sprintf("doc%d", i), testDoc)
|
err := ds.StoreDocument(key, testDoc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("StoreDocument failed: %v", err)
|
t.Errorf("StoreDocument failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -111,8 +103,8 @@ func TestConcurrentAccess(t *testing.T) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// 验证最终值是否为最后一个写入
|
// 验证最终值是否为最后一个写入
|
||||||
var result map[string]interface{}
|
var result TestDoc
|
||||||
if err := ds.GetDocument(key, "doc9", &result); err != nil {
|
if err := ds.GetDocument(key, &result); err != nil {
|
||||||
t.Errorf("GetDocument failed: %v", err)
|
t.Errorf("GetDocument failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
252
index/index.go
252
index/index.go
|
@ -1,22 +1,19 @@
|
||||||
package index
|
package index
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.pyer.club/kingecg/godocdb/storage"
|
"github.com/iancoleman/orderedmap"
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"git.pyer.club/kingecg/godocdb/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IndexType 索引类型
|
// IndexType 表示索引类型
|
||||||
type IndexType int
|
type IndexType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
NonUnique IndexType = iota
|
SingleField IndexType = "single"
|
||||||
Unique
|
Composite IndexType = "composite"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IndexSortOrder 表示索引的排序方式
|
// IndexSortOrder 表示索引的排序方式
|
||||||
|
@ -33,46 +30,24 @@ type IndexMetadata struct {
|
||||||
Type IndexType
|
Type IndexType
|
||||||
KeyFields []string
|
KeyFields []string
|
||||||
SortOrders []IndexSortOrder // 每个字段对应的排序方式(长度应与KeyFields一致)
|
SortOrders []IndexSortOrder // 每个字段对应的排序方式(长度应与KeyFields一致)
|
||||||
Version int // 索引版本号,用于缓存一致性校验
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IndexStore 管理索引存储
|
// IndexStore 管理索引的存储和查询
|
||||||
type IndexStore struct {
|
type IndexStore struct {
|
||||||
storage *storage.LevelDBStorage
|
storage *storage.LevelDBStorage
|
||||||
indexes map[string]map[string]string // 缓存索引数据 {indexName: {key: value}}
|
|
||||||
mu sync.RWMutex // 保护并发访问
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewIndexStore 创建新的索引存储实例
|
// NewIndexStore 创建新的索引存储实例
|
||||||
func NewIndexStore(path string) (*IndexStore, error) {
|
func NewIndexStore(path string) (*IndexStore, error) {
|
||||||
// 确保目录存在
|
|
||||||
if strings.HasPrefix(path, "/") {
|
|
||||||
path = strings.TrimSuffix(path, "/")
|
|
||||||
}
|
|
||||||
if !strings.HasSuffix(path, "_index") {
|
|
||||||
// add _index suffix to avoid name confilict with document store, do remove this
|
|
||||||
path += "_index"
|
|
||||||
}
|
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create storage directory: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
storage, err := storage.NewLevelDBStorage(path)
|
storage, err := storage.NewLevelDBStorage(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create LevelDB storage: %v", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return &IndexStore{storage: storage}, nil
|
||||||
return &IndexStore{
|
|
||||||
storage: storage,
|
|
||||||
indexes: make(map[string]map[string]string),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateIndex 创建索引
|
// CreateIndex 创建索引
|
||||||
func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFields []string, sortOrders []IndexSortOrder) error {
|
func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFields []string, sortOrders []IndexSortOrder) error {
|
||||||
is.mu.Lock()
|
|
||||||
defer is.mu.Unlock()
|
|
||||||
|
|
||||||
// 验证keyFields和sortOrders长度一致
|
// 验证keyFields和sortOrders长度一致
|
||||||
if len(keyFields) != len(sortOrders) {
|
if len(keyFields) != len(sortOrders) {
|
||||||
return fmt.Errorf("keyFields and sortOrders must have the same length")
|
return fmt.Errorf("keyFields and sortOrders must have the same length")
|
||||||
|
@ -83,8 +58,7 @@ func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFiel
|
||||||
Name: indexName,
|
Name: indexName,
|
||||||
Type: indexType,
|
Type: indexType,
|
||||||
KeyFields: keyFields,
|
KeyFields: keyFields,
|
||||||
SortOrders: sortOrders,
|
SortOrders: sortOrders, // 使用字段级排序方式
|
||||||
Version: 1, // 初始版本号
|
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := bson.Marshal(metadata)
|
data, err := bson.Marshal(metadata)
|
||||||
|
@ -97,226 +71,38 @@ func (is *IndexStore) CreateIndex(indexName string, indexType IndexType, keyFiel
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化索引结构
|
// 初始化索引存储结构
|
||||||
index := make(map[string]string)
|
|
||||||
|
|
||||||
// 存储初始索引结构
|
|
||||||
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
||||||
indexData, err := bson.Marshal(index)
|
index := orderedmap.New()
|
||||||
if err != nil {
|
indexData, _ := bson.Marshal(index)
|
||||||
return fmt.Errorf("failed to marshal index data: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return is.storage.Put([]byte(indexKey), indexData)
|
return is.storage.Put([]byte(indexKey), indexData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateIndex 在指定collection下更新索引
|
|
||||||
func (is *IndexStore) UpdateIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
|
|
||||||
is.mu.Lock()
|
|
||||||
defer is.mu.Unlock()
|
|
||||||
|
|
||||||
// 增加空值检查
|
|
||||||
if fieldValue == nil {
|
|
||||||
return fmt.Errorf("cannot update index with nil field value")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 从存储加载当前索引
|
|
||||||
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
|
|
||||||
storedData, err := is.storage.Get(indexKey)
|
|
||||||
if err != nil {
|
|
||||||
// 如果不存在则创建新索引
|
|
||||||
storedData, _ = bson.Marshal(make(map[string]string))
|
|
||||||
}
|
|
||||||
|
|
||||||
// 反序列化索引数据
|
|
||||||
var index map[string]string
|
|
||||||
if err := bson.Unmarshal(storedData, &index); err != nil {
|
|
||||||
return fmt.Errorf("failed to unmarshal index data: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取索引元数据
|
|
||||||
metadata, err := is.GetIndexMetadata(indexName)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get index metadata: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新索引
|
|
||||||
indexKeyField := fmt.Sprintf("%s:%v", collection, fieldValue)
|
|
||||||
if _, ok := index[indexKeyField]; ok {
|
|
||||||
// 处理重复值的逻辑(需要根据索引类型决定是否覆盖)
|
|
||||||
if metadata.Type == Unique {
|
|
||||||
return fmt.Errorf("duplicate key error for unique index")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
index[indexKeyField] = documentID
|
|
||||||
|
|
||||||
// 序列化并存储更新后的索引
|
|
||||||
updatedData, err := bson.Marshal(index)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal updated index: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return is.storage.Put(indexKey, updatedData)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getOrCreateIndex 获取现有索引或创建新索引
|
|
||||||
func (is *IndexStore) getOrCreateIndex(indexName string) (map[string]string, error) {
|
|
||||||
// 检查内存缓存
|
|
||||||
if index, ok := is.indexes[indexName]; ok {
|
|
||||||
return index, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 从存储中加载
|
|
||||||
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
|
||||||
storedData, err := is.storage.Get([]byte(indexKey))
|
|
||||||
if err != nil {
|
|
||||||
// 如果不存在则创建空索引
|
|
||||||
return make(map[string]string), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 反序列化索引数据
|
|
||||||
var loadedIndex map[string]string
|
|
||||||
if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to unmarshal index data: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return loadedIndex, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetIndexedDocuments 查询特定collection下的文档
|
|
||||||
func (is *IndexStore) GetIndexedDocuments(collection string, indexName string, fieldValue interface{}) ([]string, error) {
|
|
||||||
is.mu.RLock()
|
|
||||||
defer is.mu.RUnlock()
|
|
||||||
|
|
||||||
// 构造前缀
|
|
||||||
prefix := fmt.Sprintf("%s:%v", collection, fieldValue)
|
|
||||||
|
|
||||||
// 直接从存储读取
|
|
||||||
indexKey := fmt.Sprintf("indexes:data:%s", indexName)
|
|
||||||
storedData, err := is.storage.Get([]byte(indexKey))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("index not found: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var loadedIndex map[string]string
|
|
||||||
if err := bson.Unmarshal(storedData, &loadedIndex); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var results []string
|
|
||||||
|
|
||||||
// 遍历查找匹配项
|
|
||||||
for key, value := range loadedIndex {
|
|
||||||
if strings.HasPrefix(key, prefix) {
|
|
||||||
results = append(results, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return results, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllIndexMetadata 获取所有索引的元数据
|
|
||||||
func (is *IndexStore) GetAllIndexMetadata(collection string) ([]IndexMetadata, error) {
|
|
||||||
// 构建前缀键(注意这里移除了多余的冒号)
|
|
||||||
prefix := []byte(fmt.Sprintf("indexes:metadata:%s:", collection))
|
|
||||||
|
|
||||||
// 使用storage的Scan方法替代直接访问db字段
|
|
||||||
keys, _, err := is.storage.Scan(prefix)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to scan metadata: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var metadataList []IndexMetadata
|
|
||||||
for _, key := range keys {
|
|
||||||
// 验证键是否匹配前缀
|
|
||||||
if !bytes.HasPrefix(key, prefix) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 读取元数据值
|
|
||||||
value, err := is.storage.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析元数据
|
|
||||||
var md IndexMetadata
|
|
||||||
if err := bson.Unmarshal(value, &md); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
metadataList = append(metadataList, md)
|
|
||||||
}
|
|
||||||
|
|
||||||
return metadataList, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteIndex 删除索引条目
|
|
||||||
func (is *IndexStore) DeleteIndex(collection string, indexName string, fieldValue interface{}, documentID string) error {
|
|
||||||
is.mu.Lock()
|
|
||||||
defer is.mu.Unlock()
|
|
||||||
|
|
||||||
// 修复索引键的构造方式,使用统一格式
|
|
||||||
indexKey := []byte(fmt.Sprintf("indexes:data:%s:%s:%v", collection, indexName, fieldValue))
|
|
||||||
|
|
||||||
// 从LevelDB删除
|
|
||||||
return is.storage.Delete(indexKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DropIndex 删除索引
|
// DropIndex 删除索引
|
||||||
func (is *IndexStore) DropIndex(indexName string) error {
|
func (is *IndexStore) DropIndex(indexName string) error {
|
||||||
is.mu.Lock()
|
|
||||||
defer is.mu.Unlock()
|
|
||||||
|
|
||||||
// 删除元数据
|
// 删除元数据
|
||||||
metadataKey := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
metadataKey := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
||||||
if err := is.storage.Delete(metadataKey); err != nil {
|
if err := is.storage.Delete(metadataKey); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 删除内存中的缓存
|
|
||||||
delete(is.indexes, indexName)
|
|
||||||
|
|
||||||
// 删除索引数据
|
// 删除索引数据
|
||||||
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
|
indexKey := []byte(fmt.Sprintf("indexes:data:%s", indexName))
|
||||||
return is.storage.Delete(indexKey)
|
return is.storage.Delete(indexKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetIndexMetadata 获取索引元数据
|
// GetIndexMetadata 获取索引元数据
|
||||||
func (is *IndexStore) GetIndexMetadata(indexName string) (IndexMetadata, error) {
|
func (is *IndexStore) GetIndexMetadata(indexName string) (*IndexMetadata, error) {
|
||||||
key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
key := []byte(fmt.Sprintf("indexes:metadata:%s", indexName))
|
||||||
storedData, err := is.storage.Get(key)
|
rawData, err := is.storage.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return IndexMetadata{}, fmt.Errorf("index not found: %v", err)
|
return nil, fmt.Errorf("index not found: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var metadata IndexMetadata
|
var metadata IndexMetadata
|
||||||
if err := bson.Unmarshal(storedData, &metadata); err != nil {
|
if err := bson.Unmarshal(rawData, &metadata); err != nil {
|
||||||
return IndexMetadata{}, fmt.Errorf("failed to unmarshal index metadata: %v", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return metadata, nil
|
return &metadata, nil
|
||||||
}
|
|
||||||
|
|
||||||
// FlushIndexToDisk 将内存中的索引更新写入磁盘
|
|
||||||
func (is *IndexStore) FlushIndexToDisk(indexName string) error {
|
|
||||||
// 因为现在直接操作存储,不需要单独的刷新方法
|
|
||||||
// 现在Flush只是简单验证索引是否存在
|
|
||||||
is.mu.RLock()
|
|
||||||
defer is.mu.RUnlock()
|
|
||||||
|
|
||||||
if _, ok := is.indexes[indexName]; !ok {
|
|
||||||
return fmt.Errorf("index not found in memory: %s", indexName)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClearIndexCache 清除内存中的索引缓存
|
|
||||||
func (is *IndexStore) ClearIndexCache(indexName string) {
|
|
||||||
is.mu.Lock()
|
|
||||||
defer is.mu.Unlock()
|
|
||||||
|
|
||||||
delete(is.indexes, indexName)
|
|
||||||
}
|
}
|
|
@ -24,7 +24,7 @@ func TestIndexStore(t *testing.T) {
|
||||||
keyFields := []string{"name"}
|
keyFields := []string{"name"}
|
||||||
|
|
||||||
// 测试创建索引(默认升序)
|
// 测试创建索引(默认升序)
|
||||||
if err := is.CreateIndex(indexName, NonUnique, keyFields, []IndexSortOrder{Ascending}); err != nil {
|
if err := is.CreateIndex(indexName, SingleField, keyFields, []IndexSortOrder{Ascending}); err != nil {
|
||||||
t.Errorf("CreateIndex failed: %v", err)
|
t.Errorf("CreateIndex failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,8 +34,8 @@ func TestIndexStore(t *testing.T) {
|
||||||
t.Errorf("GetIndexMetadata failed: %v", err)
|
t.Errorf("GetIndexMetadata failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if metadata.Name != indexName || metadata.Type != NonUnique {
|
if metadata.Name != indexName || metadata.Type != SingleField {
|
||||||
t.Errorf("Metadata mismatch: got %+v want name=%s type=%d", metadata, indexName, NonUnique)
|
t.Errorf("Metadata mismatch: got %+v want name=%s type=%s", metadata, indexName, SingleField)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试删除索引
|
// 测试删除索引
|
||||||
|
@ -67,7 +67,7 @@ func TestCompositeIndex(t *testing.T) {
|
||||||
keyFields := []string{"name", "age"}
|
keyFields := []string{"name", "age"}
|
||||||
|
|
||||||
// 创建复合索引(默认升序)
|
// 创建复合索引(默认升序)
|
||||||
if err := is.CreateIndex(indexName, NonUnique, keyFields, []IndexSortOrder{Ascending, Ascending}); err != nil {
|
if err := is.CreateIndex(indexName, Composite, keyFields, []IndexSortOrder{Ascending, Ascending}); err != nil {
|
||||||
t.Errorf("CreateIndex failed: %v", err)
|
t.Errorf("CreateIndex failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,8 +77,8 @@ func TestCompositeIndex(t *testing.T) {
|
||||||
t.Errorf("GetIndexMetadata failed: %v", err)
|
t.Errorf("GetIndexMetadata failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if metadata.Type != NonUnique || len(metadata.KeyFields) != 2 {
|
if metadata.Type != Composite || len(metadata.KeyFields) != 2 {
|
||||||
t.Errorf("Composite index metadata mismatch: got %+v want type=%d fieldsCount=2", metadata, NonUnique)
|
t.Errorf("Composite index metadata mismatch: got %+v want type=%s fieldsCount=2", metadata, Composite)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ func TestIndexSortOrder(t *testing.T) {
|
||||||
keyFields := []string{"timestamp"}
|
keyFields := []string{"timestamp"}
|
||||||
|
|
||||||
// 测试创建升序索引
|
// 测试创建升序索引
|
||||||
if err := is.CreateIndex(indexName, NonUnique, keyFields, []IndexSortOrder{Ascending}); err != nil {
|
if err := is.CreateIndex(indexName, SingleField, keyFields, []IndexSortOrder{Ascending}); err != nil {
|
||||||
t.Errorf("CreateIndex failed: %v", err)
|
t.Errorf("CreateIndex failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,7 +115,7 @@ func TestIndexSortOrder(t *testing.T) {
|
||||||
|
|
||||||
// 测试创建降序索引
|
// 测试创建降序索引
|
||||||
indexNameDesc := "sorted_index_desc"
|
indexNameDesc := "sorted_index_desc"
|
||||||
if err := is.CreateIndex(indexNameDesc, NonUnique, keyFields, []IndexSortOrder{Descending}); err != nil {
|
if err := is.CreateIndex(indexNameDesc, SingleField, keyFields, []IndexSortOrder{Descending}); err != nil {
|
||||||
t.Errorf("CreateIndex failed: %v", err)
|
t.Errorf("CreateIndex failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,7 +147,7 @@ func TestCompositeIndexSortOrders(t *testing.T) {
|
||||||
sortOrders := []IndexSortOrder{Descending, Ascending}
|
sortOrders := []IndexSortOrder{Descending, Ascending}
|
||||||
|
|
||||||
// 创建复合索引
|
// 创建复合索引
|
||||||
if err := is.CreateIndex(indexName, NonUnique, keyFields, sortOrders); err != nil {
|
if err := is.CreateIndex(indexName, Composite, keyFields, sortOrders); err != nil {
|
||||||
t.Errorf("CreateIndex failed: %v", err)
|
t.Errorf("CreateIndex failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,8 +157,8 @@ func TestCompositeIndexSortOrders(t *testing.T) {
|
||||||
t.Errorf("GetIndexMetadata failed: %v", err)
|
t.Errorf("GetIndexMetadata failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if metadata.Type != NonUnique || len(metadata.KeyFields) != 2 {
|
if metadata.Type != Composite || len(metadata.KeyFields) != 2 {
|
||||||
t.Errorf("Composite index metadata mismatch: got %+v want type=%d fieldsCount=2", metadata, NonUnique)
|
t.Errorf("Composite index metadata mismatch: got %+v want type=%s fieldsCount=2", metadata, Composite)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 验证每个字段的排序方式
|
// 验证每个字段的排序方式
|
||||||
|
@ -167,7 +167,7 @@ func TestCompositeIndexSortOrders(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkNonUniqueQuery(b *testing.B) {
|
func BenchmarkSingleFieldQuery(b *testing.B) {
|
||||||
// 基准测试单字段查询性能
|
// 基准测试单字段查询性能
|
||||||
dir := "./testdb_bench"
|
dir := "./testdb_bench"
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
@ -176,7 +176,7 @@ func BenchmarkNonUniqueQuery(b *testing.B) {
|
||||||
|
|
||||||
// 创建测试数据
|
// 创建测试数据
|
||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < 1000; i++ {
|
||||||
is.CreateIndex(fmt.Sprintf("index_%d", i), NonUnique, []string{"name"}, []IndexSortOrder{Ascending})
|
is.CreateIndex(fmt.Sprintf("index_%d", i), SingleField, []string{"name"}, []IndexSortOrder{Ascending})
|
||||||
}
|
}
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
@ -206,7 +206,7 @@ func TestConcurrentIndexOperations(t *testing.T) {
|
||||||
indexName := fmt.Sprintf("concurrent_index_%d", i)
|
indexName := fmt.Sprintf("concurrent_index_%d", i)
|
||||||
|
|
||||||
// 创建索引(默认升序)
|
// 创建索引(默认升序)
|
||||||
if err := is.CreateIndex(indexName, NonUnique, []string{"name"}, []IndexSortOrder{Ascending}); err != nil {
|
if err := is.CreateIndex(indexName, SingleField, []string{"name"}, []IndexSortOrder{Ascending}); err != nil {
|
||||||
t.Errorf("CreateIndex failed: %v", err)
|
t.Errorf("CreateIndex failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,25 +33,6 @@ func (s *LevelDBStorage) Delete(key []byte) error {
|
||||||
return s.db.Delete(key, nil)
|
return s.db.Delete(key, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan 执行范围扫描
|
|
||||||
func (s *LevelDBStorage) Scan(prefix []byte) (keys [][]byte, values [][]byte, err error) {
|
|
||||||
iter := s.db.NewIterator(nil, nil)
|
|
||||||
defer iter.Release()
|
|
||||||
|
|
||||||
for iter.Next() {
|
|
||||||
if bytes.HasPrefix(iter.Key(), prefix) {
|
|
||||||
keys = append(keys, iter.Key())
|
|
||||||
values = append(values, iter.Value())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := iter.Error(); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return keys, values, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close 关闭数据库连接
|
// Close 关闭数据库连接
|
||||||
func (s *LevelDBStorage) Close() {
|
func (s *LevelDBStorage) Close() {
|
||||||
s.db.Close()
|
s.db.Close()
|
||||||
|
|
Loading…
Reference in New Issue