gotunnelserver/client/dataconn.go

228 lines
4.8 KiB
Go

package client
import (
"bytes"
"encoding/binary"
"io"
"log"
"net"
"strconv"
"sync"
"git.pyer.club/kingecg/goemitter"
"git.pyer.club/kingecg/gotunnelserver/util"
ws "github.com/gorilla/websocket"
)
type DataEndPoint struct {
Host string
Port int
*goemitter.EventEmitter
cmdSession *CommandClient
dataSession string
wsConn *ws.Conn
conns map[int32]*DataConn
mux sync.Mutex
}
func (d *DataEndPoint) Close() {
if d.wsConn == nil {
return
}
d.wsConn.Close()
}
func (d *DataEndPoint) onDataConnClose(conn *DataConn) {
delete(d.conns, conn.id)
}
func (d *DataEndPoint) Connect() {
conn, _, err := ws.DefaultDialer.Dial(d.cmdSession.Address+"/ws/pipe/"+d.dataSession, map[string][]string{
"Authorization": {d.cmdSession.makeAuthHeader()},
"Session": {d.cmdSession.SessionId},
})
if err != nil {
d.Emit("Error", d.dataSession)
return
}
d.wsConn = conn
for {
// d.wsConn.SetReadDeadline(time.Now().Add(time.Minute * 5))
_, data, err := d.wsConn.ReadMessage()
log.Println("recv data:", len(data))
if err != nil {
log.Println("recv err:", err)
break
}
var packet Packet
err = packet.UnmarshalBinary(data)
if err != nil {
continue
}
conn, ok := d.conns[packet.id]
if !ok {
tcpconn, err := net.Dial("tcp", d.Host+":"+strconv.Itoa(d.Port))
if err != nil {
log.Println(err)
continue
}
log.Println("R new connection:", packet.id, d.Host, d.Port)
conn = NewDataConnection(packet.id, &tcpconn, d)
conn.Start()
conn.Once("close", func(args ...interface{}) {
log.Println("connection closed:", conn.id)
d.onDataConnClose(conn)
})
d.conns[packet.id] = conn
}
conn.Write(packet)
}
}
func (d *DataEndPoint) Listen() {
// listen and accept connection at port
listener, err := net.Listen("tcp", d.Host+":"+strconv.Itoa(d.Port))
if err != nil {
log.Println(err, d)
d.Emit("Error", d.dataSession)
return
}
for {
conn, err := listener.Accept()
if err != nil {
d.Emit("Error", d.dataSession)
return
}
dconn := NewDataConnection(0, &conn, d)
d.conns[dconn.id] = dconn
dconn.Start()
log.Println("L new connection:", dconn.id, d.Host, d.Port)
dconn.Once("close", func(args ...interface{}) {
log.Println("connection closed:", dconn.id)
d.onDataConnClose(dconn)
})
}
}
func (d *DataEndPoint) Write(p []byte) (n int, err error) {
d.mux.Lock()
defer d.mux.Unlock()
return len(p), d.wsConn.WriteMessage(ws.BinaryMessage, p)
}
func NewDataEndPoint(cmdSession *CommandClient, dataSession string) *DataEndPoint {
return &DataEndPoint{
EventEmitter: goemitter.NewEmitter(),
cmdSession: cmdSession,
dataSession: dataSession,
conns: make(map[int32]*DataConn),
}
}
type DataConn struct {
id int32
conn *net.Conn
out io.Writer
*goemitter.EventEmitter
}
type Packet struct {
id int32
data []byte
}
func (p *Packet) MarshalBinary() ([]byte, error) {
// 创建一个字节切片缓冲区
buf := new(bytes.Buffer)
// 写入 ID
if err := binary.Write(buf, binary.BigEndian, p.id); err != nil {
return nil, err
}
// 写入 Data 的长度
if err := binary.Write(buf, binary.BigEndian, int32(len(p.data))); err != nil {
return nil, err
}
// 写入 Data
if _, err := buf.Write(p.data); err != nil {
return nil, err
}
// 返回字节切片
return buf.Bytes(), nil
}
func (p *Packet) UnmarshalBinary(data []byte) error {
// 创建一个字节切片缓冲区
buf := bytes.NewReader(data)
// 读取 ID
if err := binary.Read(buf, binary.BigEndian, &p.id); err != nil {
return err
}
// 读取 Data 的长度
var dataLen int32
if err := binary.Read(buf, binary.BigEndian, &dataLen); err != nil {
return err
}
// 读取 Data
p.data = make([]byte, dataLen)
if _, err := buf.Read(p.data); err != nil {
return err
}
return nil
}
func (p *Packet) BinaryUnmarshaler(data []byte) error {
buf := bytes.NewReader(data)
return binary.Read(buf, binary.LittleEndian, p)
}
func (d *DataConn) Close() {
(*d.conn).Close()
}
func (d *DataConn) Write(p Packet) (n int, err error) {
log.Println("connection write data:", p.id, len(p.data))
return (*d.conn).Write(p.data)
}
func (d *DataConn) Start() {
go func() {
for {
buf := make([]byte, 4096)
n, err := (*d.conn).Read(buf)
if err != nil {
// panic(err)
log.Println("Error:", err)
break
}
packet := Packet{
id: d.id,
data: buf[:n],
}
data, err := packet.MarshalBinary()
if err != nil {
continue
}
wn, werr := d.out.Write(data)
log.Println("write data:", werr, wn)
}
d.Emit("close")
}()
}
func NewDataConnection(id int32, con *net.Conn, writer io.Writer) *DataConn {
cid := id
if cid == 0 {
cid = util.GenRandomInt()
}
return &DataConn{
id: cid,
conn: con,
out: writer,
EventEmitter: goemitter.NewEmitter(),
}
}