gosocketio/vendor/github.com/googollee/go-socket.io/engineio/session/session.go

348 lines
6.5 KiB
Go

package session
import (
"io"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/googollee/go-socket.io/engineio/frame"
"github.com/googollee/go-socket.io/engineio/packet"
"github.com/googollee/go-socket.io/engineio/payload"
"github.com/googollee/go-socket.io/engineio/transport"
)
// Pauser is connection which can be paused and resumes.
type Pauser interface {
Pause()
Resume()
}
type Session struct {
conn transport.Conn
params transport.ConnParameters
transport string
context interface{}
upgradeLocker sync.RWMutex
}
func New(conn transport.Conn, sid, transport string, params transport.ConnParameters) (*Session, error) {
params.SID = sid
ses := &Session{
transport: transport,
conn: conn,
params: params,
}
if err := ses.setDeadline(); err != nil {
ses.Close()
return nil, err
}
return ses, nil
}
func (s *Session) SetContext(v interface{}) {
s.context = v
}
func (s *Session) Context() interface{} {
return s.context
}
func (s *Session) ID() string {
return s.params.SID
}
func (s *Session) Transport() string {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
return s.transport
}
func (s *Session) Close() error {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
return s.conn.Close()
}
// NextReader attempts to obtain a ReadCloser from the session's connection.
// When finished writing, the caller MUST Close the ReadCloser to unlock the
// connection's FramerReader.
func (s *Session) NextReader() (FrameType, io.ReadCloser, error) {
for {
ft, pt, r, err := s.nextReader()
if err != nil {
s.Close()
return 0, nil, err
}
switch pt {
case packet.PING:
// Respond to a ping with a pong.
err := func() error {
w, err := s.nextWriter(ft, packet.PONG)
if err != nil {
return err
}
// echo
_, err = io.Copy(w, r)
w.Close() // unlocks the wrapped connection's FrameWriter
r.Close() // unlocks the wrapped connection's FrameReader
return err
}()
if err != nil {
s.Close()
return 0, nil, err
}
// Read another frame.
if err := s.setDeadline(); err != nil {
s.Close()
return 0, nil, err
}
case packet.CLOSE:
r.Close() // unlocks the wrapped connection's FrameReader
s.Close()
return 0, nil, io.EOF
case packet.MESSAGE:
// Caller must Close the ReadCloser to unlock the connection's
// FrameReader when finished reading.
return FrameType(ft), r, nil
default:
// Unknown packet type. Close reader and try again.
r.Close()
}
}
}
func (s *Session) URL() url.URL {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
return s.conn.URL()
}
func (s *Session) LocalAddr() net.Addr {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
return s.conn.LocalAddr()
}
func (s *Session) RemoteAddr() net.Addr {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
return s.conn.RemoteAddr()
}
func (s *Session) RemoteHeader() http.Header {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
return s.conn.RemoteHeader()
}
// NextWriter attempts to obtain a WriteCloser from the session's connection.
// When finished writing, the caller MUST Close the WriteCloser to unlock the
// connection's FrameWriter.
func (s *Session) NextWriter(typ FrameType) (io.WriteCloser, error) {
return s.nextWriter(frame.Type(typ), packet.MESSAGE)
}
func (s *Session) Upgrade(transport string, conn transport.Conn) {
go s.upgrading(transport, conn)
}
func (s *Session) InitSession() error {
w, err := s.nextWriter(frame.String, packet.OPEN)
if err != nil {
s.Close()
return err
}
if _, err := s.params.WriteTo(w); err != nil {
w.Close()
s.Close()
return err
}
if err := w.Close(); err != nil {
s.Close()
return err
}
return nil
}
func (s *Session) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.upgradeLocker.RLock()
conn := s.conn
s.upgradeLocker.RUnlock()
if h, ok := conn.(http.Handler); ok {
h.ServeHTTP(w, r)
}
}
func (s *Session) nextReader() (frame.Type, packet.Type, io.ReadCloser, error) {
for {
s.upgradeLocker.RLock()
conn := s.conn
s.upgradeLocker.RUnlock()
ft, pt, r, err := conn.NextReader()
if err != nil {
if op, ok := err.(payload.Error); ok && op.Temporary() {
continue
}
return 0, 0, nil, err
}
return ft, pt, r, nil
}
}
func (s *Session) nextWriter(ft frame.Type, pt packet.Type) (io.WriteCloser, error) {
for {
s.upgradeLocker.RLock()
conn := s.conn
s.upgradeLocker.RUnlock()
w, err := conn.NextWriter(ft, pt)
if err != nil {
if op, ok := err.(payload.Error); ok && op.Temporary() {
continue
}
return nil, err
}
// Caller must Close the WriteCloser to unlock the connection's
// FrameWriter when finished writing.
return w, nil
}
}
func (s *Session) setDeadline() error {
s.upgradeLocker.RLock()
defer s.upgradeLocker.RUnlock()
deadline := time.Now().Add(s.params.PingTimeout)
err := s.conn.SetReadDeadline(deadline)
if err != nil {
return err
}
return s.conn.SetWriteDeadline(deadline)
}
func (s *Session) upgrading(t string, conn transport.Conn) {
// Read a ping from the client.
err := conn.SetReadDeadline(time.Now().Add(s.params.PingTimeout))
if err != nil {
conn.Close()
return
}
ft, pt, r, err := conn.NextReader()
if err != nil {
conn.Close()
return
}
if pt != packet.PING {
r.Close()
conn.Close()
return
}
// Wait to close the reader until after data is read and echoed in the reply.
// Sent a pong in reply.
err = conn.SetWriteDeadline(time.Now().Add(s.params.PingTimeout))
if err != nil {
r.Close()
conn.Close()
return
}
w, err := conn.NextWriter(ft, packet.PONG)
if err != nil {
r.Close()
conn.Close()
return
}
// echo
if _, err = io.Copy(w, r); err != nil {
w.Close()
r.Close()
conn.Close()
return
}
if err = r.Close(); err != nil {
w.Close()
conn.Close()
return
}
if err = w.Close(); err != nil {
conn.Close()
return
}
// Pause the old connection.
s.upgradeLocker.RLock()
old := s.conn
s.upgradeLocker.RUnlock()
p, ok := old.(Pauser)
if !ok {
// old transport doesn't support upgrading
conn.Close()
return
}
p.Pause()
// Prepare to resume the connection if upgrade fails.
defer func() {
if p != nil {
p.Resume()
}
}()
// Check for upgrade packet from the client.
_, pt, r, err = conn.NextReader()
if err != nil {
conn.Close()
return
}
if pt != packet.UPGRADE {
r.Close()
conn.Close()
return
}
if err = r.Close(); err != nil {
conn.Close()
return
}
// Successful upgrade.
s.upgradeLocker.Lock()
s.conn = conn
s.transport = t
s.upgradeLocker.Unlock()
p = nil
old.Close()
}