1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-09-09 03:08:57 +00:00

Support dm8 database

This commit is contained in:
杨赫然
2023-10-10 15:05:32 +08:00
parent 7bb8866889
commit e5e577b282
18 changed files with 1249 additions and 49 deletions

View File

@@ -13,6 +13,12 @@
#include <sqlite3.h>
#include <pthread.h>
#ifdef HAVE_ODBC
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
#endif
struct DBConnPool {
GPtrArray *connections;
pthread_mutex_t lock;
@@ -38,6 +44,7 @@ struct SeafDBRow {
struct SeafDBTrans {
DBConnection *conn;
gboolean need_close;
int type;
};
typedef struct DBOperations {
@@ -320,6 +327,210 @@ seaf_db_new_sqlite (const char *db_path, int max_connections)
return db;
}
#ifdef HAVE_ODBC
#define BUFFER_NOT_ENOUGH -70018
#define SUCCESS(rc) ((rc) == SQL_SUCCESS || (rc) == SQL_SUCCESS_WITH_INFO || (rc) == SQL_NO_DATA || (rc) == SQL_NO_DATA_FOUND)
typedef struct DMDBConnection {
struct DBConnection parent;
HDBC db_conn;
} DMDBConnection;
/* MySQL Ops */
static SeafDB *
dm_db_new (const char *user,
const char *password,
const char *db_name);
static DBConnection *
dm_db_get_connection (SeafDB *db);
static void
dm_db_release_connection (DBConnection *vconn);
static int
dm_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql, gboolean *retry);
static int
dm_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args, gboolean *retry);
static int
dm_db_query_foreach_row (DBConnection *vconn, const char *sql,
SeafDBRowFunc callback, void *data,
int n, va_list args, gboolean *retry);
static int
dm_db_row_get_column_count (SeafDBRow *row);
static const char *
dm_db_row_get_column_string (SeafDBRow *row, int idx);
static int
dm_db_row_get_column_int (SeafDBRow *row, int idx);
static gint64
dm_db_row_get_column_int64 (SeafDBRow *row, int idx);
static DBConnPool *
init_dm_conn_pool_common (int max_connections)
{
DBConnPool *pool = g_new0(DBConnPool, 1);
pool->connections = g_ptr_array_sized_new (max_connections);
pthread_mutex_init (&pool->lock, NULL);
pool->max_connections = max_connections;
return pool;
}
static DBConnection *
dm_conn_pool_get_connection (SeafDB *db)
{
DBConnPool *pool = db->pool;
DBConnection *conn = NULL;
DBConnection *d_conn = NULL;
if (pool->max_connections == 0) {
conn = dm_db_get_connection (db);
conn->pool = pool;
return conn;
}
pthread_mutex_lock (&pool->lock);
guint i, size = pool->connections->len;
for (i = 0; i < size; ++i) {
conn = g_ptr_array_index (pool->connections, i);
if (!conn->is_available) {
continue;
}
conn->is_available = FALSE;
goto out;
}
conn = NULL;
if (size < pool->max_connections) {
conn = dm_db_get_connection (db);
if (conn) {
conn->pool = pool;
conn->is_available = FALSE;
g_ptr_array_add (pool->connections, conn);
}
}
out:
size = pool->connections->len;
if (size > 0) {
int index;
for (index = size - 1; index >= 0; index--) {
d_conn = g_ptr_array_index (pool->connections, index);
if (d_conn->delete_pending) {
g_ptr_array_remove (conn->pool->connections, d_conn);
dm_db_release_connection (d_conn);
}
}
}
pthread_mutex_unlock (&pool->lock);
return conn;
}
static void
dm_conn_pool_release_connection (DBConnection *conn, gboolean need_close)
{
if (!conn)
return;
if (conn->pool->max_connections == 0) {
dm_db_release_connection (conn);
return;
}
if (need_close) {
pthread_mutex_lock (&conn->pool->lock);
g_ptr_array_remove (conn->pool->connections, conn);
pthread_mutex_unlock (&conn->pool->lock);
dm_db_release_connection (conn);
return;
}
pthread_mutex_lock (&conn->pool->lock);
conn->is_available = TRUE;
pthread_mutex_unlock (&conn->pool->lock);
}
#define KEEPALIVE_INTERVAL 30
static void *
dm_conn_keepalive (void *arg)
{
DBConnPool *pool = arg;
DBConnection *conn = NULL;
DBConnection *d_conn = NULL;
char *sql = "SELECT 1;";
int rc = 0;
va_list args;
while (1) {
pthread_mutex_lock (&pool->lock);
guint i, size = pool->connections->len;
for (i = 0; i < size; ++i) {
conn = g_ptr_array_index (pool->connections, i);
if (conn->is_available) {
rc = db_ops.execute_sql (conn, sql, 0, args, NULL);
if (rc < 0) {
conn->is_available = FALSE;
conn->delete_pending = TRUE;
}
}
}
if (size > 0) {
int index;
for (index = size - 1; index >= 0; index--) {
d_conn = g_ptr_array_index (pool->connections, index);
if (d_conn->delete_pending) {
g_ptr_array_remove (pool->connections, d_conn);
dm_db_release_connection (d_conn);
}
}
}
pthread_mutex_unlock (&pool->lock);
sleep (KEEPALIVE_INTERVAL);
}
return NULL;
}
SeafDB *
seaf_db_new_dm (const char *user,
const char *passwd,
const char *db_name,
int max_connections)
{
SeafDB *db;
db = dm_db_new (user, passwd, db_name);
if (!db)
return NULL;
db->type = SEAF_DB_TYPE_DM;
db_ops.get_connection = dm_conn_pool_get_connection;
db_ops.release_connection = dm_conn_pool_release_connection;
db_ops.execute_sql_no_stmt = dm_db_execute_sql_no_stmt;
db_ops.execute_sql = dm_db_execute_sql;
db_ops.query_foreach_row = dm_db_query_foreach_row;
db_ops.row_get_column_count = dm_db_row_get_column_count;
db_ops.row_get_column_string = dm_db_row_get_column_string;
db_ops.row_get_column_int = dm_db_row_get_column_int;
db_ops.row_get_column_int64 = dm_db_row_get_column_int64;
db->pool = init_dm_conn_pool_common (max_connections);
pthread_t tid;
int ret = pthread_create (&tid, NULL, dm_conn_keepalive, db->pool);
if (ret != 0) {
seaf_warning ("Failed to create dm connection keepalive thread.\n");
return NULL;
}
pthread_detach (tid);
return db;
}
#endif
int
seaf_db_type (SeafDB *db)
{
@@ -635,13 +846,21 @@ seaf_db_begin_transaction (SeafDB *db)
return trans;
}
if (db_ops.execute_sql_no_stmt (conn, "BEGIN", NULL) < 0) {
db_ops.release_connection (conn, TRUE);
return trans;
if (db->type == SEAF_DB_TYPE_DM) {
#ifdef HAVE_ODBC
DMDBConnection *dm_conn = (DMDBConnection *)conn;
SQLSetConnectAttr(dm_conn->db_conn, SQL_ATTR_AUTOCOMMIT, (SQLPOINTER)SQL_AUTOCOMMIT_OFF, SQL_IS_INTEGER);
#endif
} else {
if (db_ops.execute_sql_no_stmt (conn, "BEGIN", NULL) < 0) {
db_ops.release_connection (conn, TRUE);
return trans;
}
}
trans = g_new0 (SeafDBTrans, 1);
trans->conn = conn;
trans->type = db->type;
return trans;
}
@@ -649,6 +868,12 @@ seaf_db_begin_transaction (SeafDB *db)
void
seaf_db_trans_close (SeafDBTrans *trans)
{
if (trans->type == SEAF_DB_TYPE_DM) {
#ifdef HAVE_ODBC
DMDBConnection *dm_conn = (DMDBConnection *)trans->conn;
SQLSetConnectAttr(dm_conn->db_conn, SQL_ATTR_AUTOCOMMIT, (SQLPOINTER)SQL_AUTOCOMMIT_ON, SQL_IS_INTEGER);
#endif
}
db_ops.release_connection (trans->conn, trans->need_close);
g_free (trans);
}
@@ -658,9 +883,21 @@ seaf_db_commit (SeafDBTrans *trans)
{
DBConnection *conn = trans->conn;
if (db_ops.execute_sql_no_stmt (conn, "COMMIT", NULL) < 0) {
trans->need_close = TRUE;
return -1;
if (trans->type == SEAF_DB_TYPE_DM) {
#ifdef HAVE_ODBC
DMDBConnection *dm_conn = (DMDBConnection *)conn;
SQLRETURN ret;
ret = SQLEndTran (SQL_HANDLE_DBC, dm_conn->db_conn, SQL_COMMIT);
if (!SUCCESS(ret)) {
trans->need_close = TRUE;
return -1;
}
#endif
} else {
if (db_ops.execute_sql_no_stmt (conn, "COMMIT", NULL) < 0) {
trans->need_close = TRUE;
return -1;
}
}
return 0;
@@ -671,9 +908,21 @@ seaf_db_rollback (SeafDBTrans *trans)
{
DBConnection *conn = trans->conn;
if (db_ops.execute_sql_no_stmt (conn, "ROLLBACK", NULL) < 0) {
trans->need_close = TRUE;
return -1;
if (trans->type == SEAF_DB_TYPE_DM) {
#ifdef HAVE_ODBC
DMDBConnection *dm_conn = (DMDBConnection *)conn;
SQLRETURN ret;
ret = SQLEndTran (SQL_HANDLE_DBC, dm_conn->db_conn, SQL_ROLLBACK);
if (!SUCCESS(ret)) {
trans->need_close = TRUE;
return -1;
}
#endif
} else {
if (db_ops.execute_sql_no_stmt (conn, "ROLLBACK", NULL) < 0) {
trans->need_close = TRUE;
return -1;
}
}
return 0;
@@ -1575,3 +1824,450 @@ sqlite_db_row_get_column_int64 (SeafDBRow *vrow, int idx)
return sqlite3_column_int64 (row->stmt, idx);
}
#ifdef HAVE_ODBC
/* DM DB */
static int
dm_get_error_state (SQLSMALLINT type, SQLHANDLE handle)
{
SQLINTEGER native_err;
unsigned char errmsg[255];
SQLGetDiagRec(type, handle, 1, NULL, &native_err, errmsg, sizeof(errmsg), NULL);
return native_err;
}
char *
dm_db_error (SQLSMALLINT type, SQLHANDLE handle)
{
SQLINTEGER native_err;
unsigned char errmsg[255];
SQLGetDiagRec(type, handle, 1, NULL, &native_err, errmsg, sizeof(errmsg), NULL);
return g_strdup(errmsg);
}
typedef struct DMDB {
struct SeafDB parent;
char *user;
char *password;
char *db_name;
HENV env;
} DMDB;
typedef struct DM_BIND {
void *buffer;
int buffer_length;
SQLLEN length;
} DM_BIND;
static SeafDB *
dm_db_new (const char *user,
const char *password,
const char *db_name)
{
DMDB *db = g_new0 (DMDB, 1);
db->user = g_strdup (user);
db->password = g_strdup (password);
db->db_name = g_strdup(db_name);
SQLAllocHandle(SQL_HANDLE_ENV, NULL, &db->env);
SQLSetEnvAttr(db->env, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, SQL_IS_INTEGER);
return (SeafDB *)db;
}
typedef char my_bool;
static DBConnection *
dm_db_get_connection (SeafDB *vdb)
{
DMDB *db = (DMDB *)vdb;
HDBC db_conn;
DMDBConnection *conn = NULL;
SQLRETURN ret;
SQLAllocHandle(SQL_HANDLE_DBC, db->env, &db_conn);
ret = SQLConnect(db_conn, (SQLCHAR *)db->db_name, SQL_NTS, (SQLCHAR *)db->user, SQL_NTS, (SQLCHAR *)db->password, SQL_NTS);
if (!SUCCESS(ret)) {
char *errmsg = dm_db_error (SQL_HANDLE_DBC, db_conn);
seaf_warning ("Failed to connect dm server: %s\n", errmsg);
g_free (errmsg);
SQLFreeHandle(SQL_HANDLE_DBC, db_conn);
return NULL;
}
conn = g_new0 (DMDBConnection, 1);
conn->db_conn = db_conn;
return (DBConnection *)conn;
}
static void
dm_db_release_connection (DBConnection *vconn)
{
if (!vconn)
return;
DMDBConnection *conn = (DMDBConnection *)vconn;
SQLDisconnect(conn->db_conn);
SQLFreeHandle(SQL_HANDLE_DBC, conn->db_conn);
g_free (conn);
}
static int
dm_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql, gboolean *retry)
{
DMDBConnection *conn = (DMDBConnection *)vconn;
HSTMT stmt;
SQLRETURN ret;
SQLAllocHandle(SQL_HANDLE_STMT, conn->db_conn, &stmt);
ret = SQLExecDirect(stmt, (SQLCHAR *)sql, SQL_NTS);
if (!SUCCESS(ret)) {
char *errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to execute sql %s: %s\n", sql, errmsg);
g_free (errmsg);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
return -1;
}
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
return 0;
}
static HSTMT
_prepare_stmt_dm (HDBC db, const char *sql, gboolean *retry)
{
HSTMT stmt;
SQLRETURN ret;
SQLAllocHandle(SQL_HANDLE_STMT, db, &stmt);
ret = SQLPrepare(stmt, sql, SQL_NTS);
if (!SUCCESS(ret)) {
char * errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to prepare sql %s: %s\n", sql, errmsg);
g_free (errmsg);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
return NULL;
}
return stmt;
}
static int
_bind_params_dm (HSTMT stmt, DM_BIND *params, int n, va_list args)
{
int i;
const char *type;
SQLRETURN ret;
char *errmsg = NULL;
for (i = 0; i < n; ++i) {
type = va_arg (args, const char *);
if (strcmp(type, "int") == 0) {
int x = va_arg (args, int);
int *pval = g_new (int, 1);
*pval = x;
params[i].buffer = pval;
ret = SQLBindParameter(stmt, i+1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_INTEGER, sizeof(x), 0, params[i].buffer, sizeof(x), NULL);
if (!SUCCESS(ret)) {
errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to bid params: %s\n", errmsg);
g_free (errmsg);
return -1;
}
} else if (strcmp (type, "int64") == 0) {
gint64 x = va_arg (args, gint64);
gint64 *pval = g_new (gint64, 1);
*pval = x;
params[i].buffer = pval;
ret = SQLBindParameter(stmt, i+1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, sizeof(x), 0, params[i].buffer, sizeof(x), NULL);
if (!SUCCESS(ret)) {
errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to bid params: %s\n", errmsg);
g_free (errmsg);
return -1;
}
} else if (strcmp (type, "string") == 0) {
const char *s = va_arg (args, const char *);
int len = 0;
if (s) {
len = strlen(s);
}
params[i].buffer = g_strdup(s);
ret = SQLBindParameter(stmt, i+1, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR, len, 0, params[i].buffer, len, NULL);
if (!SUCCESS(ret)) {
errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to bid params: %s\n", errmsg);
g_free (errmsg);
return -1;
}
} else {
seaf_warning ("BUG: invalid prep stmt parameter type %s.\n", type);
g_return_val_if_reached (-1);
}
}
return 0;
}
static int
dm_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args, gboolean *retry)
{
DMDBConnection *conn = (DMDBConnection *)vconn;
HDBC db = conn->db_conn;
HSTMT stmt = NULL;
DM_BIND *params = NULL;
SQLRETURN rc;
int ret = 0;
stmt = _prepare_stmt_dm (db, sql, retry);
if (!stmt) {
return -1;
}
if (n > 0) {
params = g_new0 (DM_BIND, n);
if (_bind_params_dm(stmt, params, n, args) < 0) {
seaf_warning ("Failed to bind parameters for %s.\n", sql);
ret = -1;
goto out;
}
}
rc = SQLExecute (stmt);
if (!SUCCESS(rc)) {
char *errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to execute sql %s: %s\n", sql, errmsg);
g_free (errmsg);
ret = -1;
goto out;
}
out:
if (stmt) {
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}
if (params) {
int i;
for (i = 0; i < n; ++i) {
g_free (params[i].buffer);
}
g_free (params);
}
return ret;
}
typedef struct DMDBRow {
SeafDBRow parent;
int column_count;
HSTMT stmt;
DM_BIND *results;
gboolean over_buffer;
} DMDBRow;
#define DEFAULT_DM_COLUMN_SIZE 1024
static int
dm_db_query_foreach_row (DBConnection *vconn, const char *sql,
SeafDBRowFunc callback, void *data,
int n, va_list args, gboolean *retry)
{
DMDBConnection *conn = (DMDBConnection *)vconn;
HDBC db = conn->db_conn;
HSTMT stmt = NULL;
DM_BIND *params = NULL;
DMDBRow row;
SQLRETURN rc;
short cols;
int nrows = 0;
int i;
memset (&row, 0, sizeof(row));
stmt = _prepare_stmt_dm (db, sql, retry);
if (!stmt) {
return -1;
}
if (n > 0) {
params = g_new0 (DM_BIND, n);
if (_bind_params_dm (stmt, params, n, args) < 0) {
nrows = -1;
goto out;
}
}
rc = SQLExecute (stmt);
if (!SUCCESS(rc)) {
char *errmsg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to execute sql %s: error: %s\n", sql, errmsg);
g_free (errmsg);
nrows = -1;
goto out;
}
SQLNumResultCols(stmt, &cols);
row.column_count = cols;
row.stmt = stmt;
row.results = g_new0 (DM_BIND, row.column_count);
for (i = 0; i < row.column_count; ++i) {
row.results[i].buffer = g_new0(char, DEFAULT_DM_COLUMN_SIZE + 1);
/* Ask DM to convert fields to string, to avoid the trouble of
* checking field types.
*/
row.results[i].buffer_length = DEFAULT_DM_COLUMN_SIZE;
row.results[i].length = 0;
SQLBindCol(stmt, i+1, SQL_C_CHAR, row.results[i].buffer, row.results[i].buffer_length, &row.results[i].length);
}
gboolean next_row = TRUE;
while (1) {
rc = SQLFetch(stmt);
if (!SUCCESS(rc)) {
if (rc != SQL_ERROR || dm_get_error_state(SQL_HANDLE_STMT, stmt) != BUFFER_NOT_ENOUGH) {
char *err_msg = dm_db_error (SQL_HANDLE_STMT, stmt);
seaf_warning ("Failed to fetch result for sql %s: %s\n",
sql, err_msg);
g_free (err_msg);
nrows = -1;
goto out;
} else {
row.over_buffer = TRUE;
}
} else {
row.over_buffer = FALSE;
}
if (rc == SQL_NO_DATA_FOUND || rc == SQL_NO_DATA)
break;
++nrows;
if (callback)
next_row = callback ((SeafDBRow *)&row, data);
if (!next_row)
break;
}
out:
if (stmt) {
SQLCloseCursor(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}
if (params) {
int i;
for (i = 0; i < n; ++i) {
g_free (params[i].buffer);
}
g_free (params);
}
if (row.results) {
for (i = 0; i < row.column_count; ++i) {
g_free (row.results[i].buffer);
}
g_free (row.results);
}
return nrows;
}
static int
dm_db_row_get_column_count (SeafDBRow *vrow)
{
DMDBRow *row = (DMDBRow *)vrow;
return row->column_count;
}
static const char *
dm_db_row_get_column_string (SeafDBRow *vrow, int i)
{
DMDBRow *row = (DMDBRow *)vrow;
if (row->results[i].length == -1) {
return NULL;
}
SQLRETURN rc;
char *ret = NULL;
int j = 1;
/* If column size is larger then allocated buffer size, re-allocate a new buffer
* and fetch the column directly.
*/
alloc_buffer:
if (row->results[i].length == 0 && row->over_buffer) {
g_free (row->results[i].buffer);
row->results[i].buffer = g_new0 (char, 2*j * DEFAULT_DM_COLUMN_SIZE + 1);
row->results[i].buffer_length = 2*j * DEFAULT_DM_COLUMN_SIZE;
rc = SQLGetData(row->stmt, i+1, SQL_C_CHAR, row->results[i].buffer, row->results[i].buffer_length, &row->results[i].length);
if (SUCCESS(rc)) {
ret = row->results[i].buffer;
} else {
if (dm_get_error_state(SQL_HANDLE_STMT, row->stmt) != BUFFER_NOT_ENOUGH) {
return NULL;
} else {
j++;
goto alloc_buffer;
}
}
} else {
ret = row->results[i].buffer;
}
ret[row->results[i].length] = 0;
return ret;
}
static int
dm_db_row_get_column_int (SeafDBRow *vrow, int idx)
{
const char *str;
char *e;
int ret;
str = dm_db_row_get_column_string (vrow, idx);
if (!str) {
return 0;
}
errno = 0;
ret = strtol (str, &e, 10);
if (errno || (e == str)) {
seaf_warning ("Number conversion failed.\n");
return -1;
}
return ret;
}
static gint64
dm_db_row_get_column_int64 (SeafDBRow *vrow, int idx)
{
const char *str;
char *e;
gint64 ret;
str = dm_db_row_get_column_string (vrow, idx);
if (!str) {
return 0;
}
errno = 0;
ret = strtoll (str, &e, 10);
if (errno || (e == str)) {
seaf_warning ("Number conversion failed.\n");
return -1;
}
return ret;
}
#endif /* HAVE_ODBC */