1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-04-27 19:15:07 +00:00

Add notification server (#535)

* Add notification server of go

Modify path of pkg

Send notification for update-repo event

Delete client pkg and use reflect select to send message

Modify output of log

Add formatter of log

Add jwt authentication

go add get jwt token api

CI support compile libjwt

Get group users from database

* Add ping to test mysql is alive

Co-authored-by: 杨赫然 <heran.yang@seafile.com>
This commit is contained in:
feiniks 2022-12-16 15:29:01 +08:00 committed by GitHub
parent 40b59d56a9
commit d6f6127641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1737 additions and 11 deletions

View File

@ -178,17 +178,43 @@ class Libevhtp(Project):
for cmd in cmds:
shell(cmd)
class Libjwt(Project):
def __init__(self):
super(Libjwt, self).__init__('libjwt')
def branch(self):
return 'v1.13.1'
@property
def url(self):
return 'https://www.github.com/benmcollins/libjwt.git'
@chdir
def compile_and_install(self):
cmds = [
'autoreconf -i',
'./configure',
'sudo make all',
'sudo make install',
]
for cmd in cmds:
shell(cmd)
def fetch_and_build():
libsearpc = Libsearpc()
libjwt = Libjwt()
libevhtp = Libevhtp()
ccnet = CcnetServer()
seafile = SeafileServer()
libsearpc.clone()
libjwt.clone()
libevhtp.clone()
ccnet.clone()
libsearpc.compile_and_install()
libjwt.compile_and_install()
libevhtp.compile_and_install()
seafile.compile_and_install()

View File

@ -10,6 +10,10 @@
#include "seafile-session.h"
#ifdef FULL_FEATURE
#include "notif-mgr.h"
#endif
#include "branch-mgr.h"
#define BRANCH_DB "branch.db"
@ -323,16 +327,44 @@ publish_repo_update_event (const char *repo_id, const char *commit_id)
seaf_mq_manager_publish_event (seaf->mq_mgr, SEAFILE_SERVER_CHANNEL_EVENT, buf);
}
static void
on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch)
{
seaf_repo_manager_update_repo_info (seaf->repo_mgr, branch->repo_id, branch->commit_id);
if (seaf_repo_manager_is_virtual_repo (seaf->repo_mgr, branch->repo_id))
return;
static void
notify_repo_update (const char *repo_id, const char *commit_id)
{
json_t *event = NULL;
json_t *content = NULL;
char *msg = NULL;
publish_repo_update_event (branch->repo_id, branch->commit_id);
}
event = json_object ();
content = json_object ();
json_object_set_new (event, "type", json_string("repo-update"));
json_object_set_new (content, "repo_id", json_string(repo_id));
json_object_set_new (content, "commit_id", json_string(commit_id));
json_object_set_new (event, "content", content);
msg = json_dumps (event, JSON_COMPACT);
if (seaf->notif_mgr)
seaf_notif_manager_send_event (seaf->notif_mgr, msg);
json_decref (event);
g_free (msg);
}
static void
on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch)
{
seaf_repo_manager_update_repo_info (seaf->repo_mgr, branch->repo_id, branch->commit_id);
notify_repo_update(branch->repo_id, branch->commit_id);
if (seaf_repo_manager_is_virtual_repo (seaf->repo_mgr, branch->repo_id))
return;
publish_repo_update_event (branch->repo_id, branch->commit_id);
}
int
seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr,

View File

@ -260,6 +260,14 @@ if test "${compile_ldap}" = "yes"; then
fi
PKG_CHECK_MODULES(CURL, [libcurl >= $CURL_REQUIRED])
AC_SUBST(CURL_CFLAGS)
AC_SUBST(CURL_LIBS)
PKG_CHECK_MODULES(JWT, [libjwt])
AC_SUBST(JWT_CFLAGS)
AC_SUBST(JWT_LIBS)
if test x${compile_python} = xyes; then
AM_PATH_PYTHON([2.6])
if test "$bwin32" = true; then

View File

@ -40,6 +40,7 @@ var logFp *os.File
var dbType string
var groupTableName string
var cloudMode bool
var privateKey string
var seafileDB, ccnetDB *sql.DB
// when SQLite is used, user and group db are separated.
@ -299,6 +300,12 @@ func loadFileServerOptions() {
}
}
if section, err := config.GetSection("notification"); err == nil {
if key, err := section.GetKey("private_key"); err == nil {
privateKey = key.String()
}
}
initDefaultOptions()
if section, err := config.GetSection("httpserver"); err == nil {
@ -621,6 +628,8 @@ func newHTTPRouter() *mux.Router {
appHandler(recvFSCB))
r.Handle("/repo/{repoid:[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}}/quota-check{slash:\\/?}",
appHandler(getCheckQuotaCB))
r.Handle("/repo/{repoid:[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}}/jwt-token{slash:\\/?}",
appHandler(getJWTTokenCB))
// seadrive api
r.Handle("/repo/{repoid:[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}}/block-map/{id:[\\da-z]{40}}",

View File

@ -4,6 +4,7 @@ go 1.17
require (
github.com/dgraph-io/ristretto v0.1.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4

View File

@ -11,6 +11,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=

View File

@ -17,6 +17,8 @@ import (
"sync"
"time"
jwt "github.com/dgrijalva/jwt-go"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/haiwen/seafile-server/fileserver/blockmgr"
"github.com/haiwen/seafile-server/fileserver/commitmgr"
@ -710,6 +712,54 @@ func getCheckQuotaCB(rsp http.ResponseWriter, r *http.Request) *appError {
return nil
}
type MyClaims struct {
Exp int64
RepoID string `json:"repo_id"`
UserName string `json:"username"`
}
func (*MyClaims) Valid() error {
return nil
}
func getJWTTokenCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
if privateKey == "" {
return nil
}
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
claims := MyClaims{
time.Now().Add(time.Hour * 72).Unix(),
repoID,
user,
}
token := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), &claims)
tokenString, err := token.SignedString([]byte(privateKey))
if err != nil {
err := fmt.Errorf("failed to gen jwt token for repo %s", repoID)
return &appError{err, "", http.StatusInternalServerError}
}
data := fmt.Sprintf("{\"jwt_token\":\"%s\"}", tokenString)
rsp.Write([]byte(data))
return nil
}
func isValidUUID(u string) bool {
_, err := uuid.Parse(u)
return err == nil
}
func getFsObjIDCB(rsp http.ResponseWriter, r *http.Request) *appError {
recvChan := make(chan *calResult)

View File

@ -0,0 +1,7 @@
[Database]
ENGINE = mysql
HOST = 127.0.0.1
USER = seafile
PASSWD = seafile
DB = ccnet-db
CREATE_TABLES=true

View File

@ -0,0 +1,267 @@
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/dgrijalva/jwt-go"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
const (
writeWait = 1 * time.Second
pongWait = 5 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = 1 * time.Second
checkTokenPeriod = 1 * time.Hour
)
// Message is the message communicated between clients and server.
type Message struct {
Type string `json:"type"`
Content json.RawMessage `json:"content"`
}
type SubList struct {
Repos []Repo `json:"repos"`
}
type UnsubList struct {
Repos []string `json:"repos"`
}
type Repo struct {
RepoID string `json:"id"`
Token string `json:"jwt_token"`
}
type myClaims struct {
Exp int64
RepoID string `json:"repo_id"`
UserName string `json:"username"`
}
func (*myClaims) Valid() error {
return nil
}
func (client *Client) Close() {
client.conn.Close()
if !client.ConnClosed {
close(client.WCh)
}
client.ConnClosed = true
}
// HandleMessages connects to the client to process message.
func (client *Client) HandleMessages() {
go client.readMessages()
go client.writeMessages()
go client.checkTokenExpired()
// Set keep alive.
client.conn.SetPongHandler(func(string) error {
client.Alive = time.Now()
return nil
})
go client.keepAlive()
}
func (client *Client) readMessages() {
conn := client.conn
defer func() {
client.Close()
UnregisterClient(client)
for id := range client.Repos {
client.unsubscribe(id)
}
}()
for {
var msg Message
err := conn.ReadJSON(&msg)
if err != nil {
log.Debugf("failed to read json data from client: %s: %v", client.Addr, err)
return
}
err = client.handleMessage(&msg)
if err != nil {
log.Debugf("%v", err)
return
}
}
}
func checkToken(tokenString, repoID string) (string, int64, bool) {
if len(tokenString) == 0 {
return "", -1, false
}
claims := new(myClaims)
token, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
return []byte(privateKey), nil
})
if err != nil {
return "", -1, false
}
if !token.Valid {
return "", -1, false
}
now := time.Now()
if claims.RepoID != repoID || claims.Exp <= now.Unix() {
return "", -1, false
}
return claims.UserName, claims.Exp, true
}
func (client *Client) handleMessage(msg *Message) error {
content := msg.Content
if msg.Type == "subscribe" {
var list SubList
err := json.Unmarshal(content, &list)
if err != nil {
return err
}
for _, repo := range list.Repos {
user, exp, valid := checkToken(repo.Token, repo.RepoID)
if !valid {
client.notifJWTExpired(repo.RepoID)
continue
}
client.subscribe(repo.RepoID, user, exp)
}
} else if msg.Type == "unsubscribe" {
var list UnsubList
err := json.Unmarshal(content, &list)
if err != nil {
return err
}
for _, id := range list.Repos {
client.unsubscribe(id)
}
} else {
err := fmt.Errorf("recv unexpected type of message: %s", msg.Type)
return err
}
return nil
}
// subscribe subscribes to notifications of repos.
func (client *Client) subscribe(repoID, user string, exp int64) {
client.User = user
client.ReposMutex.Lock()
client.Repos[repoID] = exp
client.ReposMutex.Unlock()
subMutex.Lock()
subscribers, ok := subscriptions[repoID]
if !ok {
subscribers = newSubscribers(client)
subscriptions[repoID] = subscribers
}
subMutex.Unlock()
subscribers.Mutex.Lock()
subscribers.Clients[client.ID] = client
subscribers.Mutex.Unlock()
}
func (client *Client) unsubscribe(repoID string) {
client.ReposMutex.Lock()
delete(client.Repos, repoID)
client.ReposMutex.Unlock()
subMutex.Lock()
subscribers, ok := subscriptions[repoID]
if !ok {
subMutex.Unlock()
return
}
subMutex.Unlock()
subscribers.Mutex.Lock()
delete(subscribers.Clients, client.ID)
subscribers.Mutex.Unlock()
}
func (client *Client) writeMessages() {
defer client.Close()
for msg := range client.WCh {
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
client.connMutex.Lock()
err := client.conn.WriteJSON(msg)
client.connMutex.Unlock()
if err != nil {
log.Debugf("failed to send notification to client: %v", err)
return
}
}
}
func (client *Client) keepAlive() {
defer client.Close()
ticker := time.NewTicker(pingPeriod)
for {
<-ticker.C
if client.ConnClosed {
return
}
if time.Since(client.Alive) > pongWait {
log.Debugf("disconnected because no pong was received for more than %v", pongWait)
return
}
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
client.connMutex.Lock()
err := client.conn.WriteMessage(websocket.PingMessage, nil)
client.connMutex.Unlock()
if err != nil {
log.Debugf("failed to send ping message to client: %v", err)
return
}
}
}
func (client *Client) checkTokenExpired() {
ticker := time.NewTicker(checkTokenPeriod)
for {
<-ticker.C
if client.ConnClosed {
return
}
// unsubscribe will delete repo from client.Repos, we'd better unsubscribe repos later.
pendingRepos := make(map[string]struct{})
now := time.Now()
client.ReposMutex.Lock()
for repoID, exp := range client.Repos {
if exp >= now.Unix() {
continue
}
pendingRepos[repoID] = struct{}{}
}
client.ReposMutex.Unlock()
for repoID := range pendingRepos {
client.unsubscribe(repoID)
client.notifJWTExpired(repoID)
}
}
}
func (client *Client) notifJWTExpired(repoID string) {
msg := new(Message)
msg.Type = "jwt-expired"
content := fmt.Sprintf("{\"repo_id\":\"%s\"}", repoID)
msg.Content = []byte(content)
client.WCh <- msg
}

View File

@ -0,0 +1,149 @@
package main
import (
"encoding/json"
"reflect"
log "github.com/sirupsen/logrus"
)
type RepoUpdateEvent struct {
RepoID string `json:"repo_id"`
CommitID string `json:"commit_id"`
}
type FileLockEvent struct {
RepoID string `json:"repo_id"`
Path string `json:"path"`
ChangeEvent string `json:"change_event"`
LockUser string `json:"lock_user"`
}
type FolderPermEvent struct {
RepoID string `json:"repo_id"`
Path string `json:"path"`
Type string `json:"type"`
ChangeEvent string `json:"change_event"`
User string `json:"user"`
Group int `json:"group"`
Perm string `json:"perm"`
}
func Notify(msg *Message) {
var repoID string
// userList is the list of users who need to be notified, if it is nil, all subscribed users will be notified.
var userList map[string]struct{}
content := msg.Content
switch msg.Type {
case "repo-update":
var event RepoUpdateEvent
err := json.Unmarshal(content, &event)
if err != nil {
log.Warn(err)
return
}
repoID = event.RepoID
case "file-lock-changed":
var event FileLockEvent
err := json.Unmarshal(content, &event)
if err != nil {
log.Warn(err)
return
}
repoID = event.RepoID
case "folder-perm-changed":
var event FolderPermEvent
err := json.Unmarshal(content, &event)
if err != nil {
log.Warn(err)
return
}
repoID = event.RepoID
if event.User != "" {
userList = make(map[string]struct{})
userList[event.User] = struct{}{}
} else if event.Group != -1 {
userList = getGroupMembers(event.Group)
}
default:
return
}
clients := make(map[uint64]*Client)
subMutex.RLock()
subscribers := subscriptions[repoID]
if subscribers == nil {
subMutex.RUnlock()
return
}
subMutex.RUnlock()
subscribers.Mutex.RLock()
for clientID, client := range subscribers.Clients {
clients[clientID] = client
}
subscribers.Mutex.RUnlock()
go func() {
// In order to avoid being blocked on a Client for a long time, it is necessary to write WCh in a non-blocking way,
// and the waiting WCh needs to be blocked and processed after other Clients have finished writing.
value := reflect.ValueOf(msg)
var branches []reflect.SelectCase
for _, client := range clients {
if !needToNotif(userList, client.User) {
continue
}
branch := reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(client.WCh), Send: value}
branches = append(branches, branch)
}
for len(branches) != 0 {
index, _, _ := reflect.Select(branches)
branches = append(branches[:index], branches[index+1:]...)
}
}()
}
func getGroupMembers(group int) map[string]struct{} {
query := `SELECT user_name FROM GroupUser WHERE group_id = ?`
stmt, err := ccnetDB.Prepare(query)
if err != nil {
log.Printf("failed to prepare sql: %s%v", query, err)
return nil
}
defer stmt.Close()
rows, err := stmt.Query(group)
if err != nil {
log.Printf("failed to query sql: %v", err)
return nil
}
defer rows.Close()
userList := make(map[string]struct{})
var userName string
for rows.Next() {
if err := rows.Scan(&userName); err == nil {
userList[userName] = struct{}{}
}
}
if err := rows.Err(); err != nil {
log.Printf("failed to scan sql rows: %v", err)
return nil
}
return userList
}
func needToNotif(userList map[string]struct{}, user string) bool {
if userList == nil {
return true
}
_, ok := userList[user]
return ok
}

View File

@ -0,0 +1,16 @@
module github.com/haiwen/seafile-server/notification-server
go 1.17
require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/haiwen/seafile-server/fileserver v0.0.0-20220114093911-524f227b02cc
github.com/mattn/go-sqlite3 v1.14.0
github.com/sirupsen/logrus v1.8.1
gopkg.in/ini.v1 v1.66.2
)
require golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect

View File

@ -0,0 +1,40 @@
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/haiwen/seafile-server/fileserver v0.0.0-20220114093911-524f227b02cc h1:oCQqZqWfcp1c7t9u1SgDMoTmCDrzy/Yhru8ihE8+rkY=
github.com/haiwen/seafile-server/fileserver v0.0.0-20220114093911-524f227b02cc/go.mod h1:1m6GwwACvRncWbgV8rNnVfZ+UUCmIoX6y+ne6MwDdNQ=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.2 h1:XfR1dOYubytKy4Shzc2LHrrGhU0lDCfDGG1yLPmpgsI=
gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=

View File

@ -0,0 +1,19 @@
package main
import (
log "github.com/sirupsen/logrus"
)
const (
timestampFormat = "2006/01/02 15:04:05 "
)
type LogFormatter struct{}
func (f *LogFormatter) Format(entry *log.Entry) ([]byte, error) {
buf := make([]byte, 0, len(timestampFormat)+len(entry.Message)+1)
buf = entry.Time.AppendFormat(buf, timestampFormat)
buf = append(buf, entry.Message...)
buf = append(buf, '\n')
return buf, nil
}

View File

@ -0,0 +1,6 @@
[general]
host = 0.0.0.0
port = 8083
log_level = info
private_key = "my primary key"
notification_token = zzzz

View File

@ -0,0 +1,287 @@
package main
import (
"database/sql"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"gopkg.in/ini.v1"
)
var configDir string
var logFile, absLogFile string
var privateKey string
var notifToken string
var host string
var port uint32
var ccnetDB *sql.DB
func init() {
flag.StringVar(&configDir, "c", "", "config directory")
flag.StringVar(&logFile, "l", "", "log file path")
log.SetFormatter(&LogFormatter{})
}
func loadNotifConfig() {
notifyConfPath := filepath.Join(configDir, "notification.conf")
config, err := ini.Load(notifyConfPath)
if err != nil {
log.Fatalf("Failed to load notification.conf: %v", err)
}
section, err := config.GetSection("general")
if err != nil {
log.Fatal("No general section in seafile.conf.")
}
host = "0.0.0.0"
port = 8083
logLevel := "info"
if key, err := section.GetKey("host"); err == nil {
host = key.String()
}
if key, err := section.GetKey("port"); err == nil {
n, err := key.Uint()
if err == nil {
port = uint32(n)
}
}
if key, err := section.GetKey("log_level"); err == nil {
logLevel = key.String()
}
if key, err := section.GetKey("private_key"); err == nil {
privateKey = key.String()
}
if key, err := section.GetKey("notification_token"); err == nil {
notifToken = key.String()
}
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Info("use the default log level: info")
log.SetLevel(log.InfoLevel)
} else {
log.SetLevel(level)
}
}
func loadCcnetDB() {
ccnetConfPath := filepath.Join(configDir, "ccnet.conf")
config, err := ini.Load(ccnetConfPath)
if err != nil {
log.Fatalf("Failed to load ccnet.conf: %v", err)
}
section, err := config.GetSection("Database")
if err != nil {
log.Fatal("No database section in ccnet.conf.")
}
var dbEngine string = "mysql"
key, err := section.GetKey("ENGINE")
if err == nil {
dbEngine = key.String()
}
if !strings.EqualFold(dbEngine, "mysql") {
log.Fatalf("Unsupported database %s.", dbEngine)
}
if key, err = section.GetKey("HOST"); err != nil {
log.Fatal("No database host in ccnet.conf.")
}
host := key.String()
if key, err = section.GetKey("USER"); err != nil {
log.Fatal("No database user in ccnet.conf.")
}
user := key.String()
if key, err = section.GetKey("PASSWD"); err != nil {
log.Fatal("No database password in ccnet.conf.")
}
password := key.String()
if key, err = section.GetKey("DB"); err != nil {
log.Fatal("No database db_name in ccnet.conf.")
}
dbName := key.String()
port := 3306
if key, err = section.GetKey("PORT"); err == nil {
port, _ = key.Int()
}
unixSocket := ""
if key, err = section.GetKey("UNIX_SOCKET"); err == nil {
unixSocket = key.String()
}
useTLS := false
if key, err = section.GetKey("USE_SSL"); err == nil {
useTLS, _ = key.Bool()
}
var dsn string
if unixSocket == "" {
dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?tls=%t", user, password, host, port, dbName, useTLS)
} else {
dsn = fmt.Sprintf("%s:%s@unix(%s)/%s", user, password, unixSocket, dbName)
}
ccnetDB, err = sql.Open("mysql", dsn)
if err != nil {
log.Fatalf("Failed to open database: %v", err)
}
if err := ccnetDB.Ping(); err != nil {
log.Fatalf("Failed to connected to mysql: %v", err)
}
}
func main() {
flag.Parse()
if configDir == "" {
log.Fatal("config directory must be specified.")
}
_, err := os.Stat(configDir)
if os.IsNotExist(err) {
log.Fatalf("config directory %s doesn't exist: %v.", configDir, err)
}
if logFile == "" {
absLogFile = filepath.Join(configDir, "notification.log")
fp, err := os.OpenFile(absLogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
log.Fatalf("Failed to open or create log file: %v", err)
}
log.SetOutput(fp)
} else if logFile != "-" {
absLogFile, err = filepath.Abs(logFile)
if err != nil {
log.Fatalf("Failed to convert log file path to absolute path: %v", err)
}
fp, err := os.OpenFile(absLogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
log.Fatalf("Failed to open or create log file: %v", err)
}
log.SetOutput(fp)
}
loadNotifConfig()
loadCcnetDB()
Init()
router := newHTTPRouter()
log.Info("notification server started.")
addr := fmt.Sprintf("%s:%d", host, port)
err = http.ListenAndServe(addr, router)
if err != nil {
log.Info("notificationserver exiting: %v", err)
}
}
func newHTTPRouter() *mux.Router {
r := mux.NewRouter()
r.Handle("/", appHandler(messageCB))
r.Handle("/events{slash:\\/?}", appHandler(eventCB))
return r
}
// Any http request will be automatically upgraded to websocket.
func messageCB(rsp http.ResponseWriter, r *http.Request) *appError {
upgrader := newUpgrader()
conn, err := upgrader.Upgrade(rsp, r, nil)
if err != nil {
err := fmt.Errorf("failed to upgrade http to websocket: %v", err)
return &appError{Error: err,
Message: "",
Code: http.StatusInternalServerError,
}
}
addr := r.Header.Get("x-forwarded-for")
if addr == "" {
addr = conn.RemoteAddr().String()
}
client := NewClient(conn, addr)
RegisterClient(client)
client.HandleMessages()
return nil
}
func eventCB(rsp http.ResponseWriter, r *http.Request) *appError {
msg := Message{}
token := r.Header.Get("Seafile-Repo-Token")
if token != notifToken {
return &appError{Error: nil,
Message: "Notification token not match",
Code: http.StatusBadRequest,
}
}
body, err := io.ReadAll(r.Body)
if err != nil {
return &appError{Error: err,
Message: "",
Code: http.StatusInternalServerError,
}
}
if err := json.Unmarshal(body, &msg); err != nil {
return &appError{Error: err,
Message: "",
Code: http.StatusInternalServerError,
}
}
Notify(&msg)
return nil
}
func newUpgrader() *websocket.Upgrader {
upgrader := &websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
return upgrader
}
type appError struct {
Error error
Message string
Code int
}
type appHandler func(http.ResponseWriter, *http.Request) *appError
func (fn appHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
e := fn(w, r)
if e != nil {
if e.Error != nil && e.Code == http.StatusInternalServerError {
log.Infof("path %s internal server error: %v\n", r.URL.Path, e.Error)
}
http.Error(w, e.Message, e.Code)
}
}

View File

@ -0,0 +1,102 @@
package main
import (
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
)
const (
chanBufSize = 10
)
// clients is a map from client id to Client structs.
// It contains all current connected clients. Each client is identified by 64-bit ID.
var clients map[uint64]*Client
var clientsMutex sync.RWMutex
// Use atomic operation to increase this value.
var nextClientID uint64 = 1
// subscriptions is a map from repo_id to Subscribers struct.
// It's protected by rw mutex.
var subscriptions map[string]*Subscribers
var subMutex sync.RWMutex
// Client contains information about a client.
// Two go routines are associated with each client to handle message reading and writting.
// Messages sent to the client have to be written into WCh, since only one go routine can write to a websocket connection.
type Client struct {
// The ID of this client
ID uint64
// Websocket connection.
conn *websocket.Conn
// Connections do not support concurrent writers. Protect write with a mutex.
connMutex sync.Mutex
// WCh is used to write messages to a client.
// The structs written into the channel will be converted to JSON and sent to client.
WCh chan interface{}
// Repos is the repos this client subscribed to.
Repos map[string]int64
ReposMutex sync.Mutex
// Alive is the last time received pong.
Alive time.Time
// ConnClosed indicates whether the client's connection has been closed
ConnClosed bool
// Addr is the address of client.
Addr string
// User is the user of client.
User string
}
// Subscribers contains the clients who subscribe to a repo's notifications.
type Subscribers struct {
// Clients is a map from client id to Client struct, protected by rw mutex.
Clients map[uint64]*Client
Mutex sync.RWMutex
}
// Init inits clients and subscriptions.
func Init() {
clients = make(map[uint64]*Client)
subscriptions = make(map[string]*Subscribers)
}
// NewClient creates a new client.
func NewClient(conn *websocket.Conn, addr string) *Client {
client := new(Client)
client.ID = atomic.AddUint64(&nextClientID, 1)
client.conn = conn
client.WCh = make(chan interface{}, chanBufSize)
client.Repos = make(map[string]int64)
client.Alive = time.Now()
client.Addr = addr
return client
}
// Register adds the client to the list of clients.
func RegisterClient(client *Client) {
clientsMutex.Lock()
clients[client.ID] = client
clientsMutex.Unlock()
}
// Unregister deletes the client from the list of clients.
func UnregisterClient(client *Client) {
clientsMutex.Lock()
delete(clients, client.ID)
clientsMutex.Unlock()
}
func newSubscribers(client *Client) *Subscribers {
subscribers := new(Subscribers)
subscribers.Clients = make(map[uint64]*Client)
subscribers.Clients[client.ID] = client
return subscribers
}

View File

@ -34,7 +34,9 @@ noinst_HEADERS = web-accesstoken-mgr.h seafile-session.h \
../common/user-mgr.h \
../common/group-mgr.h \
../common/org-mgr.h \
index-blocks-mgr.h
index-blocks-mgr.h \
http-tx-mgr.h \
notif-mgr.h
seaf_server_SOURCES = \
seaf-server.c \
@ -54,6 +56,8 @@ seaf_server_SOURCES = \
access-file.c \
pack-dir.c \
fileserver-config.c \
http-tx-mgr.c \
notif-mgr.c \
../common/seaf-db.c \
../common/branch-mgr.c ../common/fs-mgr.c \
../common/config-mgr.c \
@ -81,4 +85,5 @@ seaf_server_LDADD = $(top_builddir)/lib/libseafile_common.la \
$(top_builddir)/common/cdc/libcdc.la \
@SEARPC_LIBS@ @JANSSON_LIBS@ ${LIB_WS32} @ZLIB_LIBS@ \
@LIBARCHIVE_LIBS@ @LIB_ICONV@ \
@LDAP_LIBS@ @MYSQL_LIBS@ -lsqlite3
@LDAP_LIBS@ @MYSQL_LIBS@ -lsqlite3 \
@CURL_LIBS@ @JWT_LIBS@

View File

@ -14,6 +14,8 @@
#include <evhtp.h>
#include <jwt.h>
#include "mq-mgr.h"
#include "utils.h"
#include "log.h"
@ -53,6 +55,7 @@
#define TOKEN_EXPIRE_TIME 7200 /* 2 hours */
#define PERM_EXPIRE_TIME 7200 /* 2 hours */
#define VIRINFO_EXPIRE_TIME 7200 /* 2 hours */
#define JWT_TOKEN_EXPIRE_TIME 3*24*3600 /* 3 days*/
#define FS_ID_LIST_MAX_WORKERS 3
#define FS_ID_LIST_TOKEN_LEN 36
@ -135,6 +138,7 @@ const char *POST_CHECK_BLOCK_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}
const char *POST_RECV_FS_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/recv-fs";
const char *POST_PACK_FS_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/pack-fs";
const char *GET_BLOCK_MAP_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/block-map/[\\da-z]{40}";
const char *GET_JWT_TOKEN_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/jwt-token";
//accessible repos
const char *GET_ACCESSIBLE_REPO_LIST_REGEX = "/accessible-repos";
@ -2438,6 +2442,96 @@ out:
g_strfreev (parts);
}
static char *
gen_jwt_token (const char *repo_id, const char *username)
{
char *jwt_token = NULL;
gint64 now = (gint64)time(NULL);
jwt_t *jwt = NULL;
if (!seaf->private_key) {
seaf_warning ("No private key is configured for generating jwt token\n");
return NULL;
}
int ret = jwt_new (&jwt);
if (ret != 0 || jwt == NULL) {
seaf_warning ("Failed to create jwt\n");
goto out;
}
ret = jwt_add_grant (jwt, "repo_id", repo_id);
if (ret != 0) {
seaf_warning ("Failed to add repo_id to jwt\n");
goto out;
}
ret = jwt_add_grant (jwt, "username", username);
if (ret != 0) {
seaf_warning ("Failed to add username to jwt\n");
goto out;
}
ret = jwt_add_grant_int (jwt, "exp", now + JWT_TOKEN_EXPIRE_TIME);
if (ret != 0) {
seaf_warning ("Failed to expire time to jwt\n");
goto out;
}
ret = jwt_set_alg (jwt, JWT_ALG_HS256, (unsigned char *)seaf->private_key, strlen(seaf->private_key));
if (ret != 0) {
seaf_warning ("Failed to set alg\n");
goto out;
}
jwt_token = jwt_encode_str (jwt);
out:
jwt_free (jwt);
return jwt_token;
}
static void
get_jwt_token_cb (evhtp_request_t *req, void *arg)
{
const char *repo_id = NULL;
HttpServer *htp_server = arg;
json_t *obj = NULL;
char *data = NULL;
char *username = NULL;
char *jwt_token = NULL;
char **parts = g_strsplit (req->uri->path->full + 1, "/", 0);
repo_id = parts[1];
int token_status = validate_token (htp_server, req, repo_id, &username, FALSE);
if (token_status != EVHTP_RES_OK) {
evhtp_send_reply (req, token_status);
goto out;
}
jwt_token = gen_jwt_token (repo_id, username);
if (!jwt_token) {
seaf_warning ("Failed to gen jwt token for repo %s\n", repo_id);
evhtp_send_reply (req, EVHTP_RES_SERVERR);
goto out;
}
obj = json_object ();
json_object_set_new (obj, "jwt_token", json_string (jwt_token));
data = json_dumps (obj, JSON_COMPACT);
evbuffer_add (req->buffer_out, data, strlen (data));
evhtp_send_reply (req, EVHTP_RES_OK);
out:
g_free (jwt_token);
g_free (username);
if (obj)
json_decref (obj);
if (data)
free (data);
g_strfreev (parts);
}
static json_t *
fill_obj_from_seafilerepo (SeafileRepo *srepo, GHashTable *table)
{
@ -2758,6 +2852,10 @@ http_request_init (HttpServerStruct *server)
GET_BLOCK_MAP_REGEX, get_block_map_cb,
priv);
evhtp_set_regex_cb (priv->evhtp,
GET_JWT_TOKEN_REGEX, get_jwt_token_cb,
priv);
evhtp_set_regex_cb (priv->evhtp,
GET_ACCESSIBLE_REPO_LIST_REGEX, get_accessible_repo_list_cb,
priv);

386
server/http-tx-mgr.c Normal file
View File

@ -0,0 +1,386 @@
#include "common.h"
#include <pthread.h>
#include <curl/curl.h>
#include <jansson.h>
#include <timer.h>
#include "seafile-session.h"
#include "http-tx-mgr.h"
#include "utils.h"
#include "seaf-db.h"
#include "seafile-error.h"
#define DEBUG_FLAG SEAFILE_DEBUG_TRANSFER
#include "log.h"
#ifndef SEAFILE_CLIENT_VERSION
#define SEAFILE_CLIENT_VERSION PACKAGE_VERSION
#endif
#ifdef WIN32
#define USER_AGENT_OS "Windows NT"
#endif
#ifdef __APPLE__
#define USER_AGENT_OS "Apple OS X"
#endif
#ifdef __linux__
#define USER_AGENT_OS "Linux"
#endif
/* Http connection and connection pool. */
struct _Connection {
CURL *curl;
gint64 ctime; /* Used to clean up unused connection. */
gboolean release; /* If TRUE, the connection will be released. */
};
struct _ConnectionPool {
GQueue *queue;
pthread_mutex_t lock;
};
static Connection *
connection_new ()
{
Connection *conn = g_new0 (Connection, 1);
if (!conn)
return NULL;
conn->curl = curl_easy_init();
conn->ctime = (gint64)time(NULL);
return conn;
}
static void
connection_free (Connection *conn)
{
if (!conn)
return;
curl_easy_cleanup (conn->curl);
g_free (conn);
}
ConnectionPool *
connection_pool_new ()
{
ConnectionPool *pool = g_new0 (ConnectionPool, 1);
if (!pool)
return NULL;
pool->queue = g_queue_new ();
pthread_mutex_init (&pool->lock, NULL);
return pool;
}
void
connection_pool_free (ConnectionPool *pool)
{
if (!pool)
return;
g_queue_free (pool->queue);
g_free (pool);
}
Connection *
connection_pool_get_connection (ConnectionPool *pool)
{
Connection *conn = NULL;
pthread_mutex_lock (&pool->lock);
conn = g_queue_pop_head (pool->queue);
if (!conn) {
conn = connection_new ();
}
pthread_mutex_unlock (&pool->lock);
return conn;
}
void
connection_pool_return_connection (ConnectionPool *pool, Connection *conn)
{
if (!conn)
return;
if (conn->release) {
connection_free (conn);
return;
}
curl_easy_reset (conn->curl);
pthread_mutex_lock (&pool->lock);
g_queue_push_tail (pool->queue, conn);
pthread_mutex_unlock (&pool->lock);
}
char*
http_code_to_str (int http_code)
{
switch (http_code) {
case HTTP_OK:
return "Successful";
case HTTP_BAD_REQUEST:
return "Bad request";
case HTTP_FORBIDDEN:
return "Permission denied";
case HTTP_NOT_FOUND:
return "Resource not found";
}
if (http_code >= HTTP_INTERNAL_SERVER_ERROR)
return "Internal server error";
return "Unknown error";
}
void
http_tx_manager_init ()
{
curl_global_init (CURL_GLOBAL_ALL);
}
typedef struct _HttpResponse {
char *content;
size_t size;
} HttpResponse;
static size_t
recv_response (void *contents, size_t size, size_t nmemb, void *userp)
{
size_t realsize = size * nmemb;
HttpResponse *rsp = userp;
rsp->content = g_realloc (rsp->content, rsp->size + realsize);
if (!rsp->content) {
seaf_warning ("Not enough memory.\n");
/* return a value other than realsize to signify an error. */
return 0;
}
memcpy (rsp->content + rsp->size, contents, realsize);
rsp->size += realsize;
return realsize;
}
#define HTTP_TIMEOUT_SEC 45
/*
* The @timeout parameter is for detecting network connection problems.
* The @timeout parameter should be set to TRUE for data-transfer-only operations,
* such as getting objects, blocks. For operations that requires calculations
* on the server side, the timeout should be set to FALSE. Otherwise when
* the server sometimes takes more than 45 seconds to calculate the result,
* the client will time out.
*/
int
http_get (Connection *conn, const char *url, const char *token,
int *rsp_status, char **rsp_content, gint64 *rsp_size,
HttpRecvCallback callback, void *cb_data,
gboolean timeout)
{
char *token_header;
struct curl_slist *headers = NULL;
int ret = 0;
CURL *curl;
curl = conn->curl;
headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
if (token) {
token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
headers = curl_slist_append (headers, token_header);
g_free (token_header);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
}
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
if (timeout) {
/* Set low speed limit to 1 bytes. This effectively means no data. */
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, HTTP_TIMEOUT_SEC);
}
/*if (seaf->disable_verify_certificate) {
curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
}*/
HttpResponse rsp;
memset (&rsp, 0, sizeof(rsp));
if (rsp_content) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
} else if (callback) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, cb_data);
}
/*gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
set_proxy (curl, is_https);*/
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
int rc = curl_easy_perform (curl);
if (rc != 0) {
seaf_warning ("libcurl failed to GET %s: %s.\n",
url, curl_easy_strerror(rc));
ret = -1;
goto out;
}
long status;
rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
if (rc != CURLE_OK) {
seaf_warning ("Failed to get status code for GET %s.\n", url);
ret = -1;
goto out;
}
*rsp_status = status;
if (rsp_content) {
*rsp_content = rsp.content;
*rsp_size = rsp.size;
}
out:
if (ret < 0) {
conn->release = TRUE;
g_free (rsp.content);
}
curl_slist_free_all (headers);
return ret;
}
typedef struct _HttpRequest {
const char *content;
size_t size;
} HttpRequest;
static size_t
send_request (void *ptr, size_t size, size_t nmemb, void *userp)
{
size_t realsize = size *nmemb;
size_t copy_size;
HttpRequest *req = userp;
if (req->size == 0)
return 0;
copy_size = MIN(req->size, realsize);
memcpy (ptr, req->content, copy_size);
req->size -= copy_size;
req->content = req->content + copy_size;
return copy_size;
}
int
http_post (Connection *conn, const char *url, const char *token,
const char *req_content, gint64 req_size,
int *rsp_status, char **rsp_content, gint64 *rsp_size,
gboolean timeout, int timeout_sec)
{
char *token_header;
struct curl_slist *headers = NULL;
int ret = 0;
CURL *curl;
curl = conn->curl;
g_return_val_if_fail (req_content != NULL, -1);
headers = curl_slist_append (headers, "User-Agent: Seafile/"SEAFILE_CLIENT_VERSION" ("USER_AGENT_OS")");
if (token) {
token_header = g_strdup_printf ("Seafile-Repo-Token: %s", token);
headers = curl_slist_append (headers, token_header);
g_free (token_header);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
}
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_POST, 1L);
if (timeout) {
/* Set low speed limit to 1 bytes. This effectively means no data. */
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1);
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, timeout_sec);
}
/*if (seaf->disable_verify_certificate) {
curl_easy_setopt (curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt (curl, CURLOPT_SSL_VERIFYHOST, 0L);
}*/
HttpRequest req;
if (req_content) {
memset (&req, 0, sizeof(req));
req.content = req_content;
req.size = req_size;
curl_easy_setopt(curl, CURLOPT_READFUNCTION, send_request);
curl_easy_setopt(curl, CURLOPT_READDATA, &req);
}
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)req_size);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
HttpResponse rsp;
memset (&rsp, 0, sizeof(rsp));
if (rsp_content) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recv_response);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rsp);
}
/*gboolean is_https = (strncasecmp(url, "https", strlen("https")) == 0);
set_proxy (curl, is_https);*/
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
/* All POST requests should remain POST after redirect. */
curl_easy_setopt(curl, CURLOPT_POSTREDIR, CURL_REDIR_POST_ALL);
int rc = curl_easy_perform (curl);
if (rc != 0) {
seaf_warning ("libcurl failed to POST %s: %s.\n",
url, curl_easy_strerror(rc));
ret = -1;
goto out;
}
long status;
rc = curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &status);
if (rc != CURLE_OK) {
seaf_warning ("Failed to get status code for POST %s.\n", url);
ret = -1;
goto out;
}
*rsp_status = status;
if (rsp_content) {
*rsp_content = rsp.content;
*rsp_size = rsp.size;
}
out:
if (ret < 0) {
conn->release = TRUE;
g_free (rsp.content);
}
curl_slist_free_all (headers);
return ret;
}

48
server/http-tx-mgr.h Normal file
View File

@ -0,0 +1,48 @@
#ifndef HTTP_TX_MGR_H
#define HTTP_TX_MGR_H
#include <curl/curl.h>
#define HTTP_OK 200
#define HTTP_BAD_REQUEST 400
#define HTTP_FORBIDDEN 403
#define HTTP_NOT_FOUND 404
#define HTTP_NO_QUOTA 443
#define HTTP_REPO_DELETED 444
#define HTTP_INTERNAL_SERVER_ERROR 500
typedef struct _Connection Connection;
typedef struct _ConnectionPool ConnectionPool;
ConnectionPool *
connection_pool_new ();
Connection *
connection_pool_get_connection (ConnectionPool *pool);
void
connection_pool_return_connection (ConnectionPool *pool, Connection *conn);
void
connection_pool_free (ConnectionPool *pool);
char*
http_code_to_str (int http_code);
typedef size_t (*HttpRecvCallback) (void *, size_t, size_t, void *);
int
http_get (Connection *conn, const char *url, const char *token,
int *rsp_status, char **rsp_content, gint64 *rsp_size,
HttpRecvCallback callback, void *cb_data,
gboolean timeout);
int
http_post (Connection *conn, const char *url, const char *token,
const char *req_content, gint64 req_size,
int *rsp_status, char **rsp_content, gint64 *rsp_size,
gboolean timeout, int timeout_sec);
void
http_tx_manager_init ();
#endif

117
server/notif-mgr.c Normal file
View File

@ -0,0 +1,117 @@
#include "common.h"
#include <pthread.h>
#include <jansson.h>
#include <timer.h>
#include "seafile-session.h"
#include "http-tx-mgr.h"
#include "notif-mgr.h"
#include "utils.h"
#include "seafile-error.h"
#include "log.h"
#define NOTIF_TIMEOUT_SEC 1
struct _NotifPriv {
char *notif_url;
char *notif_token;
ConnectionPool *connection_pool;
};
typedef struct _NotifPriv NotifPriv;
typedef struct Event {
NotifPriv *priv;
char *msg;
} Event;
NotifManager *
seaf_notif_manager_new (struct _SeafileSession *seaf, char *url, char *token)
{
NotifManager *mgr = g_new0 (NotifManager, 1);
mgr->seaf = seaf;
NotifPriv *priv = g_new0 (NotifPriv, 1);
priv->connection_pool = connection_pool_new ();
if (!priv->connection_pool) {
g_free (priv);
g_free (mgr);
return NULL;
}
priv->notif_url = url;
priv->notif_token = token;
mgr->priv = priv;
return mgr;
}
static void*
send_event (void *data)
{
Event *event= data;
NotifPriv *priv = event->priv;
Connection *conn = NULL;
int rsp_status;
char *req_url = NULL;
conn = connection_pool_get_connection (priv->connection_pool);
if (!conn) {
seaf_warning ("Failed to get connection: out of memory.\n");
return event;
}
req_url = g_strdup_printf ("%s/events", priv->notif_url);
int ret;
ret = http_post (conn, req_url, priv->notif_token, event->msg, strlen (event->msg),
&rsp_status, NULL, NULL, TRUE, NOTIF_TIMEOUT_SEC);
if (ret < 0) {
goto out;
}
if (rsp_status != HTTP_OK) {
seaf_warning ("Failed to send event to notification server %s: %d.\n",
priv->notif_url, rsp_status);
}
out:
g_free (req_url);
connection_pool_return_connection (priv->connection_pool, conn);
return event;
}
static void
free_send_event(void *data)
{
if (!data)
return;
Event *event= data;
if (event->msg)
g_free (event->msg);
g_free (event);
}
void
seaf_notif_manager_send_event (NotifManager *mgr, const char *msg)
{
Event *event = g_new0 (Event, 1);
event->priv = mgr->priv;
event->msg = g_strdup (msg);
ccnet_job_manager_schedule_job (seaf->job_mgr,
send_event,
free_send_event,
event);
}

19
server/notif-mgr.h Normal file
View File

@ -0,0 +1,19 @@
#ifndef HTTP_NOTIFICATION_MGR_H
#define HTTP_NOTIFICATION_MGR_H
struct _NotifManager {
struct _SeafileSession *seaf;
struct _NotifPriv *priv;
};
typedef struct _NotifManager NotifManager;
NotifManager *
seaf_notif_manager_new (struct _SeafileSession *seaf, char *url, char *token);
void
seaf_notif_manager_send_event (NotifManager *mgr,
const char *msg);
#endif

View File

@ -44,6 +44,9 @@ seafile_session_new(const char *central_config_dir,
GKeyFile *config;
GKeyFile *ccnet_config;
SeafileSession *session = NULL;
char *notif_url = NULL;
char *notif_token = NULL;
char *private_key = NULL;
abs_ccnet_dir = ccnet_expand_path (ccnet_dir);
abs_seafile_dir = ccnet_expand_path (seafile_dir);
@ -123,6 +126,21 @@ seafile_session_new(const char *central_config_dir,
g_free (type);
}
notif_url = g_key_file_get_string (config,
"notification", "notification_url",
NULL);
notif_token = g_key_file_get_string (config,
"notification", "notification_token",
NULL);
private_key = g_key_file_get_string (config,
"notification", "private_key",
NULL);
if (private_key) {
session->private_key = private_key;
}
if (load_database_config (session) < 0) {
seaf_warning ("Failed to load database config.\n");
goto onerror;
@ -204,9 +222,18 @@ seafile_session_new(const char *central_config_dir,
if (!session->org_mgr)
goto onerror;
if (notif_url != NULL && notif_token != NULL) {
session->notif_mgr = seaf_notif_manager_new (session, notif_url, notif_token);
if (!session->notif_mgr)
goto onerror;
}
return session;
onerror:
g_free (notif_url);
g_free (notif_token);
g_free (private_key);
free (abs_seafile_dir);
free (abs_ccnet_dir);
g_free (tmp_file_dir);

View File

@ -28,6 +28,7 @@
#include "http-server.h"
#include "zip-download-mgr.h"
#include "index-blocks-mgr.h"
#include "notif-mgr.h"
#include <searpc-client.h>
@ -78,6 +79,10 @@ struct _SeafileSession {
gboolean ccnet_create_tables;
gboolean go_fileserver;
// For notification server
NotifManager *notif_mgr;
char *private_key;
};
extern SeafileSession *seaf;