diff --git a/.vscode/launch.json b/.vscode/launch.json index a929be1..59a9cba 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,7 @@ "request": "launch", "mode": "auto", "program": "${workspaceFolder}/tunnelclient", - "args":["--target-session", "HNCZhU8G86HSfTZv", "--local-port", "2222"] + "args":["--target-session", "0OqhwXRIE9IoFqHE", "--local-port", "2222"] }, { "name": "Launch Package", diff --git a/Makefile b/Makefile index 8d01f9a..d6f8a50 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ PROJECT_DIR := $(shell pwd) BIN_DIR := dist # 定义应用程序的名称 -APPS := tunnelserver +APPS := tunnelserver tunnelclient tunnelagent # 创建输出目录 $(BIN_DIR): diff --git a/client/dataconn.go b/client/dataconn.go index 538b83f..ea32add 100644 --- a/client/dataconn.go +++ b/client/dataconn.go @@ -7,6 +7,7 @@ import ( "log" "net" "strconv" + "sync" "git.pyer.club/kingecg/goemitter" "git.pyer.club/kingecg/gotunnelserver/util" @@ -21,6 +22,7 @@ type DataEndPoint struct { dataSession string wsConn *ws.Conn conns map[int32]*DataConn + mux sync.Mutex } func (d *DataEndPoint) Close() { @@ -43,11 +45,13 @@ func (d *DataEndPoint) Connect() { for { // d.wsConn.SetReadDeadline(time.Now().Add(time.Minute * 5)) _, data, err := d.wsConn.ReadMessage() + log.Println("recv data:", len(data)) if err != nil { + log.Println("recv err:", err) break } var packet Packet - err = packet.BinaryUnmarshaler(data) + err = packet.UnmarshalBinary(data) if err != nil { continue } @@ -58,11 +62,14 @@ func (d *DataEndPoint) Connect() { log.Println(err) continue } + log.Println("R new connection:", packet.id, d.Host, d.Port) conn = NewDataConnection(packet.id, &tcpconn, d) conn.Start() conn.Once("close", func(args ...interface{}) { + log.Println("connection closed:", conn.id) d.onDataConnClose(conn) }) + d.conns[packet.id] = conn } conn.Write(packet) @@ -84,13 +91,17 @@ func (d *DataEndPoint) Listen() { dconn := NewDataConnection(0, &conn, d) d.conns[dconn.id] = dconn dconn.Start() + log.Println("L new connection:", dconn.id, d.Host, d.Port) dconn.Once("close", func(args ...interface{}) { + log.Println("connection closed:", dconn.id) d.onDataConnClose(dconn) }) } } func (d *DataEndPoint) Write(p []byte) (n int, err error) { + d.mux.Lock() + defer d.mux.Unlock() return len(p), d.wsConn.WriteMessage(ws.BinaryMessage, p) } @@ -115,11 +126,48 @@ type Packet struct { } func (p *Packet) MarshalBinary() ([]byte, error) { + // 创建一个字节切片缓冲区 buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, p) + + // 写入 ID + if err := binary.Write(buf, binary.BigEndian, p.id); err != nil { + return nil, err + } + + // 写入 Data 的长度 + if err := binary.Write(buf, binary.BigEndian, int32(len(p.data))); err != nil { + return nil, err + } + + // 写入 Data + if _, err := buf.Write(p.data); err != nil { + return nil, err + } + + // 返回字节切片 return buf.Bytes(), nil } +func (p *Packet) UnmarshalBinary(data []byte) error { + // 创建一个字节切片缓冲区 + buf := bytes.NewReader(data) + // 读取 ID + if err := binary.Read(buf, binary.BigEndian, &p.id); err != nil { + return err + } + // 读取 Data 的长度 + var dataLen int32 + if err := binary.Read(buf, binary.BigEndian, &dataLen); err != nil { + return err + } + // 读取 Data + p.data = make([]byte, dataLen) + if _, err := buf.Read(p.data); err != nil { + return err + } + return nil +} + func (p *Packet) BinaryUnmarshaler(data []byte) error { buf := bytes.NewReader(data) return binary.Read(buf, binary.LittleEndian, p) @@ -129,16 +177,18 @@ func (d *DataConn) Close() { } func (d *DataConn) Write(p Packet) (n int, err error) { + log.Println("connection write data:", p.id, len(p.data)) return (*d.conn).Write(p.data) } func (d *DataConn) Start() { go func() { for { - buf := make([]byte, 1024) + buf := make([]byte, 4096) n, err := (*d.conn).Read(buf) if err != nil { // panic(err) + log.Println("Error:", err) break } packet := Packet{ @@ -149,7 +199,8 @@ func (d *DataConn) Start() { if err != nil { continue } - d.out.Write(data) + wn, werr := d.out.Write(data) + log.Println("write data:", werr, wn) } d.Emit("close") }() diff --git a/server/handleAgent.go b/server/handleAgent.go index c9f0e1e..5714743 100644 --- a/server/handleAgent.go +++ b/server/handleAgent.go @@ -3,13 +3,24 @@ package server import ( "net/http" + "git.pyer.club/kingecg/gotunnelserver/util" "github.com/gorilla/websocket" ) func (s *Server) HandleAgent(c *websocket.Conn, r *http.Request) { agentSession := NewSession(c) agentSession.Start() + authEntity := r.Context().Value("authEntity") + if authEntity == nil { + agentSession.Close() + return + } + agentSession.Name = authEntity.(*util.AuthEntity).Username + s.agentSession[agentSession.Id] = agentSession command := NcmdSession(agentSession.Id) agentSession.Send(command) + agentSession.Once("Close", func(p ...interface{}) { + delete(s.agentSession, agentSession.Id) + }) } diff --git a/server/handleClient.go b/server/handleClient.go index 24f0be5..33b2c11 100644 --- a/server/handleClient.go +++ b/server/handleClient.go @@ -3,12 +3,19 @@ package server import ( "net/http" + "git.pyer.club/kingecg/gotunnelserver/util" "github.com/gorilla/websocket" ) func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) { clientSession := NewSession(conn) clientSession.Start() + authEntity := r.Context().Value("authEntity") + if authEntity == nil { + clientSession.Close() + return + } + clientSession.Name = authEntity.(*util.AuthEntity).Username s.clientSession[clientSession.Id] = clientSession command := NcmdSession(clientSession.Id) clientSession.Send(command) @@ -26,6 +33,9 @@ func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) { return } spipe := NewPipe(clientSession.Id, targetSessionId) + spipe.Once("Close", func(p ...interface{}) { + delete(s.pipes, spipe.Id) + }) s.pipes[spipe.Id] = spipe command := NcmdConnectionInited(spipe.Id) @@ -36,4 +46,7 @@ func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) { targetSession.Send(command) }) + clientSession.Once("Close", func(p ...interface{}) { + delete(s.clientSession, clientSession.Id) + }) } diff --git a/server/handlePipe.go b/server/handlePipe.go index de8be09..1103168 100644 --- a/server/handlePipe.go +++ b/server/handlePipe.go @@ -3,17 +3,21 @@ package server import ( "net/http" + "git.pyer.club/kingecg/goemitter" + "git.pyer.club/kingecg/gologger" "git.pyer.club/kingecg/gotunnelserver/util" "github.com/gorilla/websocket" ) type Pipe struct { - Id string - Src string - Dst string + Id string + Src string + Dst string + *goemitter.EventEmitter src *websocket.Conn dst *websocket.Conn stopChan chan int + logger *gologger.Logger } func (p *Pipe) Start() { @@ -23,12 +27,15 @@ func (p *Pipe) Start() { <-p.stopChan p.src.Close() p.dst.Close() + p.Emit("Close") } func NewPipe(src, dst string) *Pipe { return &Pipe{ - Id: util.GenRandomstring(16), - Src: src, - Dst: dst, + Id: util.GenRandomstring(16), + Src: src, + Dst: dst, + EventEmitter: goemitter.NewEmitter(), + logger: gologger.GetLogger("pipe"), } } @@ -45,6 +52,7 @@ func (p *Pipe) forward(src, dst *websocket.Conn) { break } + p.logger.Debug("pipe forward:", len(message), "type", mtype) } p.stopChan <- 1 } @@ -90,14 +98,26 @@ func (s *Server) HandlePipe(conn *websocket.Conn, r *http.Request) { } } -func (s *Server) findSession(sessionId string) *Session { - clientSession, ok := s.clientSession[sessionId] +func (s *Server) findSession(sidOrName string) *Session { + clientSession, ok := s.clientSession[sidOrName] if ok { return clientSession } - agentSession, ok := s.agentSession[sessionId] + agentSession, ok := s.agentSession[sidOrName] if ok { return agentSession } + + for _, v := range s.clientSession { + if v.Name == sidOrName { + return v + } + } + + for _, v := range s.agentSession { + if v.Name == sidOrName { + return v + } + } return nil } diff --git a/server/main.go b/server/main.go index 262f663..3f236c8 100644 --- a/server/main.go +++ b/server/main.go @@ -16,8 +16,8 @@ import ( ) var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, + ReadBufferSize: 4096, + WriteBufferSize: 4096, } type ServerConfig struct { diff --git a/server/session.go b/server/session.go index e9c529d..61a59a8 100644 --- a/server/session.go +++ b/server/session.go @@ -11,6 +11,7 @@ import ( type Session struct { Id string + Name string conn *websocket.Conn ctx context.Context cancel context.CancelFunc @@ -26,7 +27,8 @@ func (s *Session) Send(cmd *Command) (err error) { return s.conn.WriteJSON(cmd) } func (s *Session) Close() { - s.conn.Close() + _ = s.conn.Close() + s.Emit("Close") } func (s *Session) Start() { diff --git a/tunnelagent/Makefile b/tunnelagent/Makefile new file mode 100644 index 0000000..16016ab --- /dev/null +++ b/tunnelagent/Makefile @@ -0,0 +1,18 @@ +APP_NAME := tunnelagent +BIN_DIR := ../dist/tunnelagent + +all: $(BIN_DIR) $(BIN_DIR)/$(APP_NAME) + +$(BIN_DIR): + mkdir -p $(BIN_DIR) + +$(BIN_DIR)/$(APP_NAME): main.go + go build -o $@ $< + + +clean: + rm -rf $(BIN_DIR) +test: + go test -v ./... + +.PHONY: clean all \ No newline at end of file diff --git a/tunnelagent/main.go b/tunnelagent/main.go index db3c2a4..8072591 100644 --- a/tunnelagent/main.go +++ b/tunnelagent/main.go @@ -11,7 +11,7 @@ import ( ) type TunnelAgent struct { - Username string `flag_default:"tcclient" flag_usage:"username for tunnel server"` + Username string `flag_default:"tagent" flag_usage:"username for tunnel server"` Salt string `flag_default:"" flag_usage:"salt for tunnel server"` Address string `flag_default:"ws://127.0.0.1:8080" flag_usage:"address for tunnel server"` } diff --git a/tunnelclient/Makefile b/tunnelclient/Makefile new file mode 100644 index 0000000..08f084b --- /dev/null +++ b/tunnelclient/Makefile @@ -0,0 +1,18 @@ +APP_NAME := tunnelclient +BIN_DIR := ../dist/tunnelclient + +all: $(BIN_DIR) $(BIN_DIR)/$(APP_NAME) + +$(BIN_DIR): + mkdir -p $(BIN_DIR) + +$(BIN_DIR)/$(APP_NAME): main.go + go build -o $@ $< + + +clean: + rm -rf $(BIN_DIR) +test: + go test -v ./... + +.PHONY: clean all \ No newline at end of file diff --git a/tunnelserver/__debug_bin2913043442 b/tunnelserver/__debug_bin2913043442 deleted file mode 100755 index eb894c4..0000000 Binary files a/tunnelserver/__debug_bin2913043442 and /dev/null differ