package client import ( "bytes" "encoding/binary" "io" "log" "net" "strconv" "git.pyer.club/kingecg/goemitter" "git.pyer.club/kingecg/gotunnelserver/util" ws "github.com/gorilla/websocket" ) type DataEndPoint struct { Host string Port int cmdSession *CommandClient dataSession string wsConn *ws.Conn conns map[int32]*DataConn } func (d *DataEndPoint) Close() { d.wsConn.Close() } func (d *DataEndPoint) onDataConnClose(conn *DataConn) { delete(d.conns, conn.id) } func (d *DataEndPoint) Connect() { conn, _, err := ws.DefaultDialer.Dial(d.cmdSession.Address+"/ws/pipe/"+d.dataSession, map[string][]string{ "Authorization": {d.cmdSession.makeAuthHeader()}, "Session": {d.cmdSession.SessionId}, }) if err != nil { panic(err) } d.wsConn = conn for { // d.wsConn.SetReadDeadline(time.Now().Add(time.Minute * 5)) _, data, err := d.wsConn.ReadMessage() if err != nil { break } var packet Packet err = packet.BinaryUnmarshaler(data) if err != nil { continue } conn, ok := d.conns[packet.id] if !ok { tcpconn, err := net.Dial("tcp", d.Host+":"+strconv.Itoa(d.Port)) if err != nil { log.Println(err) continue } conn = NewDataConnection(packet.id, &tcpconn, d) conn.Start() conn.Once("close", func(args ...interface{}) { d.onDataConnClose(conn) }) } conn.Write(packet) } } func (d *DataEndPoint) Listen() { // listen and accept connection at port listener, err := net.Listen("tcp", d.Host+":"+strconv.Itoa(d.Port)) if err != nil { log.Println(err, d) panic(err) } for { conn, err := listener.Accept() if err != nil { panic(err) } dconn := NewDataConnection(0, &conn, d) d.conns[dconn.id] = dconn dconn.Start() dconn.Once("close", func(args ...interface{}) { d.onDataConnClose(dconn) }) } } func (d *DataEndPoint) Write(p []byte) (n int, err error) { return len(p), d.wsConn.WriteMessage(ws.BinaryMessage, p) } func NewDataEndPoint(cmdSession *CommandClient, dataSession string) *DataEndPoint { return &DataEndPoint{ cmdSession: cmdSession, dataSession: dataSession, conns: make(map[int32]*DataConn), } } type DataConn struct { id int32 conn *net.Conn out io.Writer *goemitter.EventEmitter } type Packet struct { id int32 data []byte } func (p *Packet) MarshalBinary() ([]byte, error) { buf := new(bytes.Buffer) binary.Write(buf, binary.LittleEndian, p) return buf.Bytes(), nil } func (p *Packet) BinaryUnmarshaler(data []byte) error { buf := bytes.NewReader(data) return binary.Read(buf, binary.LittleEndian, p) } func (d *DataConn) Close() { (*d.conn).Close() } func (d *DataConn) Write(p Packet) (n int, err error) { return (*d.conn).Write(p.data) } func (d *DataConn) Start() { go func() { for { buf := make([]byte, 1024) n, err := (*d.conn).Read(buf) if err != nil { // panic(err) break } packet := Packet{ id: d.id, data: buf[:n], } data, err := packet.MarshalBinary() if err != nil { continue } d.out.Write(data) } d.Emit("close") }() } func NewDataConnection(id int32, con *net.Conn, writer io.Writer) *DataConn { cid := id if cid == 0 { cid = util.GenRandomInt() } return &DataConn{ id: cid, conn: con, out: writer, EventEmitter: goemitter.NewEmitter(), } }