From 4d19b8a8df73736bee7700c820ed75c7f0827647 Mon Sep 17 00:00:00 2001 From: kingecg Date: Sat, 2 Dec 2023 13:08:31 +0800 Subject: [PATCH] add cde --- .vscode/launch.json | 7 ++ go.mod | 3 + main.go | 158 ++++++++++++++++++++++++++++++++++++++++++++ test/main.go | 57 ++++++++++++++++ 4 files changed, 225 insertions(+) create mode 100644 .vscode/launch.json create mode 100644 go.mod create mode 100644 main.go create mode 100644 test/main.go diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f980ab9 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,7 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [] +} \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7e5163a --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.pyer.club/kingecg/gostream + +go 1.19 diff --git a/main.go b/main.go new file mode 100644 index 0000000..cbf0487 --- /dev/null +++ b/main.go @@ -0,0 +1,158 @@ +package gostream + +import "errors" + +type StreamOption struct { + WaterMark int // buffer len +} + +type IReadAble[T any] interface { + StartRead() chan T +} + +type IReadStream[T any] interface { + OnData(func(T)) error + Read() (T, error) + Pipe(w IWriteStream[T]) error +} + +type IWriteAble[T any] interface { + StartWrite() chan T +} + +type IWriteStream[T any] interface { + Write(T) +} +type WriteStream[T any] struct { + IWriteAble[T] + IWriteStream[T] + writeChan chan T + // isSync bool +} + +func (w *WriteStream[T]) Write(t T) { + if w.writeChan == nil { + w.writeChan = w.StartWrite() + } + w.writeChan <- t +} + +type ReadStream[T any] struct { + IReadAble[T] + IReadStream[T] + readChan chan T + isSync bool +} + +// func (r *ReadStream[T]) StartRead() <-chan T { +// return nil +// } +func (r *ReadStream[T]) OnData(cb func(T)) error { + if r.readChan != nil { + return errors.New("already has reader") + } + r.readChan = r.StartRead() + + go func() { + for t := range r.readChan { + cb(t) + } + }() + return nil +} + +func (r *ReadStream[T]) Read() (T, error) { + if r.readChan != nil && !r.isSync { + return *new(T), errors.New("already has reader") + } + if r.readChan == nil { + r.readChan = r.StartRead() + r.isSync = true + } + + return <-r.readChan, nil +} + +func (r *ReadStream[T]) Pipe(w IWriteStream[T]) error { + if r.readChan != nil { + return errors.New("already has reader") + } + r.readChan = r.StartRead() + go func() { + for t := range r.readChan { + w.Write(t) + } + }() + return nil +} + +type ITransformer[T any, K any] interface { + Transform(t T) K +} + +type TransformStream[T any, K any] struct { + Options StreamOption + IWriteAble[T] + WriteStream[T] + IReadAble[K] + ReadStream[K] + ITransformer[T, K] + writer chan T + reader chan K +} + +func (tr *TransformStream[T, K]) StartRead() chan K { + if tr.reader != nil { + return tr.reader + } + return make(chan K, tr.Options.WaterMark) +} + +func (tr *TransformStream[T, K]) StartWrite() chan T { + if tr.writer != nil { + return tr.writer + } + return make(chan T, tr.Options.WaterMark) +} + +func (tr *TransformStream[T, K]) OnData(cb func(K)) error { + err := tr.ReadStream.OnData(cb) + if err == nil { + if tr.writer == nil { + tr.writer = tr.StartWrite() + } + go func() { + for k := range tr.writer { + transfered := tr.ITransformer.Transform(k) + tr.reader <- transfered + } + }() + } + return err +} + +func NewTransform[T any, K any](t ITransformer[T, K], options StreamOption) *TransformStream[T, K] { + ret := &TransformStream[T, K]{ + Options: options, + ITransformer: t, + ReadStream: ReadStream[K]{}, + WriteStream: WriteStream[T]{}, + } + ret.ReadStream.IReadAble = ret + ret.WriteStream.IWriteAble = ret + return ret +} + +func NewReadStream[T any](reader IReadAble[T]) *ReadStream[T] { + ret := &ReadStream[T]{ + IReadAble: reader, + } + return ret +} + +func NewWriteStream[T any](writer IWriteAble[T]) *WriteStream[T] { + ret := &WriteStream[T]{ + IWriteAble: writer, + } + return ret +} diff --git a/test/main.go b/test/main.go new file mode 100644 index 0000000..826ce9e --- /dev/null +++ b/test/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "fmt" + + "git.pyer.club/kingecg/gostream" +) + +type TReadAble[T any] struct { + gostream.ReadStream[T] + sended int + E []T +} + +func (tr *TReadAble[T]) StartRead() chan T { + // var d T + // if tr.sended < len(tr.E) { + // d = tr.E[tr.sended] + // tr.sended++ + // return &d + // } + // return nil + + ch := make(chan T) + go func() { + for i := 0; i < len(tr.E); i++ { + ch <- tr.E[i] + } + close(ch) + }() + return ch +} +func main() { + + x := &TReadAble[int]{ + gostream.ReadStream[int]{}, + 0, + []int{1, 2, 3, 4, 5}, + } + x.ReadStream.IReadAble = x + // x.OnData(func(d int) { + // fmt.Println(d) + // }) + fmt.Println(x.Read()) + fmt.Println(x.Read()) + fmt.Println(x.Read()) + fmt.Println(x.Read()) + + // for t := tr.DoRead(); t != nil; t = tr.DoRead() { + // fmt.Println(*t) + // } + // tr.OnData(func(d int) { + // fmt.Println("tr.OnData", d) + // }) + select {} + +}