// Package gostream provides small, channel-backed generic stream primitives:
// a readable stream, a writable stream, and a transform stream that maps
// written values to readable values.
package gostream

import (
	"errors"
	"reflect"
)

// ErrAlreadyHasReader is returned when a consumer tries to attach to a stream
// whose channel has already been claimed by another consumer.
var ErrAlreadyHasReader = errors.New("already has reader")

// StreamOption configures a stream.
type StreamOption struct {
	// WaterMark is the capacity of the channels backing the stream.
	// Zero means unbuffered.
	WaterMark int
}

// capacity returns the channel capacity to use for o. Negative watermarks are
// clamped to zero because make panics on a negative channel size.
func (o StreamOption) capacity() int {
	if o.WaterMark < 0 {
		return 0
	}
	return o.WaterMark
}

// IReadAble is a source of values: StartRead returns the channel that the
// values are received from.
type IReadAble[T any] interface {
	StartRead() chan T
}

// IReadStream is the consumer-facing side of a readable stream.
type IReadStream[T any] interface {
	OnData(func(T)) error
	Read() (T, error)
	Pipe(w IWriteStream[T]) error
}

// IWriteAble is a sink of values: StartWrite returns the channel that the
// values are sent on.
type IWriteAble[T any] interface {
	StartWrite() chan T
}

// IWriteStream is the producer-facing side of a writable stream.
type IWriteStream[T any] interface {
	Write(T)
}

// WriteStream adapts an IWriteAble into an IWriteStream.
type WriteStream[T any] struct {
	IWriteAble[T]
	IWriteStream[T]
	writeChan chan T // obtained from StartWrite on the first Write
}

// Write sends t on the sink's channel, asking the embedded IWriteAble for the
// channel on first use. It blocks until the channel accepts the value.
func (w *WriteStream[T]) Write(t T) {
	if w.writeChan == nil {
		w.writeChan = w.StartWrite()
	}
	w.writeChan <- t
}

// ReadStream adapts an IReadAble into an IReadStream. A ReadStream supports
// exactly one consumption mode: a callback (OnData), a pipe (Pipe), or
// repeated synchronous reads (Read).
type ReadStream[T any] struct {
	IReadAble[T]
	IReadStream[T]
	readChan chan T // obtained from StartRead by the first consumer
	isSync   bool   // true when readChan was claimed by Read
}

// OnData starts a goroutine that invokes cb for every value received from the
// source channel. It returns ErrAlreadyHasReader if the stream has already
// been claimed. The goroutine runs until the source channel is closed.
func (r *ReadStream[T]) OnData(cb func(T)) error {
	if r.readChan != nil {
		return ErrAlreadyHasReader
	}
	r.readChan = r.StartRead()
	go func() {
		for t := range r.readChan {
			cb(t)
		}
	}()
	return nil
}

// Read blocks until one value is received from the source channel and returns
// it. It returns ErrAlreadyHasReader if the stream was previously claimed by
// OnData or Pipe; repeated calls to Read are allowed.
func (r *ReadStream[T]) Read() (T, error) {
	if r.readChan != nil && !r.isSync {
		var zero T
		return zero, ErrAlreadyHasReader
	}
	if r.readChan == nil {
		r.readChan = r.StartRead()
		r.isSync = true
	}
	return <-r.readChan, nil
}

// Pipe starts a goroutine that forwards every value received from the source
// channel to w. It returns ErrAlreadyHasReader if the stream has already been
// claimed. The goroutine runs until the source channel is closed.
func (r *ReadStream[T]) Pipe(w IWriteStream[T]) error {
	if r.readChan != nil {
		return ErrAlreadyHasReader
	}
	r.readChan = r.StartRead()
	go func() {
		for t := range r.readChan {
			w.Write(t)
		}
	}()
	return nil
}

// ITransformer converts a written value of type T into a readable value of
// type K.
type ITransformer[T any, K any] interface {
	Transform(t T) K
}

// TransformStream is both a WriteStream[T] and a ReadStream[K]: values passed
// to Write are run through the ITransformer and the results are delivered to
// whichever consumer (OnData, Read or Pipe) is attached.
//
// Results for which reflect reports an invalid or zero value (a nil interface,
// a nil pointer, 0, "", ...) are dropped rather than delivered.
type TransformStream[T any, K any] struct {
	Options StreamOption
	IWriteAble[T]
	WriteStream[T]
	IReadAble[K]
	ReadStream[K]
	ITransformer[T, K]
	writer  chan T // input side: receives the values passed to Write
	reader  chan K // output side: carries the transformed values
	pumping bool   // whether the transform goroutine has been started
}

// StartRead returns the output channel, creating and caching it on first use
// so that every caller shares the same channel.
func (tr *TransformStream[T, K]) StartRead() chan K {
	if tr.reader == nil {
		tr.reader = make(chan K, tr.Options.capacity())
	}
	return tr.reader
}

// StartWrite returns the input channel, creating and caching it on first use
// so that every caller shares the same channel.
func (tr *TransformStream[T, K]) StartWrite() chan T {
	if tr.writer == nil {
		tr.writer = make(chan T, tr.Options.capacity())
	}
	return tr.writer
}

// startPump launches, at most once, the goroutine that moves values from the
// input channel through the transformer to the output channel. The goroutine
// runs until the input channel is closed.
func (tr *TransformStream[T, K]) startPump() {
	if tr.pumping {
		return
	}
	tr.pumping = true
	in, out := tr.StartWrite(), tr.StartRead()
	go func() {
		for item := range in {
			result := tr.ITransformer.Transform(item)
			// reflect.ValueOf of a nil interface yields an invalid Value, and
			// IsZero panics on an invalid Value, so validity is checked first.
			if rv := reflect.ValueOf(result); rv.IsValid() && !rv.IsZero() {
				out <- result
			}
		}
	}()
}

// OnData registers cb to be called with every transformed value. It returns
// ErrAlreadyHasReader if a consumer is already attached.
func (tr *TransformStream[T, K]) OnData(cb func(K)) error {
	if err := tr.ReadStream.OnData(cb); err != nil {
		return err
	}
	tr.startPump()
	return nil
}

// Read blocks until one transformed value is available and returns it. It
// returns ErrAlreadyHasReader if the stream was claimed by OnData or Pipe.
func (tr *TransformStream[T, K]) Read() (K, error) {
	// The pump must be running before the blocking receive inside
	// ReadStream.Read, otherwise nothing would ever feed the output channel.
	tr.startPump()
	return tr.ReadStream.Read()
}

// Pipe forwards every transformed value to w. It returns ErrAlreadyHasReader
// if a consumer is already attached.
func (tr *TransformStream[T, K]) Pipe(w IWriteStream[K]) error {
	if err := tr.ReadStream.Pipe(w); err != nil {
		return err
	}
	tr.startPump()
	return nil
}

// NewTransform returns a TransformStream that applies t to every written
// value. Both backing channels are created here, with capacity
// options.WaterMark, so that writers and readers running on different
// goroutines never race to create them.
func NewTransform[T any, K any](t ITransformer[T, K], options StreamOption) *TransformStream[T, K] {
	ret := &TransformStream[T, K]{
		Options:      options,
		ITransformer: t,
	}
	ret.StartWrite()
	ret.StartRead()
	ret.ReadStream.IReadAble = ret
	ret.WriteStream.IWriteAble = ret
	return ret
}

// NewReadStream returns a ReadStream that consumes from reader.
func NewReadStream[T any](reader IReadAble[T]) *ReadStream[T] {
	return &ReadStream[T]{
		IReadAble: reader,
	}
}

// NewWriteStream returns a WriteStream that produces into writer.
func NewWriteStream[T any](writer IWriteAble[T]) *WriteStream[T] {
	return &WriteStream[T]{
		IWriteAble: writer,
	}
}