package model import ( "fmt" "sort" "sync" "time" ) // DataPointID 数据点标识 type DataPointID struct { DeviceID string // 设备ID MetricCode string // 指标代码 Labels map[string]string // 自定义标签 } // String 返回数据点标识的字符串表示 func (id DataPointID) String() string { return id.Hash() } func (id DataPointID) MetricHash() string { return fmt.Sprintf("%s:%s", id.DeviceID, id.MetricCode) } // Equal 判断两个数据点标识是否相等 func (id DataPointID) Equal(other DataPointID) bool { if id.DeviceID != other.DeviceID || id.MetricCode != other.MetricCode { return false } if len(id.Labels) != len(other.Labels) { return false } for k, v := range id.Labels { if otherV, ok := other.Labels[k]; !ok || v != otherV { return false } } return true } // Hash 返回数据点标识的哈希值 func (id DataPointID) Hash() string { if len(id.Labels) == 0 { return fmt.Sprintf("%s:%s:", id.DeviceID, id.MetricCode) } // 提取并排序标签键 keys := make([]string, 0, len(id.Labels)) for k := range id.Labels { keys = append(keys, k) } sort.Strings(keys) // 按排序后的键顺序构建标签字符串 var labelStr string for i, k := range keys { if i == 0 { labelStr = fmt.Sprintf("%s=%s", k, id.Labels[k]) } else { labelStr = fmt.Sprintf("%s:%s=%s", labelStr, k, id.Labels[k]) } } return fmt.Sprintf("%s:%s:%s", id.DeviceID, id.MetricCode, labelStr) } // DataValue 数据值 type DataValue struct { Timestamp time.Time // 时间戳 Value interface{} // 支持 int64 或 float64 } // CircularBuffer 环形缓冲区 type CircularBuffer struct { Values []DataValue // 固定大小为30的环形缓冲区 Head int // 当前写入位置 Lock sync.RWMutex // 细粒度锁 } // NewCircularBuffer 创建一个新的环形缓冲区 func NewCircularBuffer(bufferLen int) *CircularBuffer { return &CircularBuffer{ Values: make([]DataValue, bufferLen), Head: -1, // 初始化为-1,表示缓冲区为空 } } // Write 写入一个新的数据值 func (cb *CircularBuffer) Write(value DataValue) { cb.Lock.Lock() defer cb.Lock.Unlock() cb.Head = (cb.Head + 1) % len(cb.Values) cb.Values[cb.Head] = value } // Read 读取所有数据值,按时间顺序返回 func (cb *CircularBuffer) Read() []DataValue { cb.Lock.RLock() defer cb.Lock.RUnlock() if cb.Head == -1 { return []DataValue{} // 缓冲区为空 } result := make([]DataValue, 0, len(cb.Values)) // 从最旧的数据开始 oldest := (cb.Head + 1) % len(cb.Values) // 如果缓冲区未满,oldest可能指向未初始化的元素 // 我们需要检查并跳过这些元素 for i := 0; i < len(cb.Values); i++ { idx := (oldest + i) % len(cb.Values) // 跳过未初始化的元素 if cb.Values[idx].Timestamp.IsZero() { continue } result = append(result, cb.Values[idx]) } return result } // GetLatest 获取最新的数据值 func (cb *CircularBuffer) GetLatest() (DataValue, bool) { cb.Lock.RLock() defer cb.Lock.RUnlock() if cb.Head == -1 { return DataValue{}, false // 缓冲区为空 } return cb.Values[cb.Head], true } // GetDuration 获取最新值的持续时间 func (cb *CircularBuffer) GetDuration() time.Duration { cb.Lock.RLock() defer cb.Lock.RUnlock() if cb.Head == -1 { return 0 // 缓冲区为空 } latest := cb.Values[cb.Head] // 如果只有一个值,无法计算持续时间 if len(cb.Values) == 1 { return 0 } // 查找前一个不同的值 for i := 1; i < len(cb.Values); i++ { idx := (cb.Head - i + len(cb.Values)) % len(cb.Values) prev := cb.Values[idx] // 跳过未初始化的元素 if prev.Timestamp.IsZero() { continue } // 如果值不同,计算持续时间 if prev.Value != latest.Value { return latest.Timestamp.Sub(prev.Timestamp) } } // 所有值都相同,无法计算持续时间 return 0 }