mirror of
https://github.com/haiwen/seafile-server.git
synced 2025-04-28 03:20:10 +00:00
Fix crash when concurrent close channel (#612)
* Fix crash when concurrent close channel * Add ErrCh to notify client's goruntine to exit * Add WaitGroup to wait for all goruntine to exit * Call Signal on error --------- Co-authored-by: 杨赫然 <heran.yang@seafile.com>
This commit is contained in:
parent
30f26ef24f
commit
54ecfbee42
@ -51,10 +51,6 @@ func (*myClaims) Valid() error {
|
||||
|
||||
func (client *Client) Close() {
|
||||
client.conn.Close()
|
||||
if !client.ConnClosed {
|
||||
close(client.WCh)
|
||||
}
|
||||
client.ConnClosed = true
|
||||
}
|
||||
|
||||
func RecoverWrapper(f func()) {
|
||||
@ -69,39 +65,48 @@ func RecoverWrapper(f func()) {
|
||||
|
||||
// HandleMessages connects to the client to process message.
|
||||
func (client *Client) HandleMessages() {
|
||||
|
||||
go RecoverWrapper(client.readMessages)
|
||||
go RecoverWrapper(client.writeMessages)
|
||||
go RecoverWrapper(client.checkTokenExpired)
|
||||
|
||||
// Set keep alive.
|
||||
client.conn.SetPongHandler(func(string) error {
|
||||
client.Alive = time.Now()
|
||||
return nil
|
||||
})
|
||||
go client.keepAlive()
|
||||
|
||||
client.ConnCloser.AddRunning(4)
|
||||
go RecoverWrapper(client.readMessages)
|
||||
go RecoverWrapper(client.writeMessages)
|
||||
go RecoverWrapper(client.checkTokenExpired)
|
||||
go RecoverWrapper(client.keepAlive)
|
||||
client.ConnCloser.Wait()
|
||||
client.Close()
|
||||
UnregisterClient(client)
|
||||
for id := range client.Repos {
|
||||
client.unsubscribe(id)
|
||||
}
|
||||
}
|
||||
|
||||
func (client *Client) readMessages() {
|
||||
conn := client.conn
|
||||
defer func() {
|
||||
client.Close()
|
||||
UnregisterClient(client)
|
||||
for id := range client.Repos {
|
||||
client.unsubscribe(id)
|
||||
}
|
||||
client.ConnCloser.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-client.ConnCloser.HasBeenClosed():
|
||||
return
|
||||
default:
|
||||
}
|
||||
var msg Message
|
||||
err := conn.ReadJSON(&msg)
|
||||
if err != nil {
|
||||
client.ConnCloser.Signal()
|
||||
log.Debugf("failed to read json data from client: %s: %v", client.Addr, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = client.handleMessage(&msg)
|
||||
if err != nil {
|
||||
client.ConnCloser.Signal()
|
||||
log.Debugf("%v", err)
|
||||
return
|
||||
}
|
||||
@ -207,67 +212,86 @@ func (client *Client) unsubscribe(repoID string) {
|
||||
}
|
||||
|
||||
func (client *Client) writeMessages() {
|
||||
defer client.Close()
|
||||
for msg := range client.WCh {
|
||||
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
client.connMutex.Lock()
|
||||
err := client.conn.WriteJSON(msg)
|
||||
client.connMutex.Unlock()
|
||||
if err != nil {
|
||||
log.Debugf("failed to send notification to client: %v", err)
|
||||
defer func() {
|
||||
client.ConnCloser.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-client.WCh:
|
||||
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
client.connMutex.Lock()
|
||||
err := client.conn.WriteJSON(msg)
|
||||
client.connMutex.Unlock()
|
||||
if err != nil {
|
||||
client.ConnCloser.Signal()
|
||||
log.Debugf("failed to send notification to client: %v", err)
|
||||
return
|
||||
}
|
||||
m, _ := msg.(*Message)
|
||||
log.Debugf("send %s event to client %s(%d): %s", m.Type, client.User, client.ID, string(m.Content))
|
||||
case <-client.ConnCloser.HasBeenClosed():
|
||||
return
|
||||
}
|
||||
m, _ := msg.(*Message)
|
||||
log.Debugf("send %s event to client %s(%d): %s", m.Type, client.User, client.ID, string(m.Content))
|
||||
}
|
||||
}
|
||||
|
||||
func (client *Client) keepAlive() {
|
||||
defer client.Close()
|
||||
defer func() {
|
||||
client.ConnCloser.Done()
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
for {
|
||||
<-ticker.C
|
||||
if client.ConnClosed {
|
||||
return
|
||||
}
|
||||
if time.Since(client.Alive) > pongWait {
|
||||
log.Debugf("disconnected because no pong was received for more than %v", pongWait)
|
||||
return
|
||||
}
|
||||
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
client.connMutex.Lock()
|
||||
err := client.conn.WriteMessage(websocket.PingMessage, nil)
|
||||
client.connMutex.Unlock()
|
||||
if err != nil {
|
||||
log.Debugf("failed to send ping message to client: %v", err)
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if time.Since(client.Alive) > pongWait {
|
||||
client.ConnCloser.Signal()
|
||||
log.Debugf("disconnected because no pong was received for more than %v", pongWait)
|
||||
return
|
||||
}
|
||||
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
client.connMutex.Lock()
|
||||
err := client.conn.WriteMessage(websocket.PingMessage, nil)
|
||||
client.connMutex.Unlock()
|
||||
if err != nil {
|
||||
client.ConnCloser.Signal()
|
||||
log.Debugf("failed to send ping message to client: %v", err)
|
||||
return
|
||||
}
|
||||
case <-client.ConnCloser.HasBeenClosed():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (client *Client) checkTokenExpired() {
|
||||
defer func() {
|
||||
client.ConnCloser.Done()
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(checkTokenPeriod)
|
||||
for {
|
||||
<-ticker.C
|
||||
if client.ConnClosed {
|
||||
return
|
||||
}
|
||||
|
||||
// unsubscribe will delete repo from client.Repos, we'd better unsubscribe repos later.
|
||||
pendingRepos := make(map[string]struct{})
|
||||
now := time.Now()
|
||||
client.ReposMutex.Lock()
|
||||
for repoID, exp := range client.Repos {
|
||||
if exp >= now.Unix() {
|
||||
continue
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// unsubscribe will delete repo from client.Repos, we'd better unsubscribe repos later.
|
||||
pendingRepos := make(map[string]struct{})
|
||||
now := time.Now()
|
||||
client.ReposMutex.Lock()
|
||||
for repoID, exp := range client.Repos {
|
||||
if exp >= now.Unix() {
|
||||
continue
|
||||
}
|
||||
pendingRepos[repoID] = struct{}{}
|
||||
}
|
||||
pendingRepos[repoID] = struct{}{}
|
||||
}
|
||||
client.ReposMutex.Unlock()
|
||||
client.ReposMutex.Unlock()
|
||||
|
||||
for repoID := range pendingRepos {
|
||||
client.unsubscribe(repoID)
|
||||
client.notifJWTExpired(repoID)
|
||||
for repoID := range pendingRepos {
|
||||
client.unsubscribe(repoID)
|
||||
client.notifJWTExpired(repoID)
|
||||
}
|
||||
case <-client.ConnCloser.HasBeenClosed():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,14 +3,19 @@ module github.com/haiwen/seafile-server/notification-server
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/dgraph-io/ristretto v0.1.0
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/go-sql-driver/mysql v1.5.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/haiwen/seafile-server/fileserver v0.0.0-20220114093911-524f227b02cc
|
||||
github.com/mattn/go-sqlite3 v1.14.0
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
gopkg.in/ini.v1 v1.66.2
|
||||
)
|
||||
|
||||
require golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.1.1 // indirect
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 //indirect
|
||||
)
|
||||
|
@ -1,40 +1,42 @@
|
||||
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
|
||||
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
|
||||
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
|
||||
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
|
||||
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
|
||||
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/haiwen/seafile-server/fileserver v0.0.0-20220114093911-524f227b02cc h1:oCQqZqWfcp1c7t9u1SgDMoTmCDrzy/Yhru8ihE8+rkY=
|
||||
github.com/haiwen/seafile-server/fileserver v0.0.0-20220114093911-524f227b02cc/go.mod h1:1m6GwwACvRncWbgV8rNnVfZ+UUCmIoX6y+ne6MwDdNQ=
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
|
||||
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
|
||||
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/ini.v1 v1.66.2 h1:XfR1dOYubytKy4Shzc2LHrrGhU0lDCfDGG1yLPmpgsI=
|
||||
gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@ -46,7 +46,7 @@ func loadNotifConfig() {
|
||||
|
||||
section, err := config.GetSection("notification")
|
||||
if err != nil {
|
||||
log.Fatal("No general section in seafile.conf.")
|
||||
log.Fatal("No notification section in seafile.conf.")
|
||||
}
|
||||
|
||||
host = "0.0.0.0"
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/ristretto/z"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
@ -44,9 +45,8 @@ type Client struct {
|
||||
Repos map[string]int64
|
||||
ReposMutex sync.Mutex
|
||||
// Alive is the last time received pong.
|
||||
Alive time.Time
|
||||
// ConnClosed indicates whether the client's connection has been closed
|
||||
ConnClosed bool
|
||||
Alive time.Time
|
||||
ConnCloser *z.Closer
|
||||
// Addr is the address of client.
|
||||
Addr string
|
||||
// User is the user of client.
|
||||
@ -75,6 +75,7 @@ func NewClient(conn *websocket.Conn, addr string) *Client {
|
||||
client.Repos = make(map[string]int64)
|
||||
client.Alive = time.Now()
|
||||
client.Addr = addr
|
||||
client.ConnCloser = z.NewCloser(0)
|
||||
|
||||
return client
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user