// File: gostream/main.go (166 lines, 2.9 KiB, Go)
// Revision: 2023-12-02 13:08:31 +08:00

package gostream
import (
"errors"
"reflect"
)
// Revision marker: 2023-12-02 13:08:31 +08:00

// StreamOption carries the tunable settings of a stream.
type StreamOption struct {
	// WaterMark is the buffer length. TransformStream uses it as the
	// capacity of the channels it creates.
	WaterMark int
}
// IReadAble is a source of values of type T that hands them out over a
// channel.
type IReadAble[T any] interface {
	// StartRead returns the channel from which the values are received.
	StartRead() chan T
}
// IReadStream is the consumer-facing API of a readable stream. ReadStream
// in this file provides these three methods.
type IReadStream[T any] interface {
	// OnData registers a callback that is invoked with each value.
	OnData(func(T)) error
	// Read returns a single value from the stream.
	Read() (T, error)
	// Pipe forwards the stream's values to w.
	Pipe(w IWriteStream[T]) error
}
// IWriteAble is a sink for values of type T that accepts them over a
// channel.
type IWriteAble[T any] interface {
	// StartWrite returns the channel on which the values are sent.
	StartWrite() chan T
}
// IWriteStream is the producer-facing API of a writable stream.
type IWriteStream[T any] interface {
	// Write hands one value to the stream.
	Write(T)
}
// WriteStream pushes values into the channel supplied by its embedded
// IWriteAble.
type WriteStream[T any] struct {
	// IWriteAble provides StartWrite, which Write calls to obtain the
	// destination channel on first use.
	IWriteAble[T]
	// IWriteStream is embedded as well; the Write method declared on
	// *WriteStream takes precedence over the one promoted from it.
	IWriteStream[T]
	// writeChan caches the channel returned by StartWrite; nil until the
	// first Write.
	writeChan chan T
	// isSync bool
}
// Write sends t on the stream's channel. The channel is fetched from
// StartWrite on the first call and reused afterwards. The send blocks
// for as long as the channel cannot accept the value.
func (w *WriteStream[T]) Write(t T) {
	dst := w.writeChan
	if dst == nil {
		// First write: ask the underlying IWriteAble for its channel and
		// remember it.
		dst = w.StartWrite()
		w.writeChan = dst
	}
	dst <- t
}
// ReadStream consumes values from the channel supplied by its embedded
// IReadAble. OnData, Read and Pipe each refuse to start once readChan has
// been claimed, except that Read may be called repeatedly.
type ReadStream[T any] struct {
	// IReadAble provides StartRead, the source of the channel.
	IReadAble[T]
	// IReadStream is embedded as well; the OnData, Read and Pipe methods
	// declared on *ReadStream take precedence over the promoted ones.
	IReadStream[T]
	// readChan is the channel obtained from StartRead; nil until a
	// consumer attaches.
	readChan chan T
	// isSync is set by Read when it is the one that claimed readChan, so
	// that later Read calls are still allowed.
	isSync bool
}
// func (r *ReadStream[T]) StartRead() <-chan T {
// return nil
// }
// OnData attaches cb as the consumer of the stream. It obtains the
// channel from StartRead and starts a goroutine that calls cb for every
// value received, until the channel is closed.
//
// It returns an error if the stream already has a consumer.
func (r *ReadStream[T]) OnData(cb func(T)) error {
	if r.readChan != nil {
		return errors.New("already has reader")
	}

	source := r.StartRead()
	r.readChan = source

	go func() {
		for {
			item, open := <-source
			if !open {
				return
			}
			cb(item)
		}
	}()
	return nil
}
// Read blocks until one value is available on the stream and returns it.
//
// The first call claims the stream for synchronous reading by obtaining
// the channel from StartRead; later calls reuse that channel. An error is
// returned when the stream was already claimed by OnData or Pipe, or when
// the underlying channel has been closed. On error the returned value is
// the zero value of T.
func (r *ReadStream[T]) Read() (T, error) {
	var zero T
	if r.readChan != nil && !r.isSync {
		return zero, errors.New("already has reader")
	}
	if r.readChan == nil {
		r.readChan = r.StartRead()
		r.isSync = true
	}
	// Use the two-value receive: a closed channel would otherwise yield
	// zero values with a nil error forever, which a caller cannot tell
	// apart from real data.
	v, ok := <-r.readChan
	if !ok {
		return zero, errors.New("stream closed")
	}
	return v, nil
}
// Pipe attaches w as the consumer of the stream. It obtains the channel
// from StartRead and starts a goroutine that passes every value received
// to w.Write, until the channel is closed.
//
// It returns an error if the stream already has a consumer.
func (r *ReadStream[T]) Pipe(w IWriteStream[T]) error {
	if r.readChan != nil {
		return errors.New("already has reader")
	}

	source := r.StartRead()
	r.readChan = source

	go func() {
		for {
			item, open := <-source
			if !open {
				return
			}
			w.Write(item)
		}
	}()
	return nil
}
// ITransformer converts a value of type T into a value of type K.
type ITransformer[T any, K any] interface {
	// Transform returns the K produced from t.
	Transform(t T) K
}
// TransformStream is a stream that is written with values of type T and
// read as values of type K. Its OnData method starts the goroutine that
// receives from writer, applies the ITransformer and sends on reader.
type TransformStream[T any, K any] struct {
	// Options holds the stream settings; WaterMark is used as the channel
	// capacity in StartRead and StartWrite.
	Options StreamOption

	// IWriteAble is embedded directly; NewTransform does not assign it.
	// StartWrite is declared on *TransformStream itself.
	IWriteAble[T]
	// WriteStream is the input side. NewTransform points its IWriteAble
	// back at the TransformStream.
	WriteStream[T]

	// IReadAble is embedded directly; NewTransform does not assign it.
	// StartRead is declared on *TransformStream itself.
	IReadAble[K]
	// ReadStream is the output side. NewTransform points its IReadAble
	// back at the TransformStream.
	ReadStream[K]

	// ITransformer performs the T -> K conversion.
	ITransformer[T, K]

	// writer is the channel the OnData goroutine receives input from;
	// StartWrite returns it when it is non-nil.
	writer chan T
	// reader is the channel the OnData goroutine sends results on;
	// StartRead returns it when it is non-nil.
	reader chan K
}
// StartRead returns the channel that carries the transformed values.
//
// The channel is created on the first call, with Options.WaterMark as
// its capacity, and stored in tr.reader so that every later call returns
// that same channel. The goroutine started by OnData sends on tr.reader,
// so handing out an unrecorded channel here would leave the consumer
// waiting on a channel nothing ever writes to.
func (tr *TransformStream[T, K]) StartRead() chan K {
	if tr.reader == nil {
		tr.reader = make(chan K, tr.Options.WaterMark)
	}
	return tr.reader
}
// StartWrite returns the channel that accepts the values to transform.
//
// The channel is created on the first call, with Options.WaterMark as
// its capacity, and stored in tr.writer so that every later call returns
// that same channel. The goroutine started by OnData receives from
// tr.writer, so a Write issued before OnData must land on the recorded
// channel rather than on a throw-away one.
func (tr *TransformStream[T, K]) StartWrite() chan T {
	if tr.writer == nil {
		tr.writer = make(chan T, tr.Options.WaterMark)
	}
	return tr.writer
}
func (tr *TransformStream[T, K]) OnData(cb func(K)) error {
err := tr.ReadStream.OnData(cb)
if err == nil {
if tr.writer == nil {
tr.writer = tr.StartWrite()
}
go func() {
for k := range tr.writer {
transfered := tr.ITransformer.Transform(k)
tvalue := reflect.ValueOf(transfered)
if !tvalue.IsZero() && !tvalue.IsValid() {
tr.reader <- transfered
}
2023-12-02 13:08:31 +08:00
}
}()
}
return err
}
// NewTransform builds a TransformStream that uses t for the conversion
// and options for its settings. The embedded ReadStream and WriteStream
// are wired back to the new stream, so their StartRead and StartWrite
// calls resolve to the TransformStream's own methods.
func NewTransform[T any, K any](t ITransformer[T, K], options StreamOption) *TransformStream[T, K] {
	stream := new(TransformStream[T, K])
	stream.Options = options
	stream.ITransformer = t

	// The stream is its own channel provider for both directions.
	stream.ReadStream.IReadAble = stream
	stream.WriteStream.IWriteAble = stream
	return stream
}
// NewReadStream returns a ReadStream that takes its channel from reader.
// No channel is requested until OnData, Read or Pipe is called.
func NewReadStream[T any](reader IReadAble[T]) *ReadStream[T] {
	return &ReadStream[T]{IReadAble: reader}
}
// NewWriteStream returns a WriteStream that takes its channel from
// writer. No channel is requested until the first Write.
func NewWriteStream[T any](writer IWriteAble[T]) *WriteStream[T] {
	return &WriteStream[T]{IWriteAble: writer}
}