gotidb/pkg/engine/buffer_test.go

307 lines
7.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"errors"
"testing"
"time"
)
// errorBufferHandler 是一个会返回错误的WriteBufferHandler实现
type errorBufferHandler struct {
writeError bool
flushError bool
writeCount int
flushCount int
lastWritten DataPoint
}
func (h *errorBufferHandler) WriteToBuffer(point DataPoint) error {
h.writeCount++
h.lastWritten = point
if h.writeError {
return errors.New("write error")
}
return nil
}
func (h *errorBufferHandler) ValidatePoint(point DataPoint) error {
// 如果设置了写入错误标志,返回错误
if h.writeError {
return errors.New("simulated write error")
}
return nil
}
func (h *errorBufferHandler) FlushBuffer() error {
h.flushCount++
if h.flushError {
return errors.New("flush error")
}
return nil
}
func TestWriteBuffer_Creation(t *testing.T) {
handler := &mockBufferHandler{}
// 测试正常创建
buffer := NewWriteBuffer(handler, 10)
if buffer == nil {
t.Fatal("NewWriteBuffer() returned nil")
}
if buffer.size != 10 {
t.Errorf("Buffer size = %d, want 10", buffer.size)
}
// 测试零大小缓冲区
buffer = NewWriteBuffer(handler, 0)
if buffer.size != 0 {
t.Errorf("Buffer size = %d, want 0", buffer.size)
}
// 测试负大小缓冲区应该被视为0
buffer = NewWriteBuffer(handler, -1)
if buffer.size != -1 {
t.Errorf("Buffer size = %d, want -1", buffer.size)
}
}
func TestWriteBuffer_SingleWrite(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 5)
// 写入单个数据点
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "single"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
// 检查缓冲区状态
if len(buffer.buffer) != 1 {
t.Errorf("Buffer length = %d, want 1", len(buffer.buffer))
}
// 检查处理器状态(不应该有数据点)
if len(handler.points) != 0 {
t.Errorf("Handler points = %d, want 0", len(handler.points))
}
}
func TestWriteBuffer_MultipleWrites(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 5)
// 写入多个数据点,但不超过缓冲区大小
for i := 0; i < 3; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "multiple"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
}
// 检查缓冲区状态
if len(buffer.buffer) != 3 {
t.Errorf("Buffer length = %d, want 3", len(buffer.buffer))
}
// 检查处理器状态(不应该有数据点)
if len(handler.points) != 0 {
t.Errorf("Handler points = %d, want 0", len(handler.points))
}
}
func TestWriteBuffer_AutoFlush(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 3)
// 写入足够的数据点触发自动刷新
for i := 0; i < 3; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "autoflush"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
}
// 检查缓冲区状态(应该已经刷新)
if len(buffer.buffer) != 0 {
t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
}
// 检查处理器状态应该有3个数据点
if len(handler.toflush) != 3 {
t.Errorf("Handler points = %d, want 3", len(handler.points))
}
// 再写入一个数据点
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "after_flush"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
// 检查缓冲区状态
if len(buffer.buffer) != 1 {
t.Errorf("Buffer length = %d, want 1", len(buffer.buffer))
}
// 检查处理器状态仍然应该有3个数据点
if len(handler.toflush) != 3 {
t.Errorf("Handler points = %d, want 3", len(handler.points))
}
}
func TestWriteBuffer_ManualFlush(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 10)
// 写入几个数据点
for i := 0; i < 5; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "manual_flush"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
}
// 检查缓冲区状态
if len(buffer.buffer) != 5 {
t.Errorf("Buffer length = %d, want 5", len(buffer.buffer))
}
// 手动刷新
if err := buffer.Flush(); err != nil {
t.Errorf("Flush() error = %v", err)
}
// 检查缓冲区状态(应该已经刷新)
if len(buffer.buffer) != 0 {
t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
}
// 检查处理器状态应该有5个数据点
if len(handler.toflush) != 5 {
t.Errorf("Handler points = %d, want 5", len(handler.points))
}
// 再次刷新(应该没有效果)
if err := buffer.Flush(); err != nil {
t.Errorf("Flush() error = %v", err)
}
// 检查处理器状态仍然应该有5个数据点
if len(handler.toflush) != 5 {
t.Errorf("Handler points = %d, want 5", len(handler.points))
}
}
func TestWriteBuffer_ErrorHandling(t *testing.T) {
// 测试写入错误
writeErrorHandler := &errorBufferHandler{
writeError: true,
}
buffer := NewWriteBuffer(writeErrorHandler, 3)
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "error"},
}
// 写入应该返回错误
err := buffer.Write(point)
if err == nil {
t.Error("Write() did not return error")
}
// 检查处理器状态
if writeErrorHandler.writeCount != 0 {
t.Errorf("Handler write count = %d, want 0", writeErrorHandler.writeCount)
}
// 测试刷新错误
flushErrorHandler := &errorBufferHandler{
flushError: true,
}
buffer = NewWriteBuffer(flushErrorHandler, 3)
// 写入几个数据点
for i := 0; i < 3; i++ {
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: float64(i),
Labels: map[string]string{"test": "flush_error"},
}
// 最后一个写入应该触发刷新,并返回错误
err := buffer.Write(point)
if i == 2 && err == nil {
t.Error("Write() did not return error on flush")
}
}
// 检查处理器状态
if flushErrorHandler.writeCount != 3 {
t.Errorf("Handler write count = %d, want 3", flushErrorHandler.writeCount)
}
if flushErrorHandler.flushCount != 1 {
t.Errorf("Handler flush count = %d, want 1", flushErrorHandler.flushCount)
}
}
func TestWriteBuffer_ZeroSize(t *testing.T) {
handler := &mockBufferHandler{
points: make([]DataPoint, 0),
}
buffer := NewWriteBuffer(handler, 0)
// 写入数据点(应该立即刷新)
point := DataPoint{
Timestamp: time.Now().UnixNano(),
Value: 42.0,
Labels: map[string]string{"test": "zero_size"},
}
if err := buffer.Write(point); err != nil {
t.Errorf("Write() error = %v", err)
}
// 检查缓冲区状态
if len(buffer.buffer) != 0 {
t.Errorf("Buffer length = %d, want 0", len(buffer.buffer))
}
// 检查处理器状态应该有1个数据点
if len(handler.toflush) != 1 {
t.Errorf("Handler points = %d, want 1", len(handler.points))
}
}