mirror of
https://github.com/haiwen/seafile-server.git
synced 2025-05-11 09:35:41 +00:00
* Fix crash when concurrent close channel * Add ErrCh to notify client's goruntine to exit * Add WaitGroup to wait for all goruntine to exit * Call Signal on error --------- Co-authored-by: 杨赫然 <heran.yang@seafile.com>
104 lines
2.7 KiB
Go
104 lines
2.7 KiB
Go
package main
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/dgraph-io/ristretto/z"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
const (
|
|
chanBufSize = 10
|
|
)
|
|
|
|
// clients is a map from client id to Client structs.
|
|
// It contains all current connected clients. Each client is identified by 64-bit ID.
|
|
var clients map[uint64]*Client
|
|
var clientsMutex sync.RWMutex
|
|
|
|
// Use atomic operation to increase this value.
|
|
var nextClientID uint64 = 1
|
|
|
|
// subscriptions is a map from repo_id to Subscribers struct.
|
|
// It's protected by rw mutex.
|
|
var subscriptions map[string]*Subscribers
|
|
var subMutex sync.RWMutex
|
|
|
|
// Client contains information about a client.
|
|
// Two go routines are associated with each client to handle message reading and writting.
|
|
// Messages sent to the client have to be written into WCh, since only one go routine can write to a websocket connection.
|
|
type Client struct {
|
|
// The ID of this client
|
|
ID uint64
|
|
// Websocket connection.
|
|
conn *websocket.Conn
|
|
// Connections do not support concurrent writers. Protect write with a mutex.
|
|
connMutex sync.Mutex
|
|
|
|
// WCh is used to write messages to a client.
|
|
// The structs written into the channel will be converted to JSON and sent to client.
|
|
WCh chan interface{}
|
|
|
|
// Repos is the repos this client subscribed to.
|
|
Repos map[string]int64
|
|
ReposMutex sync.Mutex
|
|
// Alive is the last time received pong.
|
|
Alive time.Time
|
|
ConnCloser *z.Closer
|
|
// Addr is the address of client.
|
|
Addr string
|
|
// User is the user of client.
|
|
User string
|
|
}
|
|
|
|
// Subscribers contains the clients who subscribe to a repo's notifications.
|
|
type Subscribers struct {
|
|
// Clients is a map from client id to Client struct, protected by rw mutex.
|
|
Clients map[uint64]*Client
|
|
Mutex sync.RWMutex
|
|
}
|
|
|
|
// Init inits clients and subscriptions.
|
|
func Init() {
|
|
clients = make(map[uint64]*Client)
|
|
subscriptions = make(map[string]*Subscribers)
|
|
}
|
|
|
|
// NewClient creates a new client.
|
|
func NewClient(conn *websocket.Conn, addr string) *Client {
|
|
client := new(Client)
|
|
client.ID = atomic.AddUint64(&nextClientID, 1)
|
|
client.conn = conn
|
|
client.WCh = make(chan interface{}, chanBufSize)
|
|
client.Repos = make(map[string]int64)
|
|
client.Alive = time.Now()
|
|
client.Addr = addr
|
|
client.ConnCloser = z.NewCloser(0)
|
|
|
|
return client
|
|
}
|
|
|
|
// Register adds the client to the list of clients.
|
|
func RegisterClient(client *Client) {
|
|
clientsMutex.Lock()
|
|
clients[client.ID] = client
|
|
clientsMutex.Unlock()
|
|
}
|
|
|
|
// Unregister deletes the client from the list of clients.
|
|
func UnregisterClient(client *Client) {
|
|
clientsMutex.Lock()
|
|
delete(clients, client.ID)
|
|
clientsMutex.Unlock()
|
|
}
|
|
|
|
func newSubscribers(client *Client) *Subscribers {
|
|
subscribers := new(Subscribers)
|
|
subscribers.Clients = make(map[uint64]*Client)
|
|
subscribers.Clients[client.ID] = client
|
|
|
|
return subscribers
|
|
}
|