feat(server): 添加 QUIC 协议支持

- 在配置文件中增加 QUIC 相关配置项
- 实现 QUIC 服务器并集成到主程序中
- 添加 QUIC 写入请求处理逻辑
- 优化命令行参数
This commit is contained in:
kingecg 2025-06-11 21:47:10 +08:00
parent 0a193d72c2
commit 90d3ecefb8
5 changed files with 110 additions and 28 deletions

View File

@ -2,19 +2,23 @@ package main
import (
"os"
"time"
"github.com/quic-go/quic-go"
"gopkg.in/yaml.v3"
)
// Config 应用程序配置结构
type Config struct {
RestAddr string `yaml:"rest_addr"`
WsAddr string `yaml:"ws_addr"`
MetricsAddr string `yaml:"metrics_addr"`
NATSURL string `yaml:"nats_url"`
PersistenceType string `yaml:"persistence_type"`
PersistenceDir string `yaml:"persistence_dir"`
SyncEvery int `yaml:"sync_every"`
RestAddr string `yaml:"rest_addr"`
WsAddr string `yaml:"ws_addr"`
MetricsAddr string `yaml:"metrics_addr"`
QuicAddr string `yaml:"quic_addr"`
NATSURL string `yaml:"nats_url"`
PersistenceType string `yaml:"persistence_type"`
PersistenceDir string `yaml:"persistence_dir"`
SyncEvery int `yaml:"sync_every"`
QuicConfig *quic.Config `yaml:"quic_config"`
}
func LoadConfig(path string) (*Config, error) {
@ -42,6 +46,8 @@ func GenerateSampleConfig(path string) error {
RestAddr: ":8080",
SyncEvery: 1000,
WsAddr: ":8081",
QuicAddr: ":8083",
QuicConfig: DefaultQuicConfig(),
}
// 序列化yaml到path
file, err := os.Create(path)
@ -51,3 +57,14 @@ func GenerateSampleConfig(path string) error {
defer file.Close()
return yaml.NewEncoder(file).Encode(config)
}
func DefaultQuicConfig() *quic.Config {
return &quic.Config{
MaxIdleTimeout: 30 * time.Second, // 空闲超时时间
KeepAlivePeriod: 15 * time.Second, // 保活间隔
MaxIncomingStreams: 1000, // 最大传入流数
MaxIncomingUniStreams: 1000, // 最大单向传入流数
HandshakeIdleTimeout: 10 * time.Second, // 握手超时时间
EnableDatagrams: true, // 启用数据报支持
DisablePathMTUDiscovery: false, // 启用路径 MTU 发现
}
}

View File

@ -26,6 +26,9 @@ var (
persistenceDir = flag.String("persistence-dir", "./data", "持久化目录")
syncEvery = flag.Int("sync-every", 100, "每写入多少条数据同步一次")
configPath = flag.String("config", "config.yaml", "配置文件路径")
// 定义配置QUIC服务的命令行参数
quicAddr = flag.String("quic-addr", ":8083", "QUIC服务地址")
genSampleConfig = flag.Bool("gen-sample-config", false, "生成示例配置文件")
)
@ -39,19 +42,50 @@ func main() {
return
}
flag.Parse()
if *configPath != "" {
config, err := LoadConfig(*configPath)
// 保存命令行原始值,用于后续判断是否被用户显式设置
originalRestAddr := *restAddr
originalWsAddr := *wsAddr
originalMetricsAddr := *metricsAddr
originalQuicAddr := *quicAddr
originalNatsURL := *natsURL
originalPersistenceType := *persistenceType
originalPersistenceDir := *persistenceDir
originalSyncEvery := *syncEvery
var config *Config
if *configPath != "" {
var err error
config, err = LoadConfig(*configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
restAddr = &config.RestAddr
wsAddr = &config.WsAddr
metricsAddr = &config.MetricsAddr
natsURL = &config.NATSURL
persistenceType = &config.PersistenceType
persistenceDir = &config.PersistenceDir
syncEvery = &config.SyncEvery
// 只有当命令行参数为默认值时,才使用配置文件中的值
if *restAddr == originalRestAddr {
restAddr = &config.RestAddr
}
if *wsAddr == originalWsAddr {
wsAddr = &config.WsAddr
}
if *metricsAddr == originalMetricsAddr {
metricsAddr = &config.MetricsAddr
}
if *quicAddr == originalQuicAddr {
quicAddr = &config.QuicAddr
}
if *natsURL == originalNatsURL {
natsURL = &config.NATSURL
}
if *persistenceType == originalPersistenceType {
persistenceType = &config.PersistenceType
}
if *persistenceDir == originalPersistenceDir {
persistenceDir = &config.PersistenceDir
}
if *syncEvery == originalSyncEvery {
syncEvery = &config.SyncEvery
}
}
// 创建存储引擎
@ -84,6 +118,19 @@ func main() {
// 创建WebSocket服务
wsServer := api.NewWebSocketServer(dataManager)
// 创建QUIC服务器
var quicServer *api.QUICServer
var quicConfig = DefaultQuicConfig() // 使用默认配置
if config != nil && config.QuicConfig != nil {
quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置
}
quicServer, err := api.NewQUICServer(dataManager, quicConfig)
if err != nil {
log.Printf("Failed to create QUIC server: %v", err)
log.Println("Continuing without QUIC server")
quicServer = nil
}
// 创建NATS消息系统
natsConfig := messaging.NATSConfig{
URL: *natsURL,
@ -141,6 +188,16 @@ func main() {
}
}()
// 启动QUIC服务
if quicServer != nil {
go func() {
log.Printf("Starting QUIC server on %s", *quicAddr)
if err := quicServer.Start(*quicAddr); err != nil {
log.Printf("Failed to start QUIC server: %v", err)
}
}()
}
// 等待信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
@ -170,6 +227,13 @@ func main() {
}
}
// 关闭QUIC服务
if quicServer != nil {
if err := quicServer.Stop(ctx); err != nil {
log.Printf("Failed to stop QUIC server: %v", err)
}
}
// 关闭数据管理器
if err := dataManager.Close(); err != nil {
log.Printf("Failed to close data manager: %v", err)

View File

@ -20,14 +20,14 @@ import (
// QUICServer represents the QUIC protocol server
type QUICServer struct {
listener quic.Listener
listener *quic.Listener
manager *manager.DataManager
tlsConfig *tls.Config
quicConfig *quic.Config
}
// WriteRequest represents a write request from client
type WriteRequest struct {
// QUICWriteRequest represents a write request from client
type QUICWriteRequest struct {
Points []DataPoint `json:"points"`
BatchID string `json:"batch_id"`
}
@ -50,7 +50,7 @@ type WriteResponse struct {
}
// NewQUICServer creates a new QUIC server instance
func NewQUICServer(dm *manager.DataManager) (*QUICServer, error) {
func NewQUICServer(dm *manager.DataManager, config *quic.Config) (*QUICServer, error) {
// Generate self-signed certificate for development
// In production, you should use proper certificates
tlsConfig, err := generateTLSConfig()
@ -117,7 +117,7 @@ func (s *QUICServer) handleConnection(conn quic.Connection) {
func (s *QUICServer) handleStream(stream quic.Stream) {
defer stream.Close()
var req WriteRequest
var req QUICWriteRequest
decoder := json.NewDecoder(stream)
if err := decoder.Decode(&req); err != nil {
s.sendError(stream, "Failed to decode request", err)
@ -137,9 +137,10 @@ func (s *QUICServer) handleStream(stream quic.Stream) {
point.Labels,
point.Value,
)
timestamp := time.Unix(point.Timestamp, 0)
value := model.DataValue{
Timestamp: point.Timestamp,
Timestamp: timestamp,
Value: point.Value,
}

View File

@ -18,8 +18,8 @@ type RESTServer struct {
server *http.Server
}
// WriteRequest 写入请求
type WriteRequest struct {
// RESTWriteRequest 写入请求
type RESTWriteRequest struct {
DeviceID string `json:"device_id"`
MetricCode string `json:"metric_code"`
Labels map[string]string `json:"labels"`
@ -31,7 +31,7 @@ type Response map[string]any
// BatchWriteRequest 批量写入请求
type BatchWriteRequest struct {
Points []WriteRequest `json:"points"`
Points []RESTWriteRequest `json:"points"`
}
// QueryRequest 查询请求
@ -82,7 +82,7 @@ func (s *RESTServer) setupRoutes() {
// handleWrite 处理写入请求
func (s *RESTServer) handleWrite(c *gin.Context) {
var req WriteRequest
var req RESTWriteRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Invalid request: " + err.Error(),

View File

@ -37,7 +37,7 @@ func TestRESTServer_WriteEndpoint(t *testing.T) {
server := setupTestRESTServer()
// 创建测试请求
writeReq := WriteRequest{
writeReq := RESTWriteRequest{
DeviceID: "test-device",
MetricCode: "temperature",
Labels: map[string]string{
@ -87,7 +87,7 @@ func TestRESTServer_BatchWriteEndpoint(t *testing.T) {
// 创建测试请求
batchReq := BatchWriteRequest{
Points: []WriteRequest{
Points: []RESTWriteRequest{
{
DeviceID: "test-device",
MetricCode: "temperature",