159 lines
2.8 KiB
Go
159 lines
2.8 KiB
Go
|
package gostream
|
||
|
|
||
|
import "errors"
|
||
|
|
||
|
type StreamOption struct {
|
||
|
WaterMark int // buffer len
|
||
|
}
|
||
|
|
||
|
type IReadAble[T any] interface {
|
||
|
StartRead() chan T
|
||
|
}
|
||
|
|
||
|
type IReadStream[T any] interface {
|
||
|
OnData(func(T)) error
|
||
|
Read() (T, error)
|
||
|
Pipe(w IWriteStream[T]) error
|
||
|
}
|
||
|
|
||
|
type IWriteAble[T any] interface {
|
||
|
StartWrite() chan T
|
||
|
}
|
||
|
|
||
|
type IWriteStream[T any] interface {
|
||
|
Write(T)
|
||
|
}
|
||
|
type WriteStream[T any] struct {
|
||
|
IWriteAble[T]
|
||
|
IWriteStream[T]
|
||
|
writeChan chan T
|
||
|
// isSync bool
|
||
|
}
|
||
|
|
||
|
func (w *WriteStream[T]) Write(t T) {
|
||
|
if w.writeChan == nil {
|
||
|
w.writeChan = w.StartWrite()
|
||
|
}
|
||
|
w.writeChan <- t
|
||
|
}
|
||
|
|
||
|
type ReadStream[T any] struct {
|
||
|
IReadAble[T]
|
||
|
IReadStream[T]
|
||
|
readChan chan T
|
||
|
isSync bool
|
||
|
}
|
||
|
|
||
|
// func (r *ReadStream[T]) StartRead() <-chan T {
|
||
|
// return nil
|
||
|
// }
|
||
|
func (r *ReadStream[T]) OnData(cb func(T)) error {
|
||
|
if r.readChan != nil {
|
||
|
return errors.New("already has reader")
|
||
|
}
|
||
|
r.readChan = r.StartRead()
|
||
|
|
||
|
go func() {
|
||
|
for t := range r.readChan {
|
||
|
cb(t)
|
||
|
}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *ReadStream[T]) Read() (T, error) {
|
||
|
if r.readChan != nil && !r.isSync {
|
||
|
return *new(T), errors.New("already has reader")
|
||
|
}
|
||
|
if r.readChan == nil {
|
||
|
r.readChan = r.StartRead()
|
||
|
r.isSync = true
|
||
|
}
|
||
|
|
||
|
return <-r.readChan, nil
|
||
|
}
|
||
|
|
||
|
func (r *ReadStream[T]) Pipe(w IWriteStream[T]) error {
|
||
|
if r.readChan != nil {
|
||
|
return errors.New("already has reader")
|
||
|
}
|
||
|
r.readChan = r.StartRead()
|
||
|
go func() {
|
||
|
for t := range r.readChan {
|
||
|
w.Write(t)
|
||
|
}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type ITransformer[T any, K any] interface {
|
||
|
Transform(t T) K
|
||
|
}
|
||
|
|
||
|
type TransformStream[T any, K any] struct {
|
||
|
Options StreamOption
|
||
|
IWriteAble[T]
|
||
|
WriteStream[T]
|
||
|
IReadAble[K]
|
||
|
ReadStream[K]
|
||
|
ITransformer[T, K]
|
||
|
writer chan T
|
||
|
reader chan K
|
||
|
}
|
||
|
|
||
|
func (tr *TransformStream[T, K]) StartRead() chan K {
|
||
|
if tr.reader != nil {
|
||
|
return tr.reader
|
||
|
}
|
||
|
return make(chan K, tr.Options.WaterMark)
|
||
|
}
|
||
|
|
||
|
func (tr *TransformStream[T, K]) StartWrite() chan T {
|
||
|
if tr.writer != nil {
|
||
|
return tr.writer
|
||
|
}
|
||
|
return make(chan T, tr.Options.WaterMark)
|
||
|
}
|
||
|
|
||
|
func (tr *TransformStream[T, K]) OnData(cb func(K)) error {
|
||
|
err := tr.ReadStream.OnData(cb)
|
||
|
if err == nil {
|
||
|
if tr.writer == nil {
|
||
|
tr.writer = tr.StartWrite()
|
||
|
}
|
||
|
go func() {
|
||
|
for k := range tr.writer {
|
||
|
transfered := tr.ITransformer.Transform(k)
|
||
|
tr.reader <- transfered
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func NewTransform[T any, K any](t ITransformer[T, K], options StreamOption) *TransformStream[T, K] {
|
||
|
ret := &TransformStream[T, K]{
|
||
|
Options: options,
|
||
|
ITransformer: t,
|
||
|
ReadStream: ReadStream[K]{},
|
||
|
WriteStream: WriteStream[T]{},
|
||
|
}
|
||
|
ret.ReadStream.IReadAble = ret
|
||
|
ret.WriteStream.IWriteAble = ret
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
func NewReadStream[T any](reader IReadAble[T]) *ReadStream[T] {
|
||
|
ret := &ReadStream[T]{
|
||
|
IReadAble: reader,
|
||
|
}
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
func NewWriteStream[T any](writer IWriteAble[T]) *WriteStream[T] {
|
||
|
ret := &WriteStream[T]{
|
||
|
IWriteAble: writer,
|
||
|
}
|
||
|
return ret
|
||
|
}
|