gotunnelserver/server/handlePipe.go

124 lines
2.4 KiB
Go
Raw Permalink Normal View History

package server
import (
"net/http"
"git.pyer.club/kingecg/goemitter"
"git.pyer.club/kingecg/gologger"
2024-11-15 16:49:01 +08:00
"git.pyer.club/kingecg/gotunnelserver/util"
"github.com/gorilla/websocket"
)
2024-11-13 22:26:29 +08:00
type Pipe struct {
Id string
Src string
Dst string
*goemitter.EventEmitter
2024-11-13 22:26:29 +08:00
src *websocket.Conn
dst *websocket.Conn
stopChan chan int
logger *gologger.Logger
2024-11-13 22:26:29 +08:00
}
func (p *Pipe) Start() {
p.stopChan = make(chan int)
go p.forward(p.src, p.dst)
go p.forward(p.dst, p.src)
<-p.stopChan
p.src.Close()
p.dst.Close()
p.Emit("Close")
2024-11-13 22:26:29 +08:00
}
2024-11-15 16:49:01 +08:00
func NewPipe(src, dst string) *Pipe {
return &Pipe{
Id: util.GenRandomstring(16),
Src: src,
Dst: dst,
EventEmitter: goemitter.NewEmitter(),
logger: gologger.GetLogger("pipe"),
2024-11-15 16:49:01 +08:00
}
}
2024-11-13 22:26:29 +08:00
func (p *Pipe) forward(src, dst *websocket.Conn) {
for {
mtype, message, err := src.ReadMessage()
if err != nil {
break
}
err = dst.WriteMessage(mtype, message)
if err != nil {
break
}
p.logger.Debug("pipe forward:", len(message), "type", mtype)
2024-11-13 22:26:29 +08:00
}
p.stopChan <- 1
}
func (s *Server) HandlePipe(conn *websocket.Conn, r *http.Request) {
2024-11-13 22:26:29 +08:00
session := r.Header.Get("Session")
if session == "" {
s.logger.Error("no session header")
conn.Close()
return
}
cmdSession := s.findSession(session)
if cmdSession == nil {
conn.Close()
s.logger.Error("command session not found")
return
}
lpath := r.URL.Path
pipeId := lpath[len("/ws/pipe/"):]
pipe, ok := s.pipes[pipeId]
if !ok {
conn.Close()
s.logger.Error("pipe not found")
cmdSession.Send(NewErrorResponse("pipe not found", nil))
return
}
if pipe.Src == session {
pipe.src = conn
} else if pipe.Dst == session {
pipe.dst = conn
} else {
cmdSession.Send(NewErrorResponse("not endpoint of current pipe", nil))
conn.Close()
return
}
if pipe.src != nil && pipe.dst != nil {
2024-11-15 16:49:01 +08:00
go pipe.Start()
2024-11-13 22:26:29 +08:00
clientCmdSession := s.findSession(pipe.Src)
clientCmdSession.Send(NcmdConnectionReady(pipe.Id)) // info src endpoint ready and can setup proxy listener
}
}
func (s *Server) findSession(sidOrName string) *Session {
clientSession, ok := s.clientSession[sidOrName]
2024-11-13 22:26:29 +08:00
if ok {
return clientSession
}
agentSession, ok := s.agentSession[sidOrName]
2024-11-13 22:26:29 +08:00
if ok {
return agentSession
}
for _, v := range s.clientSession {
if v.Name == sidOrName {
return v
}
}
for _, v := range s.agentSession {
if v.Name == sidOrName {
return v
}
}
2024-11-13 22:26:29 +08:00
return nil
}