mirror of
https://github.com/haiwen/seafile-server.git
synced 2025-08-15 21:45:14 +00:00
* Update python to 3.12.8 for ci test * Fix error for notification server --------- Co-authored-by: 杨赫然 <heran.yang@seafile.com>
375 lines
8.3 KiB
Go
375 lines
8.3 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
_ "github.com/go-sql-driver/mysql"
|
|
jwt "github.com/golang-jwt/jwt/v5"
|
|
"github.com/gorilla/mux"
|
|
"github.com/gorilla/websocket"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Process-wide settings and handles shared by the functions in this file.
var (
	// configDir receives the value of the -c command-line flag.
	configDir string
	// logFile receives the value of the -l flag; absLogFile is the resolved
	// path of the log file that main opens (left empty when no file is used).
	logFile, absLogFile string
	// privateKey holds the value of the JWT_PRIVATE_KEY environment variable.
	privateKey string
	// host and port form the address the HTTP server listens on.
	host string
	port uint32
	// logFp is the currently open log file, if any.
	logFp *os.File

	// ccnetDB is the database handle opened by loadCcnetDB.
	ccnetDB *sql.DB

	// logToStdout is set when SEAFILE_LOG_TO_STDOUT equals "true".
	logToStdout bool
)
|
|
|
|
func init() {
|
|
flag.StringVar(&configDir, "c", "", "config directory")
|
|
flag.StringVar(&logFile, "l", "", "log file path")
|
|
|
|
env := os.Getenv("SEAFILE_LOG_TO_STDOUT")
|
|
if env == "true" {
|
|
logToStdout = true
|
|
}
|
|
|
|
log.SetFormatter(&LogFormatter{})
|
|
}
|
|
|
|
func loadNotifConfig() {
|
|
host = os.Getenv("NOTIFICATION_SERVER_HOST")
|
|
if host == "" {
|
|
host = "0.0.0.0"
|
|
}
|
|
|
|
port = 8083
|
|
if os.Getenv("NOTIFICATION_SERVER_PORT") != "" {
|
|
i, err := strconv.Atoi(os.Getenv("NOTIFICATION_SERVER_PORT"))
|
|
if err == nil {
|
|
port = uint32(i)
|
|
}
|
|
}
|
|
|
|
logLevel := os.Getenv("NOTIFICATION_SERVER_LOG_LEVEL")
|
|
if logLevel == "" {
|
|
logLevel = "info"
|
|
}
|
|
|
|
level, err := log.ParseLevel(logLevel)
|
|
if err != nil {
|
|
log.Info("use the default log level: info")
|
|
log.SetLevel(log.InfoLevel)
|
|
} else {
|
|
log.SetLevel(level)
|
|
}
|
|
}
|
|
|
|
func loadCcnetDB() {
|
|
option, err := loadDBOptionFromEnv()
|
|
if err != nil {
|
|
log.Fatalf("Failed to load database from env: %v", err)
|
|
}
|
|
|
|
var dsn string
|
|
if option.UnixSocket == "" {
|
|
dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?tls=%t&readTimeout=60s&writeTimeout=60s", option.User, option.Password, option.Host, option.Port, option.CcnetDbName, option.UseTLS)
|
|
} else {
|
|
dsn = fmt.Sprintf("%s:%s@unix(%s)/%s?readTimeout=60s&writeTimeout=60s", option.User, option.Password, option.UnixSocket, option.CcnetDbName)
|
|
}
|
|
ccnetDB, err = sql.Open("mysql", dsn)
|
|
if err != nil {
|
|
log.Fatalf("Failed to open database: %v", err)
|
|
}
|
|
if err := ccnetDB.Ping(); err != nil {
|
|
log.Fatalf("Failed to connected to mysql: %v", err)
|
|
}
|
|
ccnetDB.SetConnMaxLifetime(5 * time.Minute)
|
|
ccnetDB.SetMaxOpenConns(8)
|
|
ccnetDB.SetMaxIdleConns(8)
|
|
}
|
|
|
|
// DBOption carries the MySQL connection settings used to build a DSN.
type DBOption struct {
	// User and Password are the MySQL credentials.
	User     string
	Password string
	// Host and Port address the server when connecting over TCP.
	Host string
	Port int
	// CcnetDbName is the database name loadCcnetDB connects to.
	CcnetDbName string
	// SeafileDbName is the name of the seafile database.
	SeafileDbName string
	// UnixSocket, when non-empty, makes loadCcnetDB connect through this
	// socket path instead of Host/Port.
	UnixSocket string
	// UseTLS is passed as the "tls" DSN parameter for TCP connections.
	UseTLS bool
}
|
|
|
|
func loadDBOptionFromEnv() (*DBOption, error) {
|
|
user := os.Getenv("SEAFILE_MYSQL_DB_USER")
|
|
if user == "" {
|
|
return nil, fmt.Errorf("failed to read SEAFILE_MYSQL_DB_USER")
|
|
}
|
|
password := os.Getenv("SEAFILE_MYSQL_DB_PASSWORD")
|
|
if password == "" {
|
|
return nil, fmt.Errorf("failed to read SEAFILE_MYSQL_DB_PASSWORD")
|
|
}
|
|
host := os.Getenv("SEAFILE_MYSQL_DB_HOST")
|
|
if host == "" {
|
|
return nil, fmt.Errorf("failed to read SEAFILE_MYSQL_DB_HOST")
|
|
}
|
|
ccnetDbName := os.Getenv("SEAFILE_MYSQL_DB_CCNET_DB_NAME")
|
|
if ccnetDbName == "" {
|
|
ccnetDbName = "ccnet_db"
|
|
log.Infof("Failed to read SEAFILE_MYSQL_DB_CCNET_DB_NAME, use ccnet_db by default")
|
|
}
|
|
seafileDbName := os.Getenv("SEAFILE_MYSQL_DB_SEAFILE_DB_NAME")
|
|
if seafileDbName == "" {
|
|
seafileDbName = "seafile_db"
|
|
log.Infof("Failed to read SEAFILE_MYSQL_DB_SEAFILE_DB_NAME, use seafile_db by default")
|
|
}
|
|
|
|
log.Infof("Database: user = %s", user)
|
|
log.Infof("Database: host = %s", host)
|
|
log.Infof("Database: ccnet_db_name = %s", ccnetDbName)
|
|
log.Infof("Database: seafile_db_name = %s", seafileDbName)
|
|
|
|
option := new(DBOption)
|
|
option.User = user
|
|
option.Password = password
|
|
option.Host = host
|
|
option.Port = 3306
|
|
option.CcnetDbName = ccnetDbName
|
|
option.SeafileDbName = seafileDbName
|
|
return option, nil
|
|
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
if configDir == "" {
|
|
log.Fatal("config directory must be specified.")
|
|
}
|
|
|
|
_, err := os.Stat(configDir)
|
|
if os.IsNotExist(err) {
|
|
log.Fatalf("config directory %s doesn't exist: %v.", configDir, err)
|
|
}
|
|
|
|
if logToStdout {
|
|
// Use default output (StdOut)
|
|
} else if logFile == "" {
|
|
absLogFile = filepath.Join(configDir, "notification-server.log")
|
|
fp, err := os.OpenFile(absLogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
|
|
if err != nil {
|
|
log.Fatalf("Failed to open or create log file: %v", err)
|
|
}
|
|
logFp = fp
|
|
log.SetOutput(fp)
|
|
} else if logFile != "-" {
|
|
absLogFile, err = filepath.Abs(logFile)
|
|
if err != nil {
|
|
log.Fatalf("Failed to convert log file path to absolute path: %v", err)
|
|
}
|
|
fp, err := os.OpenFile(absLogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
|
|
if err != nil {
|
|
log.Fatalf("Failed to open or create log file: %v", err)
|
|
}
|
|
logFp = fp
|
|
log.SetOutput(fp)
|
|
}
|
|
|
|
if absLogFile != "" && !logToStdout {
|
|
Dup(int(logFp.Fd()), int(os.Stderr.Fd()))
|
|
}
|
|
|
|
if err := loadJwtPrivateKey(); err != nil {
|
|
log.Fatalf("Failed to read config: %v", err)
|
|
}
|
|
|
|
loadNotifConfig()
|
|
loadCcnetDB()
|
|
|
|
Init()
|
|
|
|
go handleUser1Signal()
|
|
|
|
router := newHTTPRouter()
|
|
|
|
log.Info("notification server started.")
|
|
|
|
server := new(http.Server)
|
|
server.Addr = fmt.Sprintf("%s:%d", host, port)
|
|
server.Handler = router
|
|
|
|
err = server.ListenAndServe()
|
|
if err != nil {
|
|
log.Infof("notificationserver exiting: %v", err)
|
|
}
|
|
}
|
|
|
|
func loadJwtPrivateKey() error {
|
|
privateKey = os.Getenv("JWT_PRIVATE_KEY")
|
|
if privateKey == "" {
|
|
return fmt.Errorf("failed to read JWT_PRIVATE_KEY")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func handleUser1Signal() {
|
|
signalChan := make(chan os.Signal, 1)
|
|
signal.Notify(signalChan, syscall.SIGUSR1)
|
|
|
|
for {
|
|
<-signalChan
|
|
logRotate()
|
|
}
|
|
}
|
|
|
|
func logRotate() {
|
|
if logToStdout {
|
|
return
|
|
}
|
|
fp, err := os.OpenFile(absLogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
|
|
if err != nil {
|
|
log.Fatalf("Failed to reopen notification log: %v", err)
|
|
}
|
|
log.SetOutput(fp)
|
|
if logFp != nil {
|
|
logFp.Close()
|
|
logFp = fp
|
|
}
|
|
|
|
Dup(int(logFp.Fd()), int(os.Stderr.Fd()))
|
|
}
|
|
|
|
func newHTTPRouter() *mux.Router {
|
|
r := mux.NewRouter()
|
|
r.Handle("/", appHandler(messageCB))
|
|
r.Handle("/events{slash:\\/?}", appHandler(eventCB))
|
|
r.Handle("/ping{slash:\\/?}", appHandler(pingCB))
|
|
|
|
return r
|
|
}
|
|
|
|
// Any http request will be automatically upgraded to websocket.
|
|
func messageCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
|
upgrader := newUpgrader()
|
|
conn, err := upgrader.Upgrade(rsp, r, nil)
|
|
if err != nil {
|
|
log.Warnf("failed to upgrade http to websocket: %v", err)
|
|
// Don't return eror here, because the upgrade fails, then Upgrade replies to the client with an HTTP error response.
|
|
return nil
|
|
}
|
|
|
|
addr := r.Header.Get("x-forwarded-for")
|
|
if addr == "" {
|
|
addr = conn.RemoteAddr().String()
|
|
}
|
|
client := NewClient(conn, addr)
|
|
RegisterClient(client)
|
|
client.HandleMessages()
|
|
|
|
return nil
|
|
}
|
|
|
|
func eventCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
|
msg := Message{}
|
|
|
|
token := getAuthorizationToken(r.Header)
|
|
if !checkAuthToken(token) {
|
|
return &appError{Error: nil,
|
|
Message: "Notification token not match",
|
|
Code: http.StatusBadRequest,
|
|
}
|
|
}
|
|
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
return &appError{Error: err,
|
|
Message: "",
|
|
Code: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
|
|
if err := json.Unmarshal(body, &msg); err != nil {
|
|
return &appError{Error: err,
|
|
Message: "",
|
|
Code: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
|
|
Notify(&msg)
|
|
|
|
return nil
|
|
}
|
|
|
|
// getAuthorizationToken extracts the token from the Authorization header of h.
//
// The header is expected to look like "<scheme> <token>"; the scheme itself is
// not inspected. The second whitespace-separated field is returned, or "" when
// the header is missing or has no second field.
func getAuthorizationToken(h http.Header) string {
	// Fields (rather than Split on a single space) tolerates repeated or
	// leading whitespace, which would otherwise yield an empty or wrong token.
	fields := strings.Fields(h.Get("Authorization"))
	if len(fields) > 1 {
		return fields[1]
	}
	return ""
}
|
|
|
|
func checkAuthToken(tokenString string) bool {
|
|
if len(tokenString) == 0 {
|
|
return false
|
|
}
|
|
claims := new(myClaims)
|
|
token, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
|
|
return []byte(privateKey), nil
|
|
})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if !token.Valid {
|
|
return false
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
return claims.Exp > now.Unix()
|
|
}
|
|
|
|
func newUpgrader() *websocket.Upgrader {
|
|
upgrader := &websocket.Upgrader{
|
|
ReadBufferSize: 4096,
|
|
WriteBufferSize: 1024,
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
}
|
|
|
|
return upgrader
|
|
}
|
|
|
|
func pingCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
|
fmt.Fprintln(rsp, "{\"ret\": \"pong\"}")
|
|
return nil
|
|
}
|
|
|
|
// appError is what a handler returns to make appHandler.ServeHTTP send an
// HTTP error response.
type appError struct {
	// Error is the underlying cause; ServeHTTP logs it for 500 responses.
	Error error
	// Message is the response body sent to the client.
	Message string
	// Code is the HTTP status code of the response.
	Code int
}
|
|
|
|
// appHandler is a request handler that reports failure by returning a non-nil
// *appError instead of writing the error response itself.
type appHandler func(http.ResponseWriter, *http.Request) *appError
|
|
|
|
func (fn appHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
e := fn(w, r)
|
|
if e != nil {
|
|
if e.Error != nil && e.Code == http.StatusInternalServerError {
|
|
log.Infof("path %s internal server error: %v\n", r.URL.Path, e.Error)
|
|
}
|
|
http.Error(w, e.Message, e.Code)
|
|
}
|
|
}
|