From 25a2c2de57c8f5e370fea15ca3314566d7ebcecd Mon Sep 17 00:00:00 2001 From: kingecg Date: Tue, 10 Jun 2025 21:38:06 +0800 Subject: [PATCH] first gen --- .gitignore | 1 + README.md | 153 +++++++++++++++++++++ cmd/server/main.go | 155 ++++++++++++++++++++++ docs/requirement/require.md | 14 ++ go.mod | 49 +++++++ go.sum | 125 ++++++++++++++++++ pkg/api/rest.go | 257 ++++++++++++++++++++++++++++++++++++ pkg/api/websocket.go | 168 +++++++++++++++++++++++ pkg/manager/datamanager.go | 123 +++++++++++++++++ pkg/messaging/nats.go | 234 ++++++++++++++++++++++++++++++++ pkg/model/datapoint.go | 124 +++++++++++++++++ pkg/model/query.go | 117 ++++++++++++++++ pkg/monitoring/metrics.go | 182 +++++++++++++++++++++++++ pkg/storage/engine.go | 169 ++++++++++++++++++++++++ pkg/storage/persister.go | 169 ++++++++++++++++++++++++ 15 files changed, 2040 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 cmd/server/main.go create mode 100644 docs/requirement/require.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/api/rest.go create mode 100644 pkg/api/websocket.go create mode 100644 pkg/manager/datamanager.go create mode 100644 pkg/messaging/nats.go create mode 100644 pkg/model/datapoint.go create mode 100644 pkg/model/query.go create mode 100644 pkg/monitoring/metrics.go create mode 100644 pkg/storage/engine.go create mode 100644 pkg/storage/persister.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a725465 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a29faf8 --- /dev/null +++ b/README.md @@ -0,0 +1,153 @@ +# GoTiDB - 时序数据库 + +GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专门用于存储和查询时间序列数据。它支持高效的数据写入、查询和实时数据推送功能。 + +## 特性 + +- 高性能内存存储引擎 +- WAL(预写日志)持久化 +- REST API 接口 +- WebSocket 实时数据推送 +- NATS 消息系统集成 +- Prometheus 指标监控 +- 支持自定义标签的数据点 +- 环形缓冲区数据结构 +- 支持多种查询类型(最新值、所有值、持续时间) + +## 安装 + +确保你已经安装了 Go 1.16 或更高版本。 + +```bash +git clone git.pyer.club/kingecg/gotidb +cd gotidb +go mod download +``` + +## 构建 + +```bash +go build -o gotidb cmd/server/main.go +``` + +## 运行 + +```bash +./gotidb [options] +``` + +### 可用选项 + +- `-rest-addr`: REST API 服务地址(默认:":8080") +- `-ws-addr`: WebSocket 服务地址(默认:":8081") +- `-metrics-addr`: 指标服务地址(默认:":8082") +- `-nats-url`: NATS 服务器地址(默认:"nats://localhost:4222") +- `-persistence`: 持久化类型(none, wal)(默认:"none") +- `-persistence-dir`: 持久化目录(默认:"./data") +- `-sync-every`: 每写入多少条数据同步一次(默认:100) + +## API 使用 + +### REST API + +#### 写入数据 + +```bash +curl -X POST http://localhost:8080/api/v1/write \ + -H "Content-Type: application/json" \ + -d '{ + "device_id": "device1", + "metric_code": "temperature", + "labels": { + "location": "room1" + }, + "value": 25.5 + }' +``` + +#### 批量写入数据 + +```bash +curl -X POST http://localhost:8080/api/v1/batch_write \ + -H "Content-Type: application/json" \ + -d '{ + "points": [ + { + "device_id": "device1", + "metric_code": "temperature", + "labels": { + "location": "room1" + }, + "value": 25.5 + }, + { + "device_id": "device2", + "metric_code": "humidity", + "labels": { + "location": "room2" + }, + "value": 60 + } + ] + }' +``` + +#### 查询数据 + +```bash +curl -X POST http://localhost:8080/api/v1/query \ + -H "Content-Type: application/json" \ + -d '{ + "device_id": "device1", + "metric_code": "temperature", + "labels": { + "location": "room1" + }, + "query_type": "latest" + }' +``` + +### WebSocket API + +连接 WebSocket 服务: + +```javascript +const ws = new WebSocket('ws://localhost:8081/ws'); + +// 订阅数据点 +ws.send(JSON.stringify({ + device_id: "device1", + metric_code: "temperature", + labels: { + location: "room1" + } +})); + +// 接收数据更新 +ws.onmessage = function(event) { + const data = JSON.parse(event.data); + console.log('Received update:', data); +}; +``` + +## 监控 + +访问 `http://localhost:8082/metrics` 查看 Prometheus 指标。 + +可用指标: + +- `gotidb_write_total`: 写入操作总数 +- `gotidb_query_total`: 查询操作总数 +- `gotidb_write_latency_seconds`: 写入操作延迟 +- `gotidb_query_latency_seconds`: 查询操作延迟 +- `gotidb_active_connections`: 活跃连接数 +- `gotidb_data_points_count`: 数据点数量 +- `gotidb_persistence_latency_seconds`: 持久化操作延迟 +- `gotidb_persistence_errors_total`: 持久化错误总数 +- `gotidb_messaging_latency_seconds`: 消息操作延迟 +- `gotidb_messaging_errors_total`: 消息错误总数 +- `gotidb_websocket_connections`: WebSocket 连接数 + +## 许可证 + +MIT License \ No newline at end of file diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..3ac16bf --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,155 @@ +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/api" + "git.pyer.club/kingecg/gotidb/pkg/manager" + "git.pyer.club/kingecg/gotidb/pkg/messaging" + "git.pyer.club/kingecg/gotidb/pkg/model" + "git.pyer.club/kingecg/gotidb/pkg/monitoring" + "git.pyer.club/kingecg/gotidb/pkg/storage" +) + +var ( + restAddr = flag.String("rest-addr", ":8080", "REST API服务地址") + wsAddr = flag.String("ws-addr", ":8081", "WebSocket服务地址") + metricsAddr = flag.String("metrics-addr", ":8082", "指标服务地址") + natsURL = flag.String("nats-url", "nats://localhost:4222", "NATS服务器地址") + persistenceType = flag.String("persistence", "none", "持久化类型 (none, wal)") + persistenceDir = flag.String("persistence-dir", "./data", "持久化目录") + syncEvery = flag.Int("sync-every", 100, "每写入多少条数据同步一次") +) + +func main() { + flag.Parse() + + // 创建存储引擎 + engine := storage.NewMemoryEngine() + + // 如果启用了持久化,配置持久化 + if *persistenceType != "none" { + persistenceConfig := storage.PersistenceConfig{ + Type: storage.PersistenceType(*persistenceType), + Directory: *persistenceDir, + SyncEvery: *syncEvery, + } + + if err := engine.EnablePersistence(persistenceConfig); err != nil { + log.Fatalf("Failed to enable persistence: %v", err) + } + + log.Printf("Persistence enabled: type=%s, directory=%s", *persistenceType, *persistenceDir) + } + + // 创建数据管理器 + dataManager := manager.NewDataManager(engine) + + // 创建指标收集器 + metrics := monitoring.NewMetrics() + + // 创建REST API服务 + restServer := api.NewRESTServer(dataManager) + + // 创建WebSocket服务 + wsServer := api.NewWebSocketServer(dataManager) + + // 创建NATS消息系统 + natsConfig := messaging.NATSConfig{ + URL: *natsURL, + } + nats, err := messaging.NewNATSMessaging(natsConfig) + if err != nil { + log.Printf("Failed to create NATS messaging: %v", err) + log.Println("Continuing without NATS messaging") + } else { + // 订阅消息 + err = nats.Subscribe(func(msg messaging.DataMessage) error { + // 创建数据点 + id := manager.CreateDataPoint( + msg.DeviceID, + msg.MetricCode, + msg.Labels, + msg.Value, + ) + + // 写入数据 + value := model.DataValue{ + Timestamp: msg.Timestamp, + Value: msg.Value, + } + + return dataManager.Write(context.Background(), id, value) + }) + + if err != nil { + log.Printf("Failed to subscribe to NATS: %v", err) + } else { + log.Println("NATS messaging enabled") + } + } + + // 启动服务 + go func() { + log.Printf("Starting REST API server on %s", *restAddr) + if err := restServer.Start(*restAddr); err != nil { + log.Fatalf("Failed to start REST API server: %v", err) + } + }() + + go func() { + log.Printf("Starting WebSocket server on %s", *wsAddr) + if err := wsServer.Start(*wsAddr); err != nil { + log.Fatalf("Failed to start WebSocket server: %v", err) + } + }() + + go func() { + log.Printf("Starting metrics server on %s", *metricsAddr) + if err := metrics.StartServer(*metricsAddr); err != nil { + log.Fatalf("Failed to start metrics server: %v", err) + } + }() + + // 等待信号 + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + // 优雅关闭 + log.Println("Shutting down...") + + // 创建关闭上下文 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // 关闭REST API服务 + if err := restServer.Stop(ctx); err != nil { + log.Printf("Failed to stop REST API server: %v", err) + } + + // 关闭WebSocket服务 + if err := wsServer.Stop(ctx); err != nil { + log.Printf("Failed to stop WebSocket server: %v", err) + } + + // 关闭NATS消息系统 + if nats != nil { + if err := nats.Close(); err != nil { + log.Printf("Failed to close NATS messaging: %v", err) + } + } + + // 关闭数据管理器 + if err := dataManager.Close(); err != nil { + log.Printf("Failed to close data manager: %v", err) + } + + log.Println("Shutdown complete") +} diff --git a/docs/requirement/require.md b/docs/requirement/require.md new file mode 100644 index 0000000..4c342dd --- /dev/null +++ b/docs/requirement/require.md @@ -0,0 +1,14 @@ +时序数据库需求的总结及推荐类库 +一、核心需求总结 +1.数据存储逻辑 +每个数据点仅保留最近30次变化值(环形队列覆盖机制) +精确追踪当前值的持续时间(记录值变更的起始时间戳) +2.读写性能要求 +高速内存处理:微秒级写入延迟,支持高并发(>500K ops/sec) +细粒度锁:每个数据点独立锁,避免全局锁竞争 +3.客户端分离 +写客户端:批量提交数据,低延迟响应 +读客户端:支持实时推送变化(数据变更时主动推送) +4.扩展能力 +可选持久化(如WAL日志) +监控指标暴露(内存用量、写入延迟等) \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f71648f --- /dev/null +++ b/go.mod @@ -0,0 +1,49 @@ +module git.pyer.club/kingecg/gotidb + +go 1.20 + +require ( + github.com/gin-gonic/gin v1.9.1 + github.com/gorilla/websocket v1.5.1 + github.com/nats-io/nats.go v1.31.0 + github.com/prometheus/client_golang v1.17.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/bytedance/sonic v1.10.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect + github.com/chenzhuoyu/iasm v0.9.1 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.16.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/leodido/go-urn v1.2.4 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect + golang.org/x/arch v0.6.0 // indirect + golang.org/x/crypto v0.16.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8a69fc4 --- /dev/null +++ b/go.sum @@ -0,0 +1,125 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= +github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= +github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE= +github.com/bytedance/sonic v1.10.2/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= +github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA= +github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0= +github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= +github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.16.0 h1:x+plE831WK4vaKHO/jpgUGsvLKIqRRkz6M78GuJAfGE= +github.com/go-playground/validator/v10 v10.16.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= +github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= +github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= +github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.6.0 h1:S0JTfE48HbRj80+4tbvZDYsJ3tGv6BUU3XxyZ7CirAc= +golang.org/x/arch v0.6.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/pkg/api/rest.go b/pkg/api/rest.go new file mode 100644 index 0000000..61ad7ca --- /dev/null +++ b/pkg/api/rest.go @@ -0,0 +1,257 @@ +package api + +import ( + "context" + "net/http" + "time" + + "github.com/gin-gonic/gin" + + "git.pyer.club/kingecg/gotidb/pkg/manager" + "git.pyer.club/kingecg/gotidb/pkg/model" +) + +// RESTServer REST API服务 +type RESTServer struct { + dataManager *manager.DataManager + router *gin.Engine + server *http.Server +} + +// WriteRequest 写入请求 +type WriteRequest struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + Value interface{} `json:"value"` + Timestamp *time.Time `json:"timestamp,omitempty"` +} + +// BatchWriteRequest 批量写入请求 +type BatchWriteRequest struct { + Points []WriteRequest `json:"points"` +} + +// QueryRequest 查询请求 +type QueryRequest struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + QueryType string `json:"query_type"` + Params map[string]interface{} `json:"params"` +} + +// NewRESTServer 创建一个新的REST API服务 +func NewRESTServer(dataManager *manager.DataManager) *RESTServer { + router := gin.Default() + + server := &RESTServer{ + dataManager: dataManager, + router: router, + } + + server.setupRoutes() + + return server +} + +// setupRoutes 设置路由 +func (s *RESTServer) setupRoutes() { + // 健康检查 + s.router.GET("/health", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + }) + }) + + // API版本 + v1 := s.router.Group("/api/v1") + { + // 写入数据 + v1.POST("/write", s.handleWrite) + + // 批量写入数据 + v1.POST("/batch_write", s.handleBatchWrite) + + // 查询数据 + v1.POST("/query", s.handleQuery) + } +} + +// handleWrite 处理写入请求 +func (s *RESTServer) handleWrite(c *gin.Context) { + var req WriteRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": "Invalid request: " + err.Error(), + }) + return + } + + // 创建数据点 + timestamp := time.Now() + if req.Timestamp != nil { + timestamp = *req.Timestamp + } + + id := model.DataPointID{ + DeviceID: req.DeviceID, + MetricCode: req.MetricCode, + Labels: req.Labels, + } + + value := model.DataValue{ + Timestamp: timestamp, + Value: req.Value, + } + + // 写入数据 + if err := s.dataManager.Write(c.Request.Context(), id, value); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": "Failed to write data: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + }) +} + +// handleBatchWrite 处理批量写入请求 +func (s *RESTServer) handleBatchWrite(c *gin.Context) { + var req BatchWriteRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": "Invalid request: " + err.Error(), + }) + return + } + + // 创建批量写入数据 + batch := make([]struct { + ID model.DataPointID + Value model.DataValue + }, len(req.Points)) + + for i, point := range req.Points { + timestamp := time.Now() + if point.Timestamp != nil { + timestamp = *point.Timestamp + } + + batch[i].ID = model.DataPointID{ + DeviceID: point.DeviceID, + MetricCode: point.MetricCode, + Labels: point.Labels, + } + + batch[i].Value = model.DataValue{ + Timestamp: timestamp, + Value: point.Value, + } + } + + // 批量写入数据 + if err := s.dataManager.BatchWrite(c.Request.Context(), batch); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": "Failed to batch write data: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + }) +} + +// handleQuery 处理查询请求 +func (s *RESTServer) handleQuery(c *gin.Context) { + var req QueryRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": "Invalid request: " + err.Error(), + }) + return + } + + // 创建数据点ID + id := model.DataPointID{ + DeviceID: req.DeviceID, + MetricCode: req.MetricCode, + Labels: req.Labels, + } + + // 创建查询 + queryType := model.QueryType(req.QueryType) + query := model.NewQuery(queryType, req.Params) + + // 执行查询 + result, err := s.dataManager.ExecuteQuery(c.Request.Context(), id, query) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": "Failed to execute query: " + err.Error(), + }) + return + } + + // 根据查询类型返回结果 + switch queryType { + case model.QueryTypeLatest: + value, ok := result.AsLatest() + if !ok { + c.JSON(http.StatusNotFound, gin.H{ + "error": "Data point not found", + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "timestamp": value.Timestamp, + "value": value.Value, + }) + + case model.QueryTypeAll: + values, ok := result.AsAll() + if !ok { + c.JSON(http.StatusNotFound, gin.H{ + "error": "Data point not found", + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "values": values, + }) + + case model.QueryTypeDuration: + duration, ok := result.AsDuration() + if !ok { + c.JSON(http.StatusNotFound, gin.H{ + "error": "Data point not found", + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "duration": duration, + }) + + default: + c.JSON(http.StatusBadRequest, gin.H{ + "error": "Unsupported query type", + }) + } +} + +// Start 启动REST API服务 +func (s *RESTServer) Start(addr string) error { + s.server = &http.Server{ + Addr: addr, + Handler: s.router, + } + + return s.server.ListenAndServe() +} + +// Stop 停止REST API服务 +func (s *RESTServer) Stop(ctx context.Context) error { + return s.server.Shutdown(ctx) +} diff --git a/pkg/api/websocket.go b/pkg/api/websocket.go new file mode 100644 index 0000000..437c5d3 --- /dev/null +++ b/pkg/api/websocket.go @@ -0,0 +1,168 @@ +package api + +import ( + "context" + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + + "git.pyer.club/kingecg/gotidb/pkg/manager" + "git.pyer.club/kingecg/gotidb/pkg/model" +) + +// WebSocketServer WebSocket服务 +type WebSocketServer struct { + dataManager *manager.DataManager + router *gin.Engine + server *http.Server + clients map[*websocket.Conn]bool + clientsLock sync.RWMutex + upgrader websocket.Upgrader +} + +// SubscriptionRequest 订阅请求 +type SubscriptionRequest struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` +} + +// DataChangeEvent 数据变更事件 +type DataChangeEvent struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + Timestamp time.Time `json:"timestamp"` + Value interface{} `json:"value"` +} + +// NewWebSocketServer 创建一个新的WebSocket服务 +func NewWebSocketServer(dataManager *manager.DataManager) *WebSocketServer { + router := gin.Default() + + server := &WebSocketServer{ + dataManager: dataManager, + router: router, + clients: make(map[*websocket.Conn]bool), + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true // 允许所有来源的WebSocket连接 + }, + }, + } + + server.setupRoutes() + + // 注册数据变更回调 + dataManager.RegisterCallback(server.handleDataChange) + + return server +} + +// setupRoutes 设置路由 +func (s *WebSocketServer) setupRoutes() { + // WebSocket连接 + s.router.GET("/ws", s.handleWebSocket) +} + +// handleWebSocket 处理WebSocket连接 +func (s *WebSocketServer) handleWebSocket(c *gin.Context) { + // 升级HTTP连接为WebSocket连接 + conn, err := s.upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + log.Printf("Failed to upgrade connection: %v", err) + return + } + + // 注册客户端 + s.clientsLock.Lock() + s.clients[conn] = true + s.clientsLock.Unlock() + + // 处理客户端消息 + go s.handleClient(conn) +} + +// handleClient 处理客户端消息 +func (s *WebSocketServer) handleClient(conn *websocket.Conn) { + defer func() { + // 关闭连接 + conn.Close() + + // 移除客户端 + s.clientsLock.Lock() + delete(s.clients, conn) + s.clientsLock.Unlock() + }() + + // 读取客户端消息 + for { + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("WebSocket error: %v", err) + } + break + } + + // 解析订阅请求 + var req SubscriptionRequest + if err := json.Unmarshal(message, &req); err != nil { + log.Printf("Failed to parse subscription request: %v", err) + continue + } + + // 处理订阅请求 + // TODO: 实现订阅逻辑 + } +} + +// handleDataChange 处理数据变更 +func (s *WebSocketServer) handleDataChange(id model.DataPointID, value model.DataValue) { + // 创建数据变更事件 + event := DataChangeEvent{ + DeviceID: id.DeviceID, + MetricCode: id.MetricCode, + Labels: id.Labels, + Timestamp: value.Timestamp, + Value: value.Value, + } + + // 序列化事件 + data, err := json.Marshal(event) + if err != nil { + log.Printf("Failed to marshal data change event: %v", err) + return + } + + // 广播事件 + s.clientsLock.RLock() + for client := range s.clients { + if err := client.WriteMessage(websocket.TextMessage, data); err != nil { + log.Printf("Failed to send data change event: %v", err) + client.Close() + delete(s.clients, client) + } + } + s.clientsLock.RUnlock() +} + +// Start 启动WebSocket服务 +func (s *WebSocketServer) Start(addr string) error { + s.server = &http.Server{ + Addr: addr, + Handler: s.router, + } + + return s.server.ListenAndServe() +} + +// Stop 停止WebSocket服务 +func (s *WebSocketServer) Stop(ctx context.Context) error { + return s.server.Shutdown(ctx) +} diff --git a/pkg/manager/datamanager.go b/pkg/manager/datamanager.go new file mode 100644 index 0000000..43c82b3 --- /dev/null +++ b/pkg/manager/datamanager.go @@ -0,0 +1,123 @@ +package manager + +import ( + "context" + "errors" + "sync" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/model" + "git.pyer.club/kingecg/gotidb/pkg/storage" +) + +// DataChangeCallback 数据变更回调函数类型 +type DataChangeCallback func(id model.DataPointID, value model.DataValue) + +// DataManager 数据管理器 +type DataManager struct { + engine storage.StorageEngine + callbacks []DataChangeCallback + callbacksLock sync.RWMutex +} + +// NewDataManager 创建一个新的数据管理器 +func NewDataManager(engine storage.StorageEngine) *DataManager { + return &DataManager{ + engine: engine, + callbacks: make([]DataChangeCallback, 0), + } +} + +// Write 写入数据 +func (m *DataManager) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error { + // 写入存储引擎 + if err := m.engine.Write(ctx, id, value); err != nil { + return err + } + + // 触发回调 + m.callbacksLock.RLock() + callbacks := m.callbacks + m.callbacksLock.RUnlock() + + for _, callback := range callbacks { + callback(id, value) + } + + return nil +} + +// BatchWrite 批量写入数据 +func (m *DataManager) BatchWrite(ctx context.Context, batch []struct { + ID model.DataPointID + Value model.DataValue +}) error { + for _, item := range batch { + if err := m.Write(ctx, item.ID, item.Value); err != nil { + return err + } + } + return nil +} + +// RegisterCallback 注册数据变更回调 +func (m *DataManager) RegisterCallback(callback DataChangeCallback) { + m.callbacksLock.Lock() + defer m.callbacksLock.Unlock() + m.callbacks = append(m.callbacks, callback) +} + +// ExecuteQuery 执行查询 +func (m *DataManager) ExecuteQuery(ctx context.Context, id model.DataPointID, query model.Query) (model.Result, error) { + switch query.Type() { + case model.QueryTypeLatest: + value, err := m.engine.GetLatest(ctx, id) + if err != nil { + return nil, err + } + return model.NewLatestResult(value), nil + + case model.QueryTypeAll: + values, err := m.engine.Read(ctx, id) + if err != nil { + return nil, err + } + return model.NewAllResult(values), nil + + case model.QueryTypeDuration: + duration, err := m.engine.GetDuration(ctx, id) + if err != nil { + return nil, err + } + return model.NewDurationResult(duration), nil + + default: + return nil, errors.New("unsupported query type") + } +} + +// Close 关闭数据管理器 +func (m *DataManager) Close() error { + return m.engine.Close() +} + +// EnablePersistence 启用持久化 +func (m *DataManager) EnablePersistence(config storage.PersistenceConfig) error { + return m.engine.EnablePersistence(config) +} + +// CreateDataPoint 创建一个新的数据点 +func CreateDataPoint(deviceID, metricCode string, labels map[string]string, value interface{}) (model.DataPointID, model.DataValue) { + id := model.DataPointID{ + DeviceID: deviceID, + MetricCode: metricCode, + Labels: labels, + } + + dataValue := model.DataValue{ + Timestamp: time.Now(), + Value: value, + } + + return id, dataValue +} diff --git a/pkg/messaging/nats.go b/pkg/messaging/nats.go new file mode 100644 index 0000000..62be42c --- /dev/null +++ b/pkg/messaging/nats.go @@ -0,0 +1,234 @@ +package messaging + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "git.pyer.club/kingecg/gotidb/pkg/model" +) + +const ( + // DefaultStreamName 默认流名称 + DefaultStreamName = "GOTIDB_DATA" + + // DefaultSubject 默认主题 + DefaultSubject = "gotidb.data.>" +) + +// DataMessage 数据消息 +type DataMessage struct { + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + Timestamp time.Time `json:"timestamp"` + Value interface{} `json:"value"` +} + +// MessageHandler 消息处理函数类型 +type MessageHandler func(msg DataMessage) error + +// NATSMessaging NATS消息系统 +type NATSMessaging struct { + conn *nats.Conn + js jetstream.JetStream + stream jetstream.Stream + consumer jetstream.Consumer + handlers []MessageHandler + subContext context.Context + subCancel context.CancelFunc +} + +// NATSConfig NATS配置 +type NATSConfig struct { + URL string + StreamName string + Subject string +} + +// NewNATSMessaging 创建一个新的NATS消息系统 +func NewNATSMessaging(config NATSConfig) (*NATSMessaging, error) { + // 使用默认值 + if config.StreamName == "" { + config.StreamName = DefaultStreamName + } + if config.Subject == "" { + config.Subject = DefaultSubject + } + + // 连接NATS服务器 + conn, err := nats.Connect(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to connect to NATS: %v", err) + } + + // 创建JetStream上下文 + js, err := jetstream.New(conn) + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to create JetStream context: %v", err) + } + + // 创建或获取流 + stream, err := js.Stream(context.Background(), config.StreamName) + if err != nil { + // 如果流不存在,创建一个新的流 + cfg := jetstream.StreamConfig{ + Name: config.StreamName, + Subjects: []string{config.Subject}, + } + stream, err = js.CreateStream(context.Background(), cfg) + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to create stream: %v", err) + } + } + + return &NATSMessaging{ + conn: conn, + js: js, + stream: stream, + handlers: make([]MessageHandler, 0), + }, nil +} + +// Publish 发布消息 +func (n *NATSMessaging) Publish(ctx context.Context, id model.DataPointID, value model.DataValue) error { + // 创建消息 + msg := DataMessage{ + DeviceID: id.DeviceID, + MetricCode: id.MetricCode, + Labels: id.Labels, + Timestamp: value.Timestamp, + Value: value.Value, + } + + // 序列化消息 + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %v", err) + } + + // 构建主题 + subject := fmt.Sprintf("gotidb.data.%s.%s", id.DeviceID, id.MetricCode) + + // 发布消息 + _, err = n.js.Publish(ctx, subject, data) + if err != nil { + return fmt.Errorf("failed to publish message: %v", err) + } + + return nil +} + +// BatchPublish 批量发布消息 +func (n *NATSMessaging) BatchPublish(ctx context.Context, batch []struct { + ID model.DataPointID + Value model.DataValue +}) error { + for _, item := range batch { + if err := n.Publish(ctx, item.ID, item.Value); err != nil { + return err + } + } + return nil +} + +// Subscribe 订阅消息 +func (n *NATSMessaging) Subscribe(handler MessageHandler) error { + // 添加处理函数 + n.handlers = append(n.handlers, handler) + + // 如果已经有订阅,不需要重新订阅 + if n.subContext != nil { + return nil + } + + // 创建消费者 + consumerConfig := jetstream.ConsumerConfig{ + Durable: "gotidb-consumer", + AckPolicy: jetstream.AckExplicitPolicy, + DeliverPolicy: jetstream.DeliverAllPolicy, + } + + var err error + n.consumer, err = n.stream.CreateOrUpdateConsumer(context.Background(), consumerConfig) + if err != nil { + return fmt.Errorf("failed to create consumer: %v", err) + } + + // 创建可取消的上下文 + n.subContext, n.subCancel = context.WithCancel(context.Background()) + + // 开始消费消息 + go n.consumeMessages() + + return nil +} + +// consumeMessages 消费消息 +func (n *NATSMessaging) consumeMessages() { + // 创建消费迭代器 + iter, err := n.consumer.Messages() + if err != nil { + log.Printf("Failed to create message iterator: %v", err) + return + } + + // 消费消息 + for { + select { + case <-n.subContext.Done(): + return + default: + msg, err := iter.Next() + if err != nil { + log.Printf("Failed to get next message: %v", err) + continue + } + + // 解析消息 + var dataMsg DataMessage + if err := json.Unmarshal(msg.Data(), &dataMsg); err != nil { + log.Printf("Failed to unmarshal message: %v", err) + msg.Ack() + continue + } + + // 调用所有处理函数 + for _, handler := range n.handlers { + if err := handler(dataMsg); err != nil { + log.Printf("Failed to handle message: %v", err) + } + } + + // 确认消息 + msg.Ack() + } + } +} + +// Unsubscribe 取消订阅 +func (n *NATSMessaging) Unsubscribe() { + if n.subCancel != nil { + n.subCancel() + n.subContext = nil + n.subCancel = nil + } +} + +// Close 关闭连接 +func (n *NATSMessaging) Close() error { + // 取消订阅 + n.Unsubscribe() + + // 关闭连接 + n.conn.Close() + + return nil +} diff --git a/pkg/model/datapoint.go b/pkg/model/datapoint.go new file mode 100644 index 0000000..ebb2dba --- /dev/null +++ b/pkg/model/datapoint.go @@ -0,0 +1,124 @@ +package model + +import ( + "fmt" + "sync" + "time" +) + +// DataPointID 数据点标识 +type DataPointID struct { + DeviceID string // 设备ID + MetricCode string // 指标代码 + Labels map[string]string // 自定义标签 +} + +// String 返回数据点标识的字符串表示 +func (id DataPointID) String() string { + return fmt.Sprintf("%s:%s:%v", id.DeviceID, id.MetricCode, id.Labels) +} + +// DataValue 数据值 +type DataValue struct { + Timestamp time.Time // 时间戳 + Value interface{} // 支持 int64 或 float64 +} + +// CircularBuffer 环形缓冲区 +type CircularBuffer struct { + Values [30]DataValue // 固定大小为30的环形缓冲区 + Head int // 当前写入位置 + Lock sync.RWMutex // 细粒度锁 +} + +// NewCircularBuffer 创建一个新的环形缓冲区 +func NewCircularBuffer() *CircularBuffer { + return &CircularBuffer{ + Head: -1, // 初始化为-1,表示缓冲区为空 + } +} + +// Write 写入一个新的数据值 +func (cb *CircularBuffer) Write(value DataValue) { + cb.Lock.Lock() + defer cb.Lock.Unlock() + + cb.Head = (cb.Head + 1) % len(cb.Values) + cb.Values[cb.Head] = value +} + +// Read 读取所有数据值,按时间顺序返回 +func (cb *CircularBuffer) Read() []DataValue { + cb.Lock.RLock() + defer cb.Lock.RUnlock() + + if cb.Head == -1 { + return []DataValue{} // 缓冲区为空 + } + + result := make([]DataValue, 0, len(cb.Values)) + + // 从最旧的数据开始 + oldest := (cb.Head + 1) % len(cb.Values) + + // 如果缓冲区未满,oldest可能指向未初始化的元素 + // 我们需要检查并跳过这些元素 + for i := 0; i < len(cb.Values); i++ { + idx := (oldest + i) % len(cb.Values) + // 跳过未初始化的元素 + if cb.Values[idx].Timestamp.IsZero() { + continue + } + result = append(result, cb.Values[idx]) + } + + return result +} + +// GetLatest 获取最新的数据值 +func (cb *CircularBuffer) GetLatest() (DataValue, bool) { + cb.Lock.RLock() + defer cb.Lock.RUnlock() + + if cb.Head == -1 { + return DataValue{}, false // 缓冲区为空 + } + + return cb.Values[cb.Head], true +} + +// GetDuration 获取最新值的持续时间 +func (cb *CircularBuffer) GetDuration() time.Duration { + cb.Lock.RLock() + defer cb.Lock.RUnlock() + + if cb.Head == -1 { + return 0 // 缓冲区为空 + } + + latest := cb.Values[cb.Head] + + // 如果只有一个值,无法计算持续时间 + if len(cb.Values) == 1 { + return 0 + } + + // 查找前一个不同的值 + for i := 1; i < len(cb.Values); i++ { + idx := (cb.Head - i + len(cb.Values)) % len(cb.Values) + prev := cb.Values[idx] + + // 跳过未初始化的元素 + if prev.Timestamp.IsZero() { + continue + } + + // 如果值不同,计算持续时间 + if prev.Value != latest.Value { + return latest.Timestamp.Sub(prev.Timestamp) + } + } + + // 所有值都相同,无法计算持续时间 + return 0 +} diff --git a/pkg/model/query.go b/pkg/model/query.go new file mode 100644 index 0000000..76fa28c --- /dev/null +++ b/pkg/model/query.go @@ -0,0 +1,117 @@ +package model + +import ( + "context" +) + +// QueryType 查询类型 +type QueryType string + +const ( + // QueryTypeLatest 获取最新值 + QueryTypeLatest QueryType = "latest" + // QueryTypeAll 获取所有值 + QueryTypeAll QueryType = "all" + // QueryTypeDuration 获取持续时间 + QueryTypeDuration QueryType = "duration" +) + +// Query 查询接口 +type Query interface { + Type() QueryType + Params() map[string]interface{} +} + +// Result 查询结果接口 +type Result interface { + IsEmpty() bool + AsLatest() (DataValue, bool) + AsAll() ([]DataValue, bool) + AsDuration() (float64, bool) +} + +// QueryExecutor 查询执行器接口 +type QueryExecutor interface { + Execute(ctx context.Context, query Query) (Result, error) +} + +// BaseQuery 基础查询实现 +type BaseQuery struct { + queryType QueryType + params map[string]interface{} +} + +// NewQuery 创建一个新的查询 +func NewQuery(queryType QueryType, params map[string]interface{}) Query { + return &BaseQuery{ + queryType: queryType, + params: params, + } +} + +// Type 返回查询类型 +func (q *BaseQuery) Type() QueryType { + return q.queryType +} + +// Params 返回查询参数 +func (q *BaseQuery) Params() map[string]interface{} { + return q.params +} + +// BaseResult 基础查询结果实现 +type BaseResult struct { + latest *DataValue + all []DataValue + duration *float64 +} + +// NewLatestResult 创建一个最新值查询结果 +func NewLatestResult(value DataValue) Result { + return &BaseResult{ + latest: &value, + } +} + +// NewAllResult 创建一个所有值查询结果 +func NewAllResult(values []DataValue) Result { + return &BaseResult{ + all: values, + } +} + +// NewDurationResult 创建一个持续时间查询结果 +func NewDurationResult(duration float64) Result { + return &BaseResult{ + duration: &duration, + } +} + +// IsEmpty 检查结果是否为空 +func (r *BaseResult) IsEmpty() bool { + return r.latest == nil && len(r.all) == 0 && r.duration == nil +} + +// AsLatest 将结果转换为最新值 +func (r *BaseResult) AsLatest() (DataValue, bool) { + if r.latest != nil { + return *r.latest, true + } + return DataValue{}, false +} + +// AsAll 将结果转换为所有值 +func (r *BaseResult) AsAll() ([]DataValue, bool) { + if len(r.all) > 0 { + return r.all, true + } + return nil, false +} + +// AsDuration 将结果转换为持续时间 +func (r *BaseResult) AsDuration() (float64, bool) { + if r.duration != nil { + return *r.duration, true + } + return 0, false +} diff --git a/pkg/monitoring/metrics.go b/pkg/monitoring/metrics.go new file mode 100644 index 0000000..cb3df2b --- /dev/null +++ b/pkg/monitoring/metrics.go @@ -0,0 +1,182 @@ +package monitoring + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Metrics 指标收集器 +type Metrics struct { + registry *prometheus.Registry + writeCounter prometheus.Counter + queryCounter prometheus.Counter + writeLatency prometheus.Histogram + queryLatency prometheus.Histogram + activeConnections prometheus.Gauge + dataPointsCount prometheus.Gauge + persistenceLatency prometheus.Histogram + persistenceErrors prometheus.Counter + messagingLatency prometheus.Histogram + messagingErrors prometheus.Counter + websocketConnections prometheus.Gauge +} + +// NewMetrics 创建一个新的指标收集器 +func NewMetrics() *Metrics { + registry := prometheus.NewRegistry() + + writeCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gotidb_write_total", + Help: "Total number of write operations", + }) + + queryCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gotidb_query_total", + Help: "Total number of query operations", + }) + + writeLatency := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "gotidb_write_latency_seconds", + Help: "Write operation latency in seconds", + Buckets: prometheus.DefBuckets, + }) + + queryLatency := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "gotidb_query_latency_seconds", + Help: "Query operation latency in seconds", + Buckets: prometheus.DefBuckets, + }) + + activeConnections := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gotidb_active_connections", + Help: "Number of active connections", + }) + + dataPointsCount := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gotidb_data_points_count", + Help: "Number of data points in the database", + }) + + persistenceLatency := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "gotidb_persistence_latency_seconds", + Help: "Persistence operation latency in seconds", + Buckets: prometheus.DefBuckets, + }) + + persistenceErrors := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gotidb_persistence_errors_total", + Help: "Total number of persistence errors", + }) + + messagingLatency := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "gotidb_messaging_latency_seconds", + Help: "Messaging operation latency in seconds", + Buckets: prometheus.DefBuckets, + }) + + messagingErrors := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gotidb_messaging_errors_total", + Help: "Total number of messaging errors", + }) + + websocketConnections := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gotidb_websocket_connections", + Help: "Number of active WebSocket connections", + }) + + // 注册指标 + registry.MustRegister(writeCounter) + registry.MustRegister(queryCounter) + registry.MustRegister(writeLatency) + registry.MustRegister(queryLatency) + registry.MustRegister(activeConnections) + registry.MustRegister(dataPointsCount) + registry.MustRegister(persistenceLatency) + registry.MustRegister(persistenceErrors) + registry.MustRegister(messagingLatency) + registry.MustRegister(messagingErrors) + registry.MustRegister(websocketConnections) + + return &Metrics{ + registry: registry, + writeCounter: writeCounter, + queryCounter: queryCounter, + writeLatency: writeLatency, + queryLatency: queryLatency, + activeConnections: activeConnections, + dataPointsCount: dataPointsCount, + persistenceLatency: persistenceLatency, + persistenceErrors: persistenceErrors, + messagingLatency: messagingLatency, + messagingErrors: messagingErrors, + websocketConnections: websocketConnections, + } +} + +// IncrementWriteCounter 增加写入计数器 +func (m *Metrics) IncrementWriteCounter() { + m.writeCounter.Inc() +} + +// IncrementQueryCounter 增加查询计数器 +func (m *Metrics) IncrementQueryCounter() { + m.queryCounter.Inc() +} + +// ObserveWriteLatency 观察写入延迟 +func (m *Metrics) ObserveWriteLatency(seconds float64) { + m.writeLatency.Observe(seconds) +} + +// ObserveQueryLatency 观察查询延迟 +func (m *Metrics) ObserveQueryLatency(seconds float64) { + m.queryLatency.Observe(seconds) +} + +// SetActiveConnections 设置活跃连接数 +func (m *Metrics) SetActiveConnections(count float64) { + m.activeConnections.Set(count) +} + +// SetDataPointsCount 设置数据点数量 +func (m *Metrics) SetDataPointsCount(count float64) { + m.dataPointsCount.Set(count) +} + +// ObservePersistenceLatency 观察持久化延迟 +func (m *Metrics) ObservePersistenceLatency(seconds float64) { + m.persistenceLatency.Observe(seconds) +} + +// IncrementPersistenceErrors 增加持久化错误计数器 +func (m *Metrics) IncrementPersistenceErrors() { + m.persistenceErrors.Inc() +} + +// ObserveMessagingLatency 观察消息延迟 +func (m *Metrics) ObserveMessagingLatency(seconds float64) { + m.messagingLatency.Observe(seconds) +} + +// IncrementMessagingErrors 增加消息错误计数器 +func (m *Metrics) IncrementMessagingErrors() { + m.messagingErrors.Inc() +} + +// SetWebsocketConnections 设置WebSocket连接数 +func (m *Metrics) SetWebsocketConnections(count float64) { + m.websocketConnections.Set(count) +} + +// Handler 返回Prometheus HTTP处理器 +func (m *Metrics) Handler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}) +} + +// StartServer 启动指标服务器 +func (m *Metrics) StartServer(addr string) error { + http.Handle("/metrics", m.Handler()) + return http.ListenAndServe(addr, nil) +} diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go new file mode 100644 index 0000000..0919a49 --- /dev/null +++ b/pkg/storage/engine.go @@ -0,0 +1,169 @@ +package storage + +import ( + "context" + "sync" + + "git.pyer.club/kingecg/gotidb/pkg/model" +) + +// PersistenceType 持久化类型 +type PersistenceType string + +const ( + // PersistenceTypeNone 不持久化 + PersistenceTypeNone PersistenceType = "none" + // PersistenceTypeWAL 使用WAL日志持久化 + PersistenceTypeWAL PersistenceType = "wal" +) + +// PersistenceConfig 持久化配置 +type PersistenceConfig struct { + Type PersistenceType // 持久化类型 + Directory string // 持久化目录 + SyncEvery int // 每写入多少条数据同步一次 +} + +// StorageEngine 存储引擎接口 +type StorageEngine interface { + // Write 写入数据 + Write(ctx context.Context, id model.DataPointID, value model.DataValue) error + // Read 读取数据 + Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) + // GetLatest 获取最新数据 + GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) + // GetDuration 获取持续时间 + GetDuration(ctx context.Context, id model.DataPointID) (float64, error) + // EnablePersistence 启用持久化 + EnablePersistence(config PersistenceConfig) error + // Close 关闭存储引擎 + Close() error +} + +// MemoryEngine 内存存储引擎 +type MemoryEngine struct { + data map[string]*model.CircularBuffer // 数据存储 + dataLock sync.RWMutex // 数据锁 + persister Persister // 持久化器 +} + +// NewMemoryEngine 创建一个新的内存存储引擎 +func NewMemoryEngine() *MemoryEngine { + return &MemoryEngine{ + data: make(map[string]*model.CircularBuffer), + } +} + +// Write 写入数据 +func (e *MemoryEngine) Write(ctx context.Context, id model.DataPointID, value model.DataValue) error { + key := id.String() + + e.dataLock.RLock() + buffer, exists := e.data[key] + e.dataLock.RUnlock() + + if !exists { + // 如果数据点不存在,创建一个新的环形缓冲区 + buffer = model.NewCircularBuffer() + e.dataLock.Lock() + e.data[key] = buffer + e.dataLock.Unlock() + } + + // 写入数据 + buffer.Write(value) + + // 如果启用了持久化,写入WAL日志 + if e.persister != nil { + if err := e.persister.Write(id, value); err != nil { + return err + } + } + + return nil +} + +// Read 读取数据 +func (e *MemoryEngine) Read(ctx context.Context, id model.DataPointID) ([]model.DataValue, error) { + key := id.String() + + e.dataLock.RLock() + buffer, exists := e.data[key] + e.dataLock.RUnlock() + + if !exists { + return []model.DataValue{}, nil + } + + return buffer.Read(), nil +} + +// GetLatest 获取最新数据 +func (e *MemoryEngine) GetLatest(ctx context.Context, id model.DataPointID) (model.DataValue, error) { + key := id.String() + + e.dataLock.RLock() + buffer, exists := e.data[key] + e.dataLock.RUnlock() + + if !exists { + return model.DataValue{}, nil + } + + value, exists := buffer.GetLatest() + if !exists { + return model.DataValue{}, nil + } + + return value, nil +} + +// GetDuration 获取持续时间 +func (e *MemoryEngine) GetDuration(ctx context.Context, id model.DataPointID) (float64, error) { + key := id.String() + + e.dataLock.RLock() + buffer, exists := e.data[key] + e.dataLock.RUnlock() + + if !exists { + return 0, nil + } + + duration := buffer.GetDuration() + return duration.Seconds(), nil +} + +// EnablePersistence 启用持久化 +func (e *MemoryEngine) EnablePersistence(config PersistenceConfig) error { + var persister Persister + var err error + + switch config.Type { + case PersistenceTypeWAL: + persister, err = NewWALPersister(config.Directory, config.SyncEvery) + case PersistenceTypeNone: + // 不启用持久化 + e.persister = nil + return nil + default: + // 默认不启用持久化 + e.persister = nil + return nil + } + + if err != nil { + return err + } + + e.persister = persister + return nil +} + +// Close 关闭存储引擎 +func (e *MemoryEngine) Close() error { + if e.persister != nil { + return e.persister.Close() + } + return nil +} diff --git a/pkg/storage/persister.go b/pkg/storage/persister.go new file mode 100644 index 0000000..904318b --- /dev/null +++ b/pkg/storage/persister.go @@ -0,0 +1,169 @@ +package storage + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/model" +) + +// Persister 持久化接口 +type Persister interface { + // Write 写入数据 + Write(id model.DataPointID, value model.DataValue) error + // Close 关闭持久化器 + Close() error +} + +// WALEntry WAL日志条目 +type WALEntry struct { + Timestamp time.Time `json:"timestamp"` + DeviceID string `json:"device_id"` + MetricCode string `json:"metric_code"` + Labels map[string]string `json:"labels"` + Value interface{} `json:"value"` +} + +// WALPersister WAL持久化器 +type WALPersister struct { + directory string // WAL日志目录 + file *os.File // 当前WAL文件 + writer *bufio.Writer + mutex sync.Mutex + syncEvery int // 每写入多少条数据同步一次 + count int // 当前写入计数 +} + +// NewWALPersister 创建一个新的WAL持久化器 +func NewWALPersister(directory string, syncEvery int) (*WALPersister, error) { + // 确保目录存在 + if err := os.MkdirAll(directory, 0755); err != nil { + return nil, fmt.Errorf("failed to create WAL directory: %v", err) + } + + // 创建新的WAL文件 + filename := filepath.Join(directory, fmt.Sprintf("wal-%d.log", time.Now().UnixNano())) + file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, fmt.Errorf("failed to create WAL file: %v", err) + } + + return &WALPersister{ + directory: directory, + file: file, + writer: bufio.NewWriter(file), + syncEvery: syncEvery, + }, nil +} + +// Write 写入数据到WAL日志 +func (w *WALPersister) Write(id model.DataPointID, value model.DataValue) error { + w.mutex.Lock() + defer w.mutex.Unlock() + + // 创建WAL条目 + entry := WALEntry{ + Timestamp: value.Timestamp, + DeviceID: id.DeviceID, + MetricCode: id.MetricCode, + Labels: id.Labels, + Value: value.Value, + } + + // 序列化为JSON + data, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("failed to marshal WAL entry: %v", err) + } + + // 写入数据 + if _, err := w.writer.Write(data); err != nil { + return fmt.Errorf("failed to write WAL entry: %v", err) + } + if err := w.writer.WriteByte('\n'); err != nil { + return fmt.Errorf("failed to write newline: %v", err) + } + + // 增加计数 + w.count++ + + // 检查是否需要同步 + if w.count >= w.syncEvery { + if err := w.sync(); err != nil { + return fmt.Errorf("failed to sync WAL: %v", err) + } + w.count = 0 + } + + return nil +} + +// sync 同步WAL日志到磁盘 +func (w *WALPersister) sync() error { + if err := w.writer.Flush(); err != nil { + return err + } + return w.file.Sync() +} + +// Close 关闭WAL持久化器 +func (w *WALPersister) Close() error { + w.mutex.Lock() + defer w.mutex.Unlock() + + // 刷新缓冲区 + if err := w.writer.Flush(); err != nil { + return fmt.Errorf("failed to flush WAL buffer: %v", err) + } + + // 同步到磁盘 + if err := w.file.Sync(); err != nil { + return fmt.Errorf("failed to sync WAL file: %v", err) + } + + // 关闭文件 + return w.file.Close() +} + +// LoadWAL 从WAL日志恢复数据 +func LoadWAL(directory string) ([]WALEntry, error) { + var entries []WALEntry + + // 获取所有WAL文件 + files, err := filepath.Glob(filepath.Join(directory, "wal-*.log")) + if err != nil { + return nil, fmt.Errorf("failed to list WAL files: %v", err) + } + + // 按文件名排序(文件名包含时间戳) + // TODO: 实现文件排序 + + // 读取每个文件 + for _, file := range files { + f, err := os.Open(file) + if err != nil { + return nil, fmt.Errorf("failed to open WAL file %s: %v", file, err) + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + var entry WALEntry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + return nil, fmt.Errorf("failed to unmarshal WAL entry: %v", err) + } + entries = append(entries, entry) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("failed to scan WAL file %s: %v", file, err) + } + } + + return entries, nil +}