gotidb/docs/writer.md

3.7 KiB
Raw Permalink Blame History

时序数据库写入接口设计

QUIC协议选型

为什么选择QUIC

  1. 低延迟特性

    • 0-RTT连接建立减少握手开销
    • 避免队头阻塞,提高并发写入性能
    • 内置多路复用支持,优化批量写入
  2. 可靠性保证

    • 内置加密和认证
    • 基于UDP的可靠传输
    • 拥塞控制机制
  3. 与需求匹配度

    • 支持高并发(>500K ops/sec)的写入场景
    • 批量提交数据时的多路复用优势
    • 低延迟响应要求

接口设计

连接建立

message WriteHandshake {
  string client_id = 1;        // 客户端标识
  string auth_token = 2;       // 认证令牌
  uint32 protocol_version = 3; // 协议版本号
}

数据写入

message WriteRequest {
  message DataPoint {
    string metric_id = 1;     // 指标ID
    double value = 2;         // 数值
    int64 timestamp = 3;      // 时间戳(微秒)
  }
  
  repeated DataPoint points = 1;  // 批量数据点
  string batch_id = 2;           // 批次ID(用于去重)
}

message WriteResponse {
  bool success = 1;           // 写入是否成功
  string error = 2;           // 错误信息(如果有)
  string batch_id = 3;        // 对应的批次ID
  uint32 points_written = 4;  // 成功写入的点数
}

实现细节

1. 连接管理

  • 使用QUIC的0-RTT特性快速建立连接
  • 保持长连接以减少重连开销
  • 支持连接复用和会话恢复

2. 数据流控制

  • 每个批次写入使用独立的QUIC流
  • 利用QUIC的流量控制机制
  • 支持写入优先级设置

3. 批量写入优化

  • 客户端聚合策略:
    • 时间窗口(默认100ms)
    • 数据点数量阈值(默认1000点)
    • 数据大小阈值(默认1MB)
  • 服务端并行处理:
    • 多流并行写入
    • 批次内并行处理

4. 错误处理

  • 网络错误自动重试
  • 批次级别的原子性保证
  • 错误反馈机制

性能考虑

1. 连接优化

  • 连接池管理
  • 会话票据复用
  • 智能负载均衡

2. 内存管理

  • 零拷贝数据传输
  • 内存池复用
  • 批量数据预分配

3. 并发控制

  • 每个数据点独立锁
  • 无锁数据结构
  • 批量写入优化

监控指标

  1. 性能指标

    • 写入延迟(P99/P95/P50)
    • 写入吞吐量
    • 连接数量
  2. 错误指标

    • 写入失败率
    • 重试次数
    • 错误类型分布
  3. 资源使用

    • 内存使用量
    • 网络带宽使用
    • CPU使用率

示例代码

// 客户端示例
type Writer struct {
    conn    quic.Connection
    streams map[string]quic.Stream
    mu      sync.RWMutex
}

func (w *Writer) WriteBatch(points []DataPoint) error {
    stream, err := w.conn.OpenStreamSync(context.Background())
    if err != nil {
        return err
    }
    defer stream.Close()

    req := &WriteRequest{
        Points:  points,
        BatchId: uuid.New().String(),
    }
    
    // 序列化并发送
    data, err := proto.Marshal(req)
    if err != nil {
        return err
    }
    
    _, err = stream.Write(data)
    return err
}

// 服务端示例
func (s *Server) handleStream(stream quic.Stream) {
    defer stream.Close()
    
    buf := make([]byte, maxMessageSize)
    n, err := stream.Read(buf)
    if err != nil {
        return
    }
    
    req := &WriteRequest{}
    if err := proto.Unmarshal(buf[:n], req); err != nil {
        return
    }
    
    // 处理写入请求
    s.processWriteRequest(req, stream)
}

后续优化方向

  1. 压缩优化

    • 时间戳压缩
    • 数值压缩
    • 批量压缩
  2. 安全增强

    • 细粒度访问控制
    • 加密传输优化
    • 审计日志
  3. 客户端优化

    • 智能批处理
    • 自适应重试
    • 连接池优化