fix connection write issue and add make file

This commit is contained in:
程广 2024-11-16 13:18:22 +08:00
parent c8b2e99003
commit a56afa8af3
12 changed files with 152 additions and 19 deletions

2
.vscode/launch.json vendored
View File

@ -10,7 +10,7 @@
"request": "launch", "request": "launch",
"mode": "auto", "mode": "auto",
"program": "${workspaceFolder}/tunnelclient", "program": "${workspaceFolder}/tunnelclient",
"args":["--target-session", "HNCZhU8G86HSfTZv", "--local-port", "2222"] "args":["--target-session", "0OqhwXRIE9IoFqHE", "--local-port", "2222"]
}, },
{ {
"name": "Launch Package", "name": "Launch Package",

View File

@ -5,7 +5,7 @@ PROJECT_DIR := $(shell pwd)
BIN_DIR := dist BIN_DIR := dist
# 定义应用程序的名称 # 定义应用程序的名称
APPS := tunnelserver APPS := tunnelserver tunnelclient tunnelagent
# 创建输出目录 # 创建输出目录
$(BIN_DIR): $(BIN_DIR):

View File

@ -7,6 +7,7 @@ import (
"log" "log"
"net" "net"
"strconv" "strconv"
"sync"
"git.pyer.club/kingecg/goemitter" "git.pyer.club/kingecg/goemitter"
"git.pyer.club/kingecg/gotunnelserver/util" "git.pyer.club/kingecg/gotunnelserver/util"
@ -21,6 +22,7 @@ type DataEndPoint struct {
dataSession string dataSession string
wsConn *ws.Conn wsConn *ws.Conn
conns map[int32]*DataConn conns map[int32]*DataConn
mux sync.Mutex
} }
func (d *DataEndPoint) Close() { func (d *DataEndPoint) Close() {
@ -43,11 +45,13 @@ func (d *DataEndPoint) Connect() {
for { for {
// d.wsConn.SetReadDeadline(time.Now().Add(time.Minute * 5)) // d.wsConn.SetReadDeadline(time.Now().Add(time.Minute * 5))
_, data, err := d.wsConn.ReadMessage() _, data, err := d.wsConn.ReadMessage()
log.Println("recv data:", len(data))
if err != nil { if err != nil {
log.Println("recv err:", err)
break break
} }
var packet Packet var packet Packet
err = packet.BinaryUnmarshaler(data) err = packet.UnmarshalBinary(data)
if err != nil { if err != nil {
continue continue
} }
@ -58,11 +62,14 @@ func (d *DataEndPoint) Connect() {
log.Println(err) log.Println(err)
continue continue
} }
log.Println("R new connection:", packet.id, d.Host, d.Port)
conn = NewDataConnection(packet.id, &tcpconn, d) conn = NewDataConnection(packet.id, &tcpconn, d)
conn.Start() conn.Start()
conn.Once("close", func(args ...interface{}) { conn.Once("close", func(args ...interface{}) {
log.Println("connection closed:", conn.id)
d.onDataConnClose(conn) d.onDataConnClose(conn)
}) })
d.conns[packet.id] = conn
} }
conn.Write(packet) conn.Write(packet)
@ -84,13 +91,17 @@ func (d *DataEndPoint) Listen() {
dconn := NewDataConnection(0, &conn, d) dconn := NewDataConnection(0, &conn, d)
d.conns[dconn.id] = dconn d.conns[dconn.id] = dconn
dconn.Start() dconn.Start()
log.Println("L new connection:", dconn.id, d.Host, d.Port)
dconn.Once("close", func(args ...interface{}) { dconn.Once("close", func(args ...interface{}) {
log.Println("connection closed:", dconn.id)
d.onDataConnClose(dconn) d.onDataConnClose(dconn)
}) })
} }
} }
func (d *DataEndPoint) Write(p []byte) (n int, err error) { func (d *DataEndPoint) Write(p []byte) (n int, err error) {
d.mux.Lock()
defer d.mux.Unlock()
return len(p), d.wsConn.WriteMessage(ws.BinaryMessage, p) return len(p), d.wsConn.WriteMessage(ws.BinaryMessage, p)
} }
@ -115,11 +126,48 @@ type Packet struct {
} }
func (p *Packet) MarshalBinary() ([]byte, error) { func (p *Packet) MarshalBinary() ([]byte, error) {
// 创建一个字节切片缓冲区
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, p)
// 写入 ID
if err := binary.Write(buf, binary.BigEndian, p.id); err != nil {
return nil, err
}
// 写入 Data 的长度
if err := binary.Write(buf, binary.BigEndian, int32(len(p.data))); err != nil {
return nil, err
}
// 写入 Data
if _, err := buf.Write(p.data); err != nil {
return nil, err
}
// 返回字节切片
return buf.Bytes(), nil return buf.Bytes(), nil
} }
func (p *Packet) UnmarshalBinary(data []byte) error {
// 创建一个字节切片缓冲区
buf := bytes.NewReader(data)
// 读取 ID
if err := binary.Read(buf, binary.BigEndian, &p.id); err != nil {
return err
}
// 读取 Data 的长度
var dataLen int32
if err := binary.Read(buf, binary.BigEndian, &dataLen); err != nil {
return err
}
// 读取 Data
p.data = make([]byte, dataLen)
if _, err := buf.Read(p.data); err != nil {
return err
}
return nil
}
func (p *Packet) BinaryUnmarshaler(data []byte) error { func (p *Packet) BinaryUnmarshaler(data []byte) error {
buf := bytes.NewReader(data) buf := bytes.NewReader(data)
return binary.Read(buf, binary.LittleEndian, p) return binary.Read(buf, binary.LittleEndian, p)
@ -129,16 +177,18 @@ func (d *DataConn) Close() {
} }
func (d *DataConn) Write(p Packet) (n int, err error) { func (d *DataConn) Write(p Packet) (n int, err error) {
log.Println("connection write data:", p.id, len(p.data))
return (*d.conn).Write(p.data) return (*d.conn).Write(p.data)
} }
func (d *DataConn) Start() { func (d *DataConn) Start() {
go func() { go func() {
for { for {
buf := make([]byte, 1024) buf := make([]byte, 4096)
n, err := (*d.conn).Read(buf) n, err := (*d.conn).Read(buf)
if err != nil { if err != nil {
// panic(err) // panic(err)
log.Println("Error:", err)
break break
} }
packet := Packet{ packet := Packet{
@ -149,7 +199,8 @@ func (d *DataConn) Start() {
if err != nil { if err != nil {
continue continue
} }
d.out.Write(data) wn, werr := d.out.Write(data)
log.Println("write data:", werr, wn)
} }
d.Emit("close") d.Emit("close")
}() }()

View File

@ -3,13 +3,24 @@ package server
import ( import (
"net/http" "net/http"
"git.pyer.club/kingecg/gotunnelserver/util"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
func (s *Server) HandleAgent(c *websocket.Conn, r *http.Request) { func (s *Server) HandleAgent(c *websocket.Conn, r *http.Request) {
agentSession := NewSession(c) agentSession := NewSession(c)
agentSession.Start() agentSession.Start()
authEntity := r.Context().Value("authEntity")
if authEntity == nil {
agentSession.Close()
return
}
agentSession.Name = authEntity.(*util.AuthEntity).Username
s.agentSession[agentSession.Id] = agentSession s.agentSession[agentSession.Id] = agentSession
command := NcmdSession(agentSession.Id) command := NcmdSession(agentSession.Id)
agentSession.Send(command) agentSession.Send(command)
agentSession.Once("Close", func(p ...interface{}) {
delete(s.agentSession, agentSession.Id)
})
} }

View File

@ -3,12 +3,19 @@ package server
import ( import (
"net/http" "net/http"
"git.pyer.club/kingecg/gotunnelserver/util"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) { func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) {
clientSession := NewSession(conn) clientSession := NewSession(conn)
clientSession.Start() clientSession.Start()
authEntity := r.Context().Value("authEntity")
if authEntity == nil {
clientSession.Close()
return
}
clientSession.Name = authEntity.(*util.AuthEntity).Username
s.clientSession[clientSession.Id] = clientSession s.clientSession[clientSession.Id] = clientSession
command := NcmdSession(clientSession.Id) command := NcmdSession(clientSession.Id)
clientSession.Send(command) clientSession.Send(command)
@ -26,6 +33,9 @@ func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) {
return return
} }
spipe := NewPipe(clientSession.Id, targetSessionId) spipe := NewPipe(clientSession.Id, targetSessionId)
spipe.Once("Close", func(p ...interface{}) {
delete(s.pipes, spipe.Id)
})
s.pipes[spipe.Id] = spipe s.pipes[spipe.Id] = spipe
command := NcmdConnectionInited(spipe.Id) command := NcmdConnectionInited(spipe.Id)
@ -36,4 +46,7 @@ func (s *Server) HandleClient(conn *websocket.Conn, r *http.Request) {
targetSession.Send(command) targetSession.Send(command)
}) })
clientSession.Once("Close", func(p ...interface{}) {
delete(s.clientSession, clientSession.Id)
})
} }

View File

@ -3,6 +3,8 @@ package server
import ( import (
"net/http" "net/http"
"git.pyer.club/kingecg/goemitter"
"git.pyer.club/kingecg/gologger"
"git.pyer.club/kingecg/gotunnelserver/util" "git.pyer.club/kingecg/gotunnelserver/util"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -11,9 +13,11 @@ type Pipe struct {
Id string Id string
Src string Src string
Dst string Dst string
*goemitter.EventEmitter
src *websocket.Conn src *websocket.Conn
dst *websocket.Conn dst *websocket.Conn
stopChan chan int stopChan chan int
logger *gologger.Logger
} }
func (p *Pipe) Start() { func (p *Pipe) Start() {
@ -23,12 +27,15 @@ func (p *Pipe) Start() {
<-p.stopChan <-p.stopChan
p.src.Close() p.src.Close()
p.dst.Close() p.dst.Close()
p.Emit("Close")
} }
func NewPipe(src, dst string) *Pipe { func NewPipe(src, dst string) *Pipe {
return &Pipe{ return &Pipe{
Id: util.GenRandomstring(16), Id: util.GenRandomstring(16),
Src: src, Src: src,
Dst: dst, Dst: dst,
EventEmitter: goemitter.NewEmitter(),
logger: gologger.GetLogger("pipe"),
} }
} }
@ -45,6 +52,7 @@ func (p *Pipe) forward(src, dst *websocket.Conn) {
break break
} }
p.logger.Debug("pipe forward:", len(message), "type", mtype)
} }
p.stopChan <- 1 p.stopChan <- 1
} }
@ -90,14 +98,26 @@ func (s *Server) HandlePipe(conn *websocket.Conn, r *http.Request) {
} }
} }
func (s *Server) findSession(sessionId string) *Session { func (s *Server) findSession(sidOrName string) *Session {
clientSession, ok := s.clientSession[sessionId] clientSession, ok := s.clientSession[sidOrName]
if ok { if ok {
return clientSession return clientSession
} }
agentSession, ok := s.agentSession[sessionId] agentSession, ok := s.agentSession[sidOrName]
if ok { if ok {
return agentSession return agentSession
} }
for _, v := range s.clientSession {
if v.Name == sidOrName {
return v
}
}
for _, v := range s.agentSession {
if v.Name == sidOrName {
return v
}
}
return nil return nil
} }

View File

@ -16,8 +16,8 @@ import (
) )
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 4096,
WriteBufferSize: 1024, WriteBufferSize: 4096,
} }
type ServerConfig struct { type ServerConfig struct {

View File

@ -11,6 +11,7 @@ import (
type Session struct { type Session struct {
Id string Id string
Name string
conn *websocket.Conn conn *websocket.Conn
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -26,7 +27,8 @@ func (s *Session) Send(cmd *Command) (err error) {
return s.conn.WriteJSON(cmd) return s.conn.WriteJSON(cmd)
} }
func (s *Session) Close() { func (s *Session) Close() {
s.conn.Close() _ = s.conn.Close()
s.Emit("Close")
} }
func (s *Session) Start() { func (s *Session) Start() {

18
tunnelagent/Makefile Normal file
View File

@ -0,0 +1,18 @@
APP_NAME := tunnelagent
BIN_DIR := ../dist/tunnelagent
all: $(BIN_DIR) $(BIN_DIR)/$(APP_NAME)
$(BIN_DIR):
mkdir -p $(BIN_DIR)
$(BIN_DIR)/$(APP_NAME): main.go
go build -o $@ $<
clean:
rm -rf $(BIN_DIR)
test:
go test -v ./...
.PHONY: clean all

View File

@ -11,7 +11,7 @@ import (
) )
type TunnelAgent struct { type TunnelAgent struct {
Username string `flag_default:"tcclient" flag_usage:"username for tunnel server"` Username string `flag_default:"tagent" flag_usage:"username for tunnel server"`
Salt string `flag_default:"" flag_usage:"salt for tunnel server"` Salt string `flag_default:"" flag_usage:"salt for tunnel server"`
Address string `flag_default:"ws://127.0.0.1:8080" flag_usage:"address for tunnel server"` Address string `flag_default:"ws://127.0.0.1:8080" flag_usage:"address for tunnel server"`
} }

18
tunnelclient/Makefile Normal file
View File

@ -0,0 +1,18 @@
APP_NAME := tunnelclient
BIN_DIR := ../dist/tunnelclient
all: $(BIN_DIR) $(BIN_DIR)/$(APP_NAME)
$(BIN_DIR):
mkdir -p $(BIN_DIR)
$(BIN_DIR)/$(APP_NAME): main.go
go build -o $@ $<
clean:
rm -rf $(BIN_DIR)
test:
go test -v ./...
.PHONY: clean all

Binary file not shown.