package server import ( "net/http" "github.com/gorilla/websocket" ) type Pipe struct { Id string Src string Dst string src *websocket.Conn dst *websocket.Conn stopChan chan int } func (p *Pipe) Start() { p.stopChan = make(chan int) go p.forward(p.src, p.dst) go p.forward(p.dst, p.src) <-p.stopChan p.src.Close() p.dst.Close() } func (p *Pipe) forward(src, dst *websocket.Conn) { for { mtype, message, err := src.ReadMessage() if err != nil { break } err = dst.WriteMessage(mtype, message) if err != nil { break } } p.stopChan <- 1 } func (s *Server) HandlePipe(conn *websocket.Conn, r *http.Request) { session := r.Header.Get("Session") if session == "" { s.logger.Error("no session header") conn.Close() return } cmdSession := s.findSession(session) if cmdSession == nil { conn.Close() s.logger.Error("command session not found") return } lpath := r.URL.Path pipeId := lpath[len("/ws/pipe/"):] pipe, ok := s.pipes[pipeId] if !ok { conn.Close() s.logger.Error("pipe not found") cmdSession.Send(NewErrorResponse("pipe not found", nil)) return } if pipe.Src == session { pipe.src = conn } else if pipe.Dst == session { pipe.dst = conn } else { cmdSession.Send(NewErrorResponse("not endpoint of current pipe", nil)) conn.Close() return } if pipe.src != nil && pipe.dst != nil { pipe.Start() clientCmdSession := s.findSession(pipe.Src) clientCmdSession.Send(NcmdConnectionReady(pipe.Id)) // info src endpoint ready and can setup proxy listener } } func (s *Server) findSession(sessionId string) *Session { clientSession, ok := s.clientSession[sessionId] if ok { return clientSession } agentSession, ok := s.agentSession[sessionId] if ok { return agentSession } return nil }