From 90d3ecefb831c7a63fa9d7d85bc26cd6dae397ea Mon Sep 17 00:00:00 2001 From: kingecg Date: Wed, 11 Jun 2025 21:47:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E6=B7=BB=E5=8A=A0=20QUIC=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在配置文件中增加 QUIC 相关配置项 - 实现 QUIC 服务器并集成到主程序中 - 添加 QUIC 写入请求处理逻辑 - 优化命令行参数 --- cmd/server/config.go | 31 ++++++++++++---- cmd/server/main.go | 82 +++++++++++++++++++++++++++++++++++++----- pkg/api/quic_server.go | 13 +++---- pkg/api/rest.go | 8 ++--- pkg/api/rest_test.go | 4 +-- 5 files changed, 110 insertions(+), 28 deletions(-) diff --git a/cmd/server/config.go b/cmd/server/config.go index 7a8447e..e6b535e 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -2,19 +2,23 @@ package main import ( "os" + "time" + "github.com/quic-go/quic-go" "gopkg.in/yaml.v3" ) // Config 应用程序配置结构 type Config struct { - RestAddr string `yaml:"rest_addr"` - WsAddr string `yaml:"ws_addr"` - MetricsAddr string `yaml:"metrics_addr"` - NATSURL string `yaml:"nats_url"` - PersistenceType string `yaml:"persistence_type"` - PersistenceDir string `yaml:"persistence_dir"` - SyncEvery int `yaml:"sync_every"` + RestAddr string `yaml:"rest_addr"` + WsAddr string `yaml:"ws_addr"` + MetricsAddr string `yaml:"metrics_addr"` + QuicAddr string `yaml:"quic_addr"` + NATSURL string `yaml:"nats_url"` + PersistenceType string `yaml:"persistence_type"` + PersistenceDir string `yaml:"persistence_dir"` + SyncEvery int `yaml:"sync_every"` + QuicConfig *quic.Config `yaml:"quic_config"` } func LoadConfig(path string) (*Config, error) { @@ -42,6 +46,8 @@ func GenerateSampleConfig(path string) error { RestAddr: ":8080", SyncEvery: 1000, WsAddr: ":8081", + QuicAddr: ":8083", + QuicConfig: DefaultQuicConfig(), } // 序列化yaml到path file, err := os.Create(path) @@ -51,3 +57,14 @@ func GenerateSampleConfig(path string) error { defer file.Close() return yaml.NewEncoder(file).Encode(config) } +func DefaultQuicConfig() *quic.Config { + return &quic.Config{ + MaxIdleTimeout: 30 * time.Second, // 空闲超时时间 + KeepAlivePeriod: 15 * time.Second, // 保活间隔 + MaxIncomingStreams: 1000, // 最大传入流数 + MaxIncomingUniStreams: 1000, // 最大单向传入流数 + HandshakeIdleTimeout: 10 * time.Second, // 握手超时时间 + EnableDatagrams: true, // 启用数据报支持 + DisablePathMTUDiscovery: false, // 启用路径 MTU 发现 + } +} diff --git a/cmd/server/main.go b/cmd/server/main.go index 00d6835..b50e9e5 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -26,6 +26,9 @@ var ( persistenceDir = flag.String("persistence-dir", "./data", "持久化目录") syncEvery = flag.Int("sync-every", 100, "每写入多少条数据同步一次") configPath = flag.String("config", "config.yaml", "配置文件路径") + // 定义配置QUIC服务的命令行参数 + quicAddr = flag.String("quic-addr", ":8083", "QUIC服务地址") + genSampleConfig = flag.Bool("gen-sample-config", false, "生成示例配置文件") ) @@ -39,19 +42,50 @@ func main() { return } flag.Parse() - if *configPath != "" { - config, err := LoadConfig(*configPath) + // 保存命令行原始值,用于后续判断是否被用户显式设置 + originalRestAddr := *restAddr + originalWsAddr := *wsAddr + originalMetricsAddr := *metricsAddr + originalQuicAddr := *quicAddr + originalNatsURL := *natsURL + originalPersistenceType := *persistenceType + originalPersistenceDir := *persistenceDir + originalSyncEvery := *syncEvery + + var config *Config + if *configPath != "" { + var err error + config, err = LoadConfig(*configPath) if err != nil { log.Fatalf("Failed to load config: %v", err) } - restAddr = &config.RestAddr - wsAddr = &config.WsAddr - metricsAddr = &config.MetricsAddr - natsURL = &config.NATSURL - persistenceType = &config.PersistenceType - persistenceDir = &config.PersistenceDir - syncEvery = &config.SyncEvery + + // 只有当命令行参数为默认值时,才使用配置文件中的值 + if *restAddr == originalRestAddr { + restAddr = &config.RestAddr + } + if *wsAddr == originalWsAddr { + wsAddr = &config.WsAddr + } + if *metricsAddr == originalMetricsAddr { + metricsAddr = &config.MetricsAddr + } + if *quicAddr == originalQuicAddr { + quicAddr = &config.QuicAddr + } + if *natsURL == originalNatsURL { + natsURL = &config.NATSURL + } + if *persistenceType == originalPersistenceType { + persistenceType = &config.PersistenceType + } + if *persistenceDir == originalPersistenceDir { + persistenceDir = &config.PersistenceDir + } + if *syncEvery == originalSyncEvery { + syncEvery = &config.SyncEvery + } } // 创建存储引擎 @@ -84,6 +118,19 @@ func main() { // 创建WebSocket服务 wsServer := api.NewWebSocketServer(dataManager) + // 创建QUIC服务器 + var quicServer *api.QUICServer + var quicConfig = DefaultQuicConfig() // 使用默认配置 + if config != nil && config.QuicConfig != nil { + quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置 + } + + quicServer, err := api.NewQUICServer(dataManager, quicConfig) + if err != nil { + log.Printf("Failed to create QUIC server: %v", err) + log.Println("Continuing without QUIC server") + quicServer = nil + } // 创建NATS消息系统 natsConfig := messaging.NATSConfig{ URL: *natsURL, @@ -141,6 +188,16 @@ func main() { } }() + // 启动QUIC服务 + if quicServer != nil { + go func() { + log.Printf("Starting QUIC server on %s", *quicAddr) + if err := quicServer.Start(*quicAddr); err != nil { + log.Printf("Failed to start QUIC server: %v", err) + } + }() + } + // 等待信号 sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -170,6 +227,13 @@ func main() { } } + // 关闭QUIC服务 + if quicServer != nil { + if err := quicServer.Stop(ctx); err != nil { + log.Printf("Failed to stop QUIC server: %v", err) + } + } + // 关闭数据管理器 if err := dataManager.Close(); err != nil { log.Printf("Failed to close data manager: %v", err) diff --git a/pkg/api/quic_server.go b/pkg/api/quic_server.go index 3342a47..38c76b3 100644 --- a/pkg/api/quic_server.go +++ b/pkg/api/quic_server.go @@ -20,14 +20,14 @@ import ( // QUICServer represents the QUIC protocol server type QUICServer struct { - listener quic.Listener + listener *quic.Listener manager *manager.DataManager tlsConfig *tls.Config quicConfig *quic.Config } -// WriteRequest represents a write request from client -type WriteRequest struct { +// QUICWriteRequest represents a write request from client +type QUICWriteRequest struct { Points []DataPoint `json:"points"` BatchID string `json:"batch_id"` } @@ -50,7 +50,7 @@ type WriteResponse struct { } // NewQUICServer creates a new QUIC server instance -func NewQUICServer(dm *manager.DataManager) (*QUICServer, error) { +func NewQUICServer(dm *manager.DataManager, config *quic.Config) (*QUICServer, error) { // Generate self-signed certificate for development // In production, you should use proper certificates tlsConfig, err := generateTLSConfig() @@ -117,7 +117,7 @@ func (s *QUICServer) handleConnection(conn quic.Connection) { func (s *QUICServer) handleStream(stream quic.Stream) { defer stream.Close() - var req WriteRequest + var req QUICWriteRequest decoder := json.NewDecoder(stream) if err := decoder.Decode(&req); err != nil { s.sendError(stream, "Failed to decode request", err) @@ -137,9 +137,10 @@ func (s *QUICServer) handleStream(stream quic.Stream) { point.Labels, point.Value, ) + timestamp := time.Unix(point.Timestamp, 0) value := model.DataValue{ - Timestamp: point.Timestamp, + Timestamp: timestamp, Value: point.Value, } diff --git a/pkg/api/rest.go b/pkg/api/rest.go index fed451f..06f58e4 100644 --- a/pkg/api/rest.go +++ b/pkg/api/rest.go @@ -18,8 +18,8 @@ type RESTServer struct { server *http.Server } -// WriteRequest 写入请求 -type WriteRequest struct { +// RESTWriteRequest 写入请求 +type RESTWriteRequest struct { DeviceID string `json:"device_id"` MetricCode string `json:"metric_code"` Labels map[string]string `json:"labels"` @@ -31,7 +31,7 @@ type Response map[string]any // BatchWriteRequest 批量写入请求 type BatchWriteRequest struct { - Points []WriteRequest `json:"points"` + Points []RESTWriteRequest `json:"points"` } // QueryRequest 查询请求 @@ -82,7 +82,7 @@ func (s *RESTServer) setupRoutes() { // handleWrite 处理写入请求 func (s *RESTServer) handleWrite(c *gin.Context) { - var req WriteRequest + var req RESTWriteRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": "Invalid request: " + err.Error(), diff --git a/pkg/api/rest_test.go b/pkg/api/rest_test.go index 6552258..1d1979e 100644 --- a/pkg/api/rest_test.go +++ b/pkg/api/rest_test.go @@ -37,7 +37,7 @@ func TestRESTServer_WriteEndpoint(t *testing.T) { server := setupTestRESTServer() // 创建测试请求 - writeReq := WriteRequest{ + writeReq := RESTWriteRequest{ DeviceID: "test-device", MetricCode: "temperature", Labels: map[string]string{ @@ -87,7 +87,7 @@ func TestRESTServer_BatchWriteEndpoint(t *testing.T) { // 创建测试请求 batchReq := BatchWriteRequest{ - Points: []WriteRequest{ + Points: []RESTWriteRequest{ { DeviceID: "test-device", MetricCode: "temperature",