// gotidb/pkg/messaging/nats.go
//
// 235 lines
// 5.1 KiB
// Go

package messaging
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"time"

	"git.pyer.club/kingecg/gotidb/pkg/model"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)
const (
	// DefaultStreamName is the JetStream stream name NewNATSMessaging uses
	// when NATSConfig.StreamName is empty.
	DefaultStreamName = "GOTIDB_DATA"
	// DefaultSubject is the subject NewNATSMessaging gives a newly created
	// stream when NATSConfig.Subject is empty.
	DefaultSubject = "gotidb.data.>"
)
// DataMessage is the JSON payload published to and consumed from NATS for a
// single data point.
type DataMessage struct {
	// DeviceID is filled from model.DataPointID.DeviceID by Publish.
	DeviceID string `json:"device_id"`
	// MetricCode is filled from model.DataPointID.MetricCode by Publish.
	MetricCode string `json:"metric_code"`
	// Labels is filled from model.DataPointID.Labels by Publish.
	Labels map[string]string `json:"labels"`
	// Timestamp is filled from model.DataValue.Timestamp by Publish.
	Timestamp time.Time `json:"timestamp"`
	// Value is filled from model.DataValue.Value by Publish. On the consuming
	// side it is decoded by encoding/json into an interface value, so numbers
	// arrive as float64.
	Value interface{} `json:"value"`
}
// MessageHandler is the callback type registered through Subscribe. It is
// called once for every successfully decoded DataMessage; a non-nil return
// value is logged by the consume loop.
type MessageHandler func(msg DataMessage) error
// NATSMessaging publishes and consumes DataMessage values through a NATS
// JetStream stream. Create it with NewNATSMessaging and release it with Close.
//
// NOTE(review): none of the fields is guarded by a lock, so concurrent calls to
// Subscribe/Unsubscribe do not look safe — confirm against callers.
type NATSMessaging struct {
	// conn is the underlying NATS connection; Close closes it.
	conn *nats.Conn
	// js is the JetStream handle used by Publish.
	js jetstream.JetStream
	// stream is the stream obtained or created by NewNATSMessaging.
	stream jetstream.Stream
	// consumer is set by Subscribe when it starts a subscription.
	consumer jetstream.Consumer
	// handlers are called, in registration order, for every decoded message.
	handlers []MessageHandler
	// subContext and subCancel are non-nil while a subscription is active;
	// Unsubscribe cancels the context and resets both to nil.
	subContext context.Context
	subCancel  context.CancelFunc
}
// NATSConfig holds the settings consumed by NewNATSMessaging.
type NATSConfig struct {
	// URL is passed unchanged to nats.Connect.
	URL string
	// StreamName is the JetStream stream to bind to; empty means
	// DefaultStreamName.
	StreamName string
	// Subject is the subject assigned to the stream when it has to be created;
	// empty means DefaultSubject.
	Subject string
}
// NewNATSMessaging connects to the NATS server at config.URL and binds to the
// JetStream stream named config.StreamName, creating it when it does not exist.
//
// Empty StreamName and Subject fields fall back to DefaultStreamName and
// DefaultSubject. config.Subject is only used when the stream has to be
// created; an existing stream is used as it is.
//
// The stream is created only when the lookup reports that it is missing. Any
// other lookup failure is returned directly instead of being masked by a
// creation attempt. On every failure the connection opened here is closed
// before returning, and the returned error wraps the underlying cause.
func NewNATSMessaging(config NATSConfig) (*NATSMessaging, error) {
	if config.StreamName == "" {
		config.StreamName = DefaultStreamName
	}
	if config.Subject == "" {
		config.Subject = DefaultSubject
	}

	conn, err := nats.Connect(config.URL)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}

	js, err := jetstream.New(conn)
	if err != nil {
		conn.Close()
		return nil, fmt.Errorf("failed to create JetStream context: %w", err)
	}

	ctx := context.Background()
	stream, err := js.Stream(ctx, config.StreamName)
	if errors.Is(err, jetstream.ErrStreamNotFound) {
		// The stream does not exist yet: create it with the configured subject.
		stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
			Name:     config.StreamName,
			Subjects: []string{config.Subject},
		})
		if err != nil {
			conn.Close()
			return nil, fmt.Errorf("failed to create stream: %w", err)
		}
	} else if err != nil {
		conn.Close()
		return nil, fmt.Errorf("failed to get stream: %w", err)
	}

	return &NATSMessaging{
		conn:     conn,
		js:       js,
		stream:   stream,
		handlers: make([]MessageHandler, 0),
	}, nil
}
// Publish encodes one data point as a JSON DataMessage and publishes it through
// JetStream on the subject "gotidb.data.<DeviceID>.<MetricCode>".
//
// ctx is handed to the JetStream publish call. The returned error wraps the
// underlying marshal or publish error, so callers can inspect the cause with
// errors.Is / errors.As (for example context.DeadlineExceeded).
//
// NOTE(review): the subject prefix is hard-coded here and DeviceID/MetricCode
// are inserted verbatim; a stream created with a custom NATSConfig.Subject may
// not capture these subjects — confirm against callers.
func (n *NATSMessaging) Publish(ctx context.Context, id model.DataPointID, value model.DataValue) error {
	msg := DataMessage{
		DeviceID:   id.DeviceID,
		MetricCode: id.MetricCode,
		Labels:     id.Labels,
		Timestamp:  value.Timestamp,
		Value:      value.Value,
	}

	data, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}

	subject := fmt.Sprintf("gotidb.data.%s.%s", id.DeviceID, id.MetricCode)
	if _, err := n.js.Publish(ctx, subject, data); err != nil {
		return fmt.Errorf("failed to publish message: %w", err)
	}
	return nil
}
// BatchPublish publishes the entries of batch one by one, in slice order, by
// calling Publish for each of them. It stops at the first failure and returns
// that error unchanged; entries before the failing one have already been
// published. It returns nil when every entry was published.
func (n *NATSMessaging) BatchPublish(ctx context.Context, batch []struct {
	ID    model.DataPointID
	Value model.DataValue
}) error {
	for i := range batch {
		entry := batch[i]
		publishErr := n.Publish(ctx, entry.ID, entry.Value)
		if publishErr != nil {
			return publishErr
		}
	}
	return nil
}
// Subscribe registers handler to be called for every message consumed from the
// stream.
//
// The first call (or the first call after Unsubscribe) creates or updates the
// durable consumer "gotidb-consumer" with explicit acks and deliver-all policy,
// opens a message iterator on it and starts the background consume loop. Later
// calls only add the handler to the running loop.
//
// It returns an error when handler is nil or when the consumer or its message
// iterator cannot be created. In those cases nothing is registered, so the call
// can simply be retried.
//
// NOTE(review): the handler list is appended here and ranged over by the
// consume loop without any lock; calling Subscribe while messages are being
// processed is a data race — confirm how callers use it.
func (n *NATSMessaging) Subscribe(handler MessageHandler) error {
	// A nil handler would panic inside the consume goroutine; reject it here.
	if handler == nil {
		return errors.New("message handler must not be nil")
	}

	// A consume loop is already running: it will pick up the new handler.
	if n.subContext != nil {
		n.handlers = append(n.handlers, handler)
		return nil
	}

	consumer, err := n.stream.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
		Durable:       "gotidb-consumer",
		AckPolicy:     jetstream.AckExplicitPolicy,
		DeliverPolicy: jetstream.DeliverAllPolicy,
	})
	if err != nil {
		return fmt.Errorf("failed to create consumer: %w", err)
	}

	iter, err := consumer.Messages()
	if err != nil {
		return fmt.Errorf("failed to create message iterator: %w", err)
	}

	// Mutate the receiver only after every fallible step has succeeded.
	ctx, cancel := context.WithCancel(context.Background())
	n.consumer = consumer
	n.handlers = append(n.handlers, handler)
	n.subContext, n.subCancel = ctx, cancel

	go n.consumeMessages(ctx, iter)
	return nil
}

// consumeMessages is the background loop started by Subscribe. It reads
// messages from iter until ctx is cancelled or the iterator is closed.
//
// Each message is decoded as a DataMessage and passed to every registered
// handler in order. Handler errors are logged and do not prevent the message
// from being acknowledged. A message that cannot be decoded is logged and
// acknowledged as well, so it is not delivered again.
//
// ctx is passed in rather than read from n.subContext because Unsubscribe
// resets that field to nil while this goroutine may still be running.
func (n *NATSMessaging) consumeMessages(ctx context.Context, iter jetstream.MessagesContext) {
	// Next blocks until a message arrives, so cancellation alone would go
	// unnoticed. This watcher stops the iterator, which wakes Next up, and is
	// the single place where Stop is called, whichever way the loop ends.
	loopDone := make(chan struct{})
	defer close(loopDone)
	go func() {
		select {
		case <-ctx.Done():
		case <-loopDone:
		}
		iter.Stop()
	}()

	ack := func(msg jetstream.Msg) {
		if err := msg.Ack(); err != nil {
			log.Printf("Failed to ack message: %v", err)
		}
	}

	for {
		msg, err := iter.Next()
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			// A closed iterator never yields messages again; retrying would spin.
			if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
				log.Printf("Message iterator closed, stopping consume loop: %v", err)
				return
			}
			log.Printf("Failed to get next message: %v", err)
			continue
		}

		var dataMsg DataMessage
		if err := json.Unmarshal(msg.Data(), &dataMsg); err != nil {
			log.Printf("Failed to unmarshal message: %v", err)
			ack(msg)
			continue
		}

		for _, handler := range n.handlers {
			if err := handler(dataMsg); err != nil {
				log.Printf("Failed to handle message: %v", err)
			}
		}

		ack(msg)
	}
}
// Unsubscribe cancels the active subscription context, if there is one, and
// clears the stored context and cancel function so that a later Subscribe call
// starts a new subscription. It does nothing when no subscription is active.
// Registered handlers are left in place.
func (n *NATSMessaging) Unsubscribe() {
	if n.subCancel == nil {
		return
	}
	n.subCancel()
	n.subContext, n.subCancel = nil, nil
}
// Close cancels any active subscription and then closes the NATS connection.
// It always returns nil.
func (n *NATSMessaging) Close() error {
	// Deferred so the connection goes down last, after the subscription has
	// been cancelled.
	defer n.conn.Close()
	n.Unsubscribe()
	return nil
}