diff --git a/docs/writer.md b/docs/writer.md new file mode 100644 index 0000000..b457fb7 --- /dev/null +++ b/docs/writer.md @@ -0,0 +1,190 @@ +# 时序数据库写入接口设计 + +## QUIC协议选型 + +### 为什么选择QUIC + +1. 低延迟特性 + - 0-RTT连接建立,减少握手开销 + - 避免队头阻塞,提高并发写入性能 + - 内置多路复用支持,优化批量写入 + +2. 可靠性保证 + - 内置加密和认证 + - 基于UDP的可靠传输 + - 拥塞控制机制 + +3. 与需求匹配度 + - 支持高并发(>500K ops/sec)的写入场景 + - 批量提交数据时的多路复用优势 + - 低延迟响应要求 + +## 接口设计 + +### 连接建立 + +```protobuf +message WriteHandshake { + string client_id = 1; // 客户端标识 + string auth_token = 2; // 认证令牌 + uint32 protocol_version = 3; // 协议版本号 +} +``` + +### 数据写入 + +```protobuf +message WriteRequest { + message DataPoint { + string metric_id = 1; // 指标ID + double value = 2; // 数值 + int64 timestamp = 3; // 时间戳(微秒) + } + + repeated DataPoint points = 1; // 批量数据点 + string batch_id = 2; // 批次ID(用于去重) +} + +message WriteResponse { + bool success = 1; // 写入是否成功 + string error = 2; // 错误信息(如果有) + string batch_id = 3; // 对应的批次ID + uint32 points_written = 4; // 成功写入的点数 +} +``` + +## 实现细节 + +### 1. 连接管理 + +- 使用QUIC的0-RTT特性快速建立连接 +- 保持长连接以减少重连开销 +- 支持连接复用和会话恢复 + +### 2. 数据流控制 + +- 每个批次写入使用独立的QUIC流 +- 利用QUIC的流量控制机制 +- 支持写入优先级设置 + +### 3. 批量写入优化 + +- 客户端聚合策略: + * 时间窗口(默认100ms) + * 数据点数量阈值(默认1000点) + * 数据大小阈值(默认1MB) +- 服务端并行处理: + * 多流并行写入 + * 批次内并行处理 + +### 4. 错误处理 + +- 网络错误自动重试 +- 批次级别的原子性保证 +- 错误反馈机制 + +## 性能考虑 + +### 1. 连接优化 + +- 连接池管理 +- 会话票据复用 +- 智能负载均衡 + +### 2. 内存管理 + +- 零拷贝数据传输 +- 内存池复用 +- 批量数据预分配 + +### 3. 并发控制 + +- 每个数据点独立锁 +- 无锁数据结构 +- 批量写入优化 + +## 监控指标 + +1. 性能指标 + - 写入延迟(P99/P95/P50) + - 写入吞吐量 + - 连接数量 + +2. 错误指标 + - 写入失败率 + - 重试次数 + - 错误类型分布 + +3. 资源使用 + - 内存使用量 + - 网络带宽使用 + - CPU使用率 + +## 示例代码 + +```go +// 客户端示例 +type Writer struct { + conn quic.Connection + streams map[string]quic.Stream + mu sync.RWMutex +} + +func (w *Writer) WriteBatch(points []DataPoint) error { + stream, err := w.conn.OpenStreamSync(context.Background()) + if err != nil { + return err + } + defer stream.Close() + + req := &WriteRequest{ + Points: points, + BatchId: uuid.New().String(), + } + + // 序列化并发送 + data, err := proto.Marshal(req) + if err != nil { + return err + } + + _, err = stream.Write(data) + return err +} + +// 服务端示例 +func (s *Server) handleStream(stream quic.Stream) { + defer stream.Close() + + buf := make([]byte, maxMessageSize) + n, err := stream.Read(buf) + if err != nil { + return + } + + req := &WriteRequest{} + if err := proto.Unmarshal(buf[:n], req); err != nil { + return + } + + // 处理写入请求 + s.processWriteRequest(req, stream) +} +``` + +## 后续优化方向 + +1. 压缩优化 + - 时间戳压缩 + - 数值压缩 + - 批量压缩 + +2. 安全增强 + - 细粒度访问控制 + - 加密传输优化 + - 审计日志 + +3. 客户端优化 + - 智能批处理 + - 自适应重试 + - 连接池优化 \ No newline at end of file diff --git a/go.mod b/go.mod index f71648f..2f60349 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,14 @@ module git.pyer.club/kingecg/gotidb -go 1.20 +go 1.23 + +toolchain go1.23.1 require ( github.com/gin-gonic/gin v1.9.1 github.com/gorilla/websocket v1.5.1 github.com/nats-io/nats.go v1.31.0 - github.com/prometheus/client_golang v1.17.0 + github.com/prometheus/client_golang v1.19.1 ) require ( @@ -20,8 +22,10 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.16.0 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect @@ -33,17 +37,23 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/onsi/ginkgo/v2 v2.9.5 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect - github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.11.1 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/quic-go/quic-go v0.52.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect + go.uber.org/mock v0.5.0 // indirect golang.org/x/arch v0.6.0 // indirect - golang.org/x/crypto v0.16.0 // indirect - golang.org/x/net v0.19.0 // indirect - golang.org/x/sys v0.15.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/mod v0.18.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect + golang.org/x/tools v0.22.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8a69fc4..3d1c8c7 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,9 @@ github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpV github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0= github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -30,6 +33,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.16.0 h1:x+plE831WK4vaKHO/jpgUGsvLKIqRRkz6M78GuJAfGE= github.com/go-playground/validator/v10 v10.16.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -39,8 +44,11 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= @@ -69,23 +77,36 @@ github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q= +github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/quic-go/quic-go v0.52.0 h1:/SlHrCRElyaU6MaEPKqKr9z83sBg2v4FLLvWM+Z47pA= +github.com/quic-go/quic-go v0.52.0/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -97,25 +118,44 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.6.0 h1:S0JTfE48HbRj80+4tbvZDYsJ3tGv6BUU3XxyZ7CirAc= golang.org/x/arch v0.6.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= +golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= +golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/api/quic_server.go b/pkg/api/quic_server.go new file mode 100644 index 0000000..3342a47 --- /dev/null +++ b/pkg/api/quic_server.go @@ -0,0 +1,209 @@ +package api + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/json" + "encoding/pem" + "fmt" + "log" + "math/big" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/manager" + "git.pyer.club/kingecg/gotidb/pkg/model" + "github.com/quic-go/quic-go" +) + +// QUICServer represents the QUIC protocol server +type QUICServer struct { + listener quic.Listener + manager *manager.DataManager + tlsConfig *tls.Config + quicConfig *quic.Config +} + +// WriteRequest represents a write request from client +type WriteRequest struct { + Points []DataPoint `json:"points"` + BatchID string `json:"batch_id"` +} + +// DataPoint represents a single data point +type DataPoint struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp int64 `json:"timestamp"` +} + +// WriteResponse represents the response to a write request +type WriteResponse struct { + Success bool `json:"success"` + Error string `json:"error,omitempty"` + BatchID string `json:"batch_id"` + PointsCount int `json:"points_count"` +} + +// NewQUICServer creates a new QUIC server instance +func NewQUICServer(dm *manager.DataManager) (*QUICServer, error) { + // Generate self-signed certificate for development + // In production, you should use proper certificates + tlsConfig, err := generateTLSConfig() + if err != nil { + return nil, fmt.Errorf("failed to generate TLS config: %v", err) + } + + quicConfig := &quic.Config{ + MaxIdleTimeout: 30 * time.Second, + KeepAlivePeriod: 10 * time.Second, + } + + return &QUICServer{ + manager: dm, + tlsConfig: tlsConfig, + quicConfig: quicConfig, + }, nil +} + +// Start starts the QUIC server +func (s *QUICServer) Start(addr string) error { + listener, err := quic.ListenAddr(addr, s.tlsConfig, s.quicConfig) + if err != nil { + return fmt.Errorf("failed to create QUIC listener: %v", err) + } + s.listener = listener + + go s.acceptConnections() + return nil +} + +// Stop stops the QUIC server +func (s *QUICServer) Stop(ctx context.Context) error { + if s.listener != nil { + return s.listener.Close() + } + return nil +} + +func (s *QUICServer) acceptConnections() { + for { + conn, err := s.listener.Accept(context.Background()) + if err != nil { + log.Printf("Failed to accept QUIC connection: %v", err) + continue + } + + go s.handleConnection(conn) + } +} + +func (s *QUICServer) handleConnection(conn quic.Connection) { + for { + stream, err := conn.AcceptStream(context.Background()) + if err != nil { + log.Printf("Failed to accept QUIC stream: %v", err) + return + } + + go s.handleStream(stream) + } +} + +func (s *QUICServer) handleStream(stream quic.Stream) { + defer stream.Close() + + var req WriteRequest + decoder := json.NewDecoder(stream) + if err := decoder.Decode(&req); err != nil { + s.sendError(stream, "Failed to decode request", err) + return + } + + response := WriteResponse{ + BatchID: req.BatchID, + PointsCount: len(req.Points), + } + + // Process each data point + for _, point := range req.Points { + id := manager.CreateDataPoint( + point.DeviceID, + point.MetricCode, + point.Labels, + point.Value, + ) + + value := model.DataValue{ + Timestamp: point.Timestamp, + Value: point.Value, + } + + if err := s.manager.Write(context.Background(), id, value); err != nil { + s.sendError(stream, "Failed to write data point", err) + return + } + } + + response.Success = true + encoder := json.NewEncoder(stream) + if err := encoder.Encode(response); err != nil { + log.Printf("Failed to send response: %v", err) + } +} + +func (s *QUICServer) sendError(stream quic.Stream, msg string, err error) { + response := WriteResponse{ + Success: false, + Error: fmt.Sprintf("%s: %v", msg, err), + } + encoder := json.NewEncoder(stream) + if err := encoder.Encode(response); err != nil { + log.Printf("Failed to send error response: %v", err) + } +} + +// generateTLSConfig generates a self-signed certificate for development +func generateTLSConfig() (*tls.Config, error) { + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, err + } + + template := x509.Certificate{ + SerialNumber: big.NewInt(1), + NotBefore: time.Now(), + NotAfter: time.Now().Add(time.Hour * 24 * 180), // Valid for 180 days + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + } + + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) + if err != nil { + return nil, err + } + + keyPEM := pem.EncodeToMemory(&pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(key), + }) + + certPEM := pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: certDER, + }) + + tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, err + } + + return &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + NextProtos: []string{"gotidb-quic"}, // Application protocol + }, nil +} diff --git a/pkg/client/quic_client.go b/pkg/client/quic_client.go new file mode 100644 index 0000000..d3cf400 --- /dev/null +++ b/pkg/client/quic_client.go @@ -0,0 +1,170 @@ +package client + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "time" + + "github.com/quic-go/quic-go" +) + +// QUICClient represents a QUIC protocol client +type QUICClient struct { + conn quic.Connection + tlsConfig *tls.Config + quicConfig *quic.Config + address string +} + +// WriteRequest represents a write request to the server +type WriteRequest struct { + Points []DataPoint `json:"points"` + BatchID string `json:"batch_id"` +} + +// DataPoint represents a single data point +type DataPoint struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp int64 `json:"timestamp"` +} + +// WriteResponse represents the response from the server +type WriteResponse struct { + Success bool `json:"success"` + Error string `json:"error,omitempty"` + BatchID string `json:"batch_id"` + PointsCount int `json:"points_count"` +} + +// NewQUICClient creates a new QUIC client +func NewQUICClient(address string) (*QUICClient, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, // For development only + NextProtos: []string{"gotidb-quic"}, + } + + quicConfig := &quic.Config{ + MaxIdleTimeout: 30 * time.Second, + KeepAlivePeriod: 10 * time.Second, + } + + return &QUICClient{ + address: address, + tlsConfig: tlsConfig, + quicConfig: quicConfig, + }, nil +} + +// Connect establishes a connection to the QUIC server +func (c *QUICClient) Connect(ctx context.Context) error { + conn, err := quic.DialAddr(ctx, c.address, c.tlsConfig, c.quicConfig) + if err != nil { + return fmt.Errorf("failed to connect to QUIC server: %v", err) + } + c.conn = conn + return nil +} + +// Close closes the connection to the QUIC server +func (c *QUICClient) Close() error { + if c.conn != nil { + return c.conn.CloseWithError(0, "normal closure") + } + return nil +} + +// Write sends data points to the server +func (c *QUICClient) Write(ctx context.Context, points []DataPoint) (*WriteResponse, error) { + if c.conn == nil { + return nil, fmt.Errorf("client not connected") + } + + // Open a new stream for this write request + stream, err := c.conn.OpenStreamSync(ctx) + if err != nil { + return nil, fmt.Errorf("failed to open stream: %v", err) + } + defer stream.Close() + + // Create and send the write request + req := WriteRequest{ + Points: points, + BatchID: fmt.Sprintf("%d", time.Now().UnixNano()), + } + + encoder := json.NewEncoder(stream) + if err := encoder.Encode(req); err != nil { + return nil, fmt.Errorf("failed to encode request: %v", err) + } + + // Read the response + var resp WriteResponse + decoder := json.NewDecoder(stream) + if err := decoder.Decode(&resp); err != nil { + return nil, fmt.Errorf("failed to decode response: %v", err) + } + + return &resp, nil +} + +// WriteBatch is a convenience method for writing multiple data points +func (c *QUICClient) WriteBatch(ctx context.Context, deviceID, metricCode string, values []float64, timestamps []int64, labels map[string]string) (*WriteResponse, error) { + if len(values) != len(timestamps) { + return nil, fmt.Errorf("values and timestamps length mismatch") + } + + points := make([]DataPoint, len(values)) + for i := range values { + points[i] = DataPoint{ + DeviceID: deviceID, + MetricCode: metricCode, + Labels: labels, + Value: values[i], + Timestamp: timestamps[i], + } + } + + return c.Write(ctx, points) +} + +// Example usage: +/* +func main() { + client, err := NewQUICClient("localhost:4242") + if err != nil { + log.Fatal(err) + } + + ctx := context.Background() + if err := client.Connect(ctx); err != nil { + log.Fatal(err) + } + defer client.Close() + + points := []DataPoint{ + { + DeviceID: "device1", + MetricCode: "temperature", + Labels: map[string]string{"location": "room1"}, + Value: 25.5, + Timestamp: time.Now().UnixNano(), + }, + } + + resp, err := client.Write(ctx, points) + if err != nil { + log.Fatal(err) + } + + if !resp.Success { + log.Printf("Write failed: %s", resp.Error) + } else { + log.Printf("Successfully wrote %d points", resp.PointsCount) + } +} +*/