diff --git a/cmd/server/main.go b/cmd/server/main.go index 0ac09f6..2230f16 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -138,7 +138,7 @@ func main() { quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置 } - quicServer, err := api.NewQUICServer(dataManager, quicConfig) + quicServer, err = api.NewQUICServer(dataManager, quicConfig) if err != nil { log.Printf("Failed to create QUIC server: %v", err) log.Println("Continuing without QUIC server") diff --git a/pkg/messaging/nats_test.go b/pkg/messaging/nats_test.go deleted file mode 100644 index 98aee16..0000000 --- a/pkg/messaging/nats_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package messaging - -import ( - "context" - "testing" - "time" - - "git.pyer.club/kingecg/gotidb/pkg/model" - "github.com/nats-io/nats.go/jetstream" - "github.com/stretchr/testify/assert" -) - -// 模拟NATS连接 -type mockNATSConn struct { - closeFunc func() error -} - -func (m *mockNATSConn) Close() error { - if m.closeFunc != nil { - return m.closeFunc() - } - return nil -} - -// 模拟JetStream -type mockJetStream struct { - publishFunc func(ctx context.Context, subject string, data []byte) (jetstream.PubAck, error) -} - -func (m *mockJetStream) Publish(ctx context.Context, subject string, data []byte) (jetstream.PubAck, error) { - if m.publishFunc != nil { - return m.publishFunc(ctx, subject, data) - } - return jetstream.PubAck{}, nil -} - -// 模拟Stream -type mockStream struct { - createOrUpdateConsumerFunc func(ctx context.Context, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) -} - -func (m *mockStream) CreateOrUpdateConsumer(ctx context.Context, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { - if m.createOrUpdateConsumerFunc != nil { - return m.createOrUpdateConsumerFunc(ctx, cfg) - } - return nil, nil -} - -// 模拟Consumer -type mockConsumer struct { - messagesFunc func() (jetstream.MessagesContext, error) -} - -func (m *mockConsumer) Messages() (jetstream.MessagesContext, error) { - if m.messagesFunc != nil { - return m.messagesFunc() - } - return nil, nil -} - -func TestNATSMessaging_Publish(t *testing.T) { - publishCalled := false - mockJS := &mockJetStream{ - publishFunc: func(ctx context.Context, subject string, data []byte) (jetstream.PubAck, error) { - publishCalled = true - return jetstream.PubAck{}, nil - }, - } - - messaging := &NATSMessaging{ - conn: &mockNATSConn{}, - js: mockJS, - } - - id := model.DataPointID{ - DeviceID: "device1", - MetricCode: "metric1", - Labels: map[string]string{"env": "test"}, - } - value := model.DataValue{ - Timestamp: time.Now(), - Value: 42.0, - } - - err := messaging.Publish(context.Background(), id, value) - assert.NoError(t, err) - assert.True(t, publishCalled) -} - -func TestNATSMessaging_Subscribe(t *testing.T) { - handlerCalled := false - handler := func(msg DataMessage) error { - handlerCalled = true - return nil - } - - mockConsumer := &mockConsumer{} - mockStream := &mockStream{ - createOrUpdateConsumerFunc: func(ctx context.Context, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { - return mockConsumer, nil - }, - } - - messaging := &NATSMessaging{ - conn: &mockNATSConn{}, - stream: mockStream, - } - - err := messaging.Subscribe(handler) - assert.NoError(t, err) - assert.Contains(t, messaging.handlers, handler) -} - -func TestNATSMessaging_Close(t *testing.T) { - closeCalled := false - mockConn := &mockNATSConn{ - closeFunc: func() error { - closeCalled = true - return nil - }, - } - - messaging := &NATSMessaging{ - conn: mockConn, - } - - err := messaging.Close() - assert.NoError(t, err) - assert.True(t, closeCalled) -} diff --git a/pkg/storage/boltdb.go b/pkg/storage/boltdb.go index 0663217..ebe3cea 100644 --- a/pkg/storage/boltdb.go +++ b/pkg/storage/boltdb.go @@ -15,7 +15,7 @@ import ( const ( // PersistenceTypeBoltDB BoltDB持久化类型 - PersistenceTypeBoltDB PersistenceType = "boltdb" + // PersistenceTypeBoltDB PersistenceType = "boltdb" // 默认bucket名称 devicesBucketName = "devices" @@ -296,7 +296,7 @@ func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]mod // ReadDuration 读取指定时间范围内的数据 func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) { - deviceKey := id.String() + // deviceKey := id.String() // 从数据库读取所有数据 values, err := e.Read(ctx, id) diff --git a/pkg/storage/persister.go b/pkg/storage/persister.go index 904318b..f45f6c5 100644 --- a/pkg/storage/persister.go +++ b/pkg/storage/persister.go @@ -47,7 +47,7 @@ func NewWALPersister(directory string, syncEvery int) (*WALPersister, error) { } // 创建新的WAL文件 - filename := filepath.Join(directory, fmt.Sprintf("wal-%d.log", time.Now().UnixNano())) + filename := filepath.Join(directory, fmt.Sprintf("wal-%d.wal", time.Now().UnixNano())) file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, fmt.Errorf("failed to create WAL file: %v", err)