diff --git a/main.go b/main.go index cbf0487..4b1fa71 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,9 @@ package gostream -import "errors" +import ( + "errors" + "reflect" +) type StreamOption struct { WaterMark int // buffer len @@ -124,7 +127,11 @@ func (tr *TransformStream[T, K]) OnData(cb func(K)) error { go func() { for k := range tr.writer { transfered := tr.ITransformer.Transform(k) - tr.reader <- transfered + tvalue := reflect.ValueOf(transfered) + if !tvalue.IsZero() && !tvalue.IsValid() { + tr.reader <- transfered + } + } }() }