diff --git a/cmd/server/config.go b/cmd/server/config.go index 5ba1971..c739b9e 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -18,6 +18,7 @@ type Config struct { PersistenceType string `yaml:"persistence_type"` PersistenceDir string `yaml:"persistence_dir"` SyncEvery int `yaml:"sync_every"` + MemLen int `yaml:"mem_len"` QuicConfig *quic.Config `yaml:"quic_config"` // BoltDB特有配置 diff --git a/cmd/server/main.go b/cmd/server/main.go index 2230f16..3fbe77e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -25,6 +25,7 @@ var ( persistenceType = flag.String("persistence", "none", "持久化类型 (none, wal)") persistenceDir = flag.String("persistence-dir", "./data", "持久化目录") syncEvery = flag.Int("sync-every", 100, "每写入多少条数据同步一次") + memLen = flag.Int("mem-len", 2048, "内存缓冲区长度") configPath = flag.String("config", "config.yaml", "配置文件路径") // 定义配置QUIC服务的命令行参数 quicAddr = flag.String("quic-addr", ":8083", "QUIC服务地址") @@ -33,16 +34,6 @@ var ( ) func main() { - if *genSampleConfig { - err := GenerateSampleConfig("./config.yaml.sample") - if err != nil { - log.Fatalf("生成示例配置文件失败: %v", err) - } - log.Println("示例配置文件已生成") - return - } - flag.Parse() - // 保存命令行原始值,用于后续判断是否被用户显式设置 originalRestAddr := *restAddr originalWsAddr := *wsAddr @@ -52,6 +43,17 @@ func main() { originalPersistenceType := *persistenceType originalPersistenceDir := *persistenceDir originalSyncEvery := *syncEvery + originalMemBufferLen := *memLen + + if *genSampleConfig { + err := GenerateSampleConfig("./config.yaml.sample") + if err != nil { + log.Fatalf("生成示例配置文件失败: %v", err) + } + log.Println("示例配置文件已生成") + return + } + flag.Parse() var config *Config if *configPath != "" { @@ -86,6 +88,9 @@ func main() { if *syncEvery == originalSyncEvery { syncEvery = &config.SyncEvery } + if *memLen == originalMemBufferLen { + memLen = &config.MemLen + } } // 创建存储引擎配置 diff --git a/pkg/api/rest_test.go b/pkg/api/rest_test.go index f0243c3..970ffe5 100644 --- a/pkg/api/rest_test.go +++ b/pkg/api/rest_test.go @@ -18,7 +18,7 @@ import ( func setupTestRESTServer() *RESTServer { // 创建存储引擎 - engine := storage.NewMemoryEngine() + engine := storage.NewMemoryEngine(200) // 创建数据管理器 dataManager := manager.NewDataManager(engine) @@ -152,7 +152,7 @@ func TestRESTServer_QueryEndpoint(t *testing.T) { server := setupTestRESTServer() // 写入测试数据 - engine := storage.NewMemoryEngine() + engine := storage.NewMemoryEngine(200) dataManager := manager.NewDataManager(engine) server.dataManager = dataManager diff --git a/pkg/api/websocket_test.go b/pkg/api/websocket_test.go index 1e1523a..4564e39 100644 --- a/pkg/api/websocket_test.go +++ b/pkg/api/websocket_test.go @@ -18,7 +18,7 @@ import ( func setupTestWebSocketServer() *WebSocketServer { // 创建存储引擎 - engine := storage.NewMemoryEngine() + engine := storage.NewMemoryEngine(200) // 创建数据管理器 dataManager := manager.NewDataManager(engine) diff --git a/pkg/manager/datamanager_test.go b/pkg/manager/datamanager_test.go index cdde7cf..c5499f8 100644 --- a/pkg/manager/datamanager_test.go +++ b/pkg/manager/datamanager_test.go @@ -11,7 +11,7 @@ import ( func TestDataManager(t *testing.T) { // 创建存储引擎 - engine := storage.NewMemoryEngine() + engine := storage.NewMemoryEngine(200) // 创建数据管理器 manager := NewDataManager(engine) diff --git a/pkg/model/datapoint.go b/pkg/model/datapoint.go index 567f9d4..b534577 100644 --- a/pkg/model/datapoint.go +++ b/pkg/model/datapoint.go @@ -75,15 +75,16 @@ type DataValue struct { // CircularBuffer 环形缓冲区 type CircularBuffer struct { - Values [30]DataValue // 固定大小为30的环形缓冲区 - Head int // 当前写入位置 - Lock sync.RWMutex // 细粒度锁 + Values []DataValue // 固定大小为30的环形缓冲区 + Head int // 当前写入位置 + Lock sync.RWMutex // 细粒度锁 } // NewCircularBuffer 创建一个新的环形缓冲区 -func NewCircularBuffer() *CircularBuffer { +func NewCircularBuffer(bufferLen int) *CircularBuffer { return &CircularBuffer{ - Head: -1, // 初始化为-1,表示缓冲区为空 + Values: make([]DataValue, bufferLen), + Head: -1, // 初始化为-1,表示缓冲区为空 } } diff --git a/pkg/storage/boltdb.go b/pkg/storage/boltdb.go index ebe3cea..dfeaf4f 100644 --- a/pkg/storage/boltdb.go +++ b/pkg/storage/boltdb.go @@ -178,7 +178,7 @@ func (e *BoltDBEngine) Read(ctx context.Context, id model.DataPointID) ([]model. } // 创建并填充缓存 - buffer := model.NewCircularBuffer() + buffer := model.NewCircularBuffer(1024) for _, value := range values { buffer.Write(value) } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index bcce652..c2e6587 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -46,6 +46,7 @@ type StorageEngine interface { // MemoryEngine 内存存储引擎 type MemoryEngine struct { + buffLen int // every metric buffer size data map[string]*model.CircularBuffer // 数据存储 dataLock sync.RWMutex // 数据锁 persister Persister // 持久化器 @@ -102,9 +103,10 @@ func (e *MemoryEngine) ReadDuration(ctx context.Context, id model.DataPointID, f } // NewMemoryEngine 创建一个新的内存存储引擎 -func NewMemoryEngine() *MemoryEngine { +func NewMemoryEngine(memBufferLen int) *MemoryEngine { return &MemoryEngine{ - data: make(map[string]*model.CircularBuffer), + buffLen: memBufferLen, + data: make(map[string]*model.CircularBuffer), } } @@ -118,7 +120,11 @@ func (e *MemoryEngine) Write(ctx context.Context, id model.DataPointID, value mo if !exists { // 如果数据点不存在,创建一个新的环形缓冲区 - buffer = model.NewCircularBuffer() + bufferLen := e.buffLen + if bufferLen <= 0 { + bufferLen = 30 + } + buffer = model.NewCircularBuffer(bufferLen) e.dataLock.Lock() e.data[key] = buffer e.dataLock.Unlock() diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 9e87eda..63d5c67 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -12,7 +12,7 @@ import ( func TestMemoryEngine(t *testing.T) { // 创建内存存储引擎 - engine := NewMemoryEngine() + engine := NewMemoryEngine(200) // 创建测试数据 id := model.DataPointID{ @@ -170,7 +170,7 @@ func TestPersistence(t *testing.T) { defer os.RemoveAll(tempDir) // 创建内存存储引擎 - engine := NewMemoryEngine() + engine := NewMemoryEngine(200) // 启用WAL持久化 persistenceConfig := PersistenceConfig{ @@ -221,7 +221,7 @@ func TestPersistence(t *testing.T) { } // 创建新的引擎并从WAL恢复 - newEngine := NewMemoryEngine() + newEngine := NewMemoryEngine(200) err = newEngine.EnablePersistence(persistenceConfig) if err != nil { t.Fatalf("EnablePersistence() for new engine error = %v", err) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 114c5a4..8830e7a 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -11,7 +11,8 @@ type EngineConfig struct { PersistenceType PersistenceType // 持久化类型 PersistenceDir string // 持久化目录 SyncEvery int // 每写入多少条数据同步一次 - + // memory engine特有配置 + MemLen int // BoltDB特有配置 BoltDBFilename string // BoltDB文件名 BoltDBOptions *BoltDBConfig // BoltDB选项 @@ -25,7 +26,7 @@ func NewStorageEngine(config EngineConfig) (StorageEngine, error) { switch config.PersistenceType { case PersistenceTypeNone, PersistenceTypeWAL: // 创建内存存储引擎 - memEngine := NewMemoryEngine() + memEngine := NewMemoryEngine(config.MemLen) // 如果需要持久化,启用WAL if config.PersistenceType == PersistenceTypeWAL {