From 6cdefec5764bc293e958871253d6b3faf55b3311 Mon Sep 17 00:00:00 2001 From: kingecg Date: Thu, 12 Jun 2025 23:18:45 +0800 Subject: [PATCH] =?UTF-8?q?refactor(server):=20=E4=BC=98=E5=8C=96=20QUIC?= =?UTF-8?q?=20=E6=9C=8D=E5=8A=A1=E5=99=A8=E5=88=9B=E5=BB=BA=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改了 quicServer 变量的声明和赋值方式,提高代码可读性 - 删除了未使用的 nats_test.go 文件,减少冗余代码 - 注释了未使用的常量和变量,为后续清理做准备 --- cmd/server/main.go | 2 +- pkg/messaging/nats_test.go | 130 ------------------------------------- pkg/storage/boltdb.go | 4 +- pkg/storage/persister.go | 2 +- 4 files changed, 4 insertions(+), 134 deletions(-) delete mode 100644 pkg/messaging/nats_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 0ac09f6..2230f16 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -138,7 +138,7 @@ func main() { quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置 } - quicServer, err := api.NewQUICServer(dataManager, quicConfig) + quicServer, err = api.NewQUICServer(dataManager, quicConfig) if err != nil { log.Printf("Failed to create QUIC server: %v", err) log.Println("Continuing without QUIC server") diff --git a/pkg/messaging/nats_test.go b/pkg/messaging/nats_test.go deleted file mode 100644 index 98aee16..0000000 --- a/pkg/messaging/nats_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package messaging - -import ( - "context" - "testing" - "time" - - "git.pyer.club/kingecg/gotidb/pkg/model" - "github.com/nats-io/nats.go/jetstream" - "github.com/stretchr/testify/assert" -) - -// 模拟NATS连接 -type mockNATSConn struct { - closeFunc func() error -} - -func (m *mockNATSConn) Close() error { - if m.closeFunc != nil { - return m.closeFunc() - } - return nil -} - -// 模拟JetStream -type mockJetStream struct { - publishFunc func(ctx context.Context, subject string, data []byte) (jetstream.PubAck, error) -} - -func (m *mockJetStream) Publish(ctx context.Context, subject string, data []byte) (jetstream.PubAck, error) { - if m.publishFunc != nil { - return m.publishFunc(ctx, subject, data) - } - return jetstream.PubAck{}, nil -} - -// 模拟Stream -type mockStream struct { - createOrUpdateConsumerFunc func(ctx context.Context, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) -} - -func (m *mockStream) CreateOrUpdateConsumer(ctx context.Context, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { - if m.createOrUpdateConsumerFunc != nil { - return m.createOrUpdateConsumerFunc(ctx, cfg) - } - return nil, nil -} - -// 模拟Consumer -type mockConsumer struct { - messagesFunc func() (jetstream.MessagesContext, error) -} - -func (m *mockConsumer) Messages() (jetstream.MessagesContext, error) { - if m.messagesFunc != nil { - return m.messagesFunc() - } - return nil, nil -} - -func TestNATSMessaging_Publish(t *testing.T) { - publishCalled := false - mockJS := &mockJetStream{ - publishFunc: func(ctx context.Context, subject string, data []byte) (jetstream.PubAck, error) { - publishCalled = true - return jetstream.PubAck{}, nil - }, - } - - messaging := &NATSMessaging{ - conn: &mockNATSConn{}, - js: mockJS, - } - - id := model.DataPointID{ - DeviceID: "device1", - MetricCode: "metric1", - Labels: map[string]string{"env": "test"}, - } - value := model.DataValue{ - Timestamp: time.Now(), - Value: 42.0, - } - - err := messaging.Publish(context.Background(), id, value) - assert.NoError(t, err) - assert.True(t, publishCalled) -} - -func TestNATSMessaging_Subscribe(t *testing.T) { - handlerCalled := false - handler := func(msg DataMessage) error { - handlerCalled = true - return nil - } - - mockConsumer := &mockConsumer{} - mockStream := &mockStream{ - createOrUpdateConsumerFunc: func(ctx context.Context, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { - return mockConsumer, nil - }, - } - - messaging := &NATSMessaging{ - conn: &mockNATSConn{}, - stream: mockStream, - } - - err := messaging.Subscribe(handler) - assert.NoError(t, err) - assert.Contains(t, messaging.handlers, handler) -} - -func TestNATSMessaging_Close(t *testing.T) { - closeCalled := false - mockConn := &mockNATSConn{ - closeFunc: func() error { - closeCalled = true - return nil - }, - } - - messaging := &NATSMessaging{ - conn: mockConn, - } - - err := messaging.Close() - assert.NoError(t, err) - assert.True(t, closeCalled) -} diff --git a/pkg/storage/boltdb.go b/pkg/storage/boltdb.go index 0663217..ebe3cea 100644 --- a/pkg/storage/boltdb.go +++ b/pkg/storage/boltdb.go @@ -15,7 +15,7 @@ import ( const ( // PersistenceTypeBoltDB BoltDB持久化类型 - PersistenceTypeBoltDB PersistenceType = "boltdb" + // PersistenceTypeBoltDB PersistenceType = "boltdb" // 默认bucket名称 devicesBucketName = "devices" @@ -296,7 +296,7 @@ func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]mod // ReadDuration 读取指定时间范围内的数据 func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) { - deviceKey := id.String() + // deviceKey := id.String() // 从数据库读取所有数据 values, err := e.Read(ctx, id) diff --git a/pkg/storage/persister.go b/pkg/storage/persister.go index 904318b..f45f6c5 100644 --- a/pkg/storage/persister.go +++ b/pkg/storage/persister.go @@ -47,7 +47,7 @@ func NewWALPersister(directory string, syncEvery int) (*WALPersister, error) { } // 创建新的WAL文件 - filename := filepath.Join(directory, fmt.Sprintf("wal-%d.log", time.Now().UnixNano())) + filename := filepath.Join(directory, fmt.Sprintf("wal-%d.wal", time.Now().UnixNano())) file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, fmt.Errorf("failed to create WAL file: %v", err)