1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-05-09 15:46:19 +00:00

mysql add connection pool and keepalive thread

This commit is contained in:
杨赫然 2020-07-06 11:04:50 +08:00
parent f7e2a8d9ec
commit a1892cf9f3

View File

@ -12,12 +12,21 @@
#include <sqlite3.h>
#include <pthread.h>
struct DBConnPool {
GPtrArray *connections;
pthread_mutex_t lock;
int max_connections;
};
typedef struct DBConnPool DBConnPool;
struct CcnetDB {
int type;
DBConnPool *pool;
};
typedef struct DBConnection {
/* Empty */
gboolean is_available;
DBConnPool *pool;
} DBConnection;
struct CcnetDBRow {
@ -77,6 +86,97 @@ static int
mysql_db_row_get_column_int (CcnetDBRow *row, int idx);
static gint64
mysql_db_row_get_column_int64 (CcnetDBRow *row, int idx);
static gboolean
mysql_db_connection_ping (DBConnection *vconn);
static DBConnPool *
init_conn_pool_common (int max_connections)
{
DBConnPool *pool = g_new0(DBConnPool, 1);
pool->connections = g_ptr_array_sized_new (max_connections);
pthread_mutex_init (&pool->lock, NULL);
pool->max_connections = max_connections;
return pool;
}
static DBConnection *
mysql_conn_pool_get_connection (CcnetDB *db)
{
DBConnPool *pool = db->pool;
DBConnection *conn = NULL;
if (pool->max_connections == 0) {
conn = mysql_db_get_connection (db);
conn->pool = pool;
return conn;
}
pthread_mutex_lock (&pool->lock);
guint i, size = pool->connections->len;
for (i = 0; i < size; ++i) {
conn = g_ptr_array_index (pool->connections, i);
if (conn->is_available && mysql_db_connection_ping (conn)) {
conn->is_available = FALSE;
goto out;
}
}
conn = NULL;
if (size < pool->max_connections) {
conn = mysql_db_get_connection (db);
if (conn) {
conn->pool = pool;
conn->is_available = FALSE;
g_ptr_array_add (pool->connections, conn);
}
}
out:
pthread_mutex_unlock (&pool->lock);
return conn;
}
static void
mysql_conn_pool_release_connection (DBConnection *conn)
{
if (!conn)
return;
if (conn->pool->max_connections == 0) {
mysql_db_release_connection (conn);
return;
}
pthread_mutex_lock (&conn->pool->lock);
conn->is_available = TRUE;
pthread_mutex_unlock (&conn->pool->lock);
}
#define KEEPALIVE_INTERVAL 30
static void *
mysql_conn_keepalive (void *arg)
{
DBConnPool *pool = arg;
DBConnection *conn = NULL;
while (1) {
pthread_mutex_lock (&pool->lock);
guint i, size = pool->connections->len;
for (i = 0; i < size; ++i) {
conn = g_ptr_array_index (pool->connections, i);
if (conn->is_available) {
mysql_db_connection_ping (conn);
}
}
pthread_mutex_unlock (&pool->lock);
sleep (KEEPALIVE_INTERVAL);
}
return NULL;
}
CcnetDB *
ccnet_db_new_mysql (const char *host,
@ -96,8 +196,8 @@ ccnet_db_new_mysql (const char *host,
return NULL;
db->type = CCNET_DB_TYPE_MYSQL;
db_ops.get_connection = mysql_db_get_connection;
db_ops.release_connection = mysql_db_release_connection;
db_ops.get_connection = mysql_conn_pool_get_connection;
db_ops.release_connection = mysql_conn_pool_release_connection;
db_ops.execute_sql_no_stmt = mysql_db_execute_sql_no_stmt;
db_ops.execute_sql = mysql_db_execute_sql;
db_ops.query_foreach_row = mysql_db_query_foreach_row;
@ -106,6 +206,17 @@ ccnet_db_new_mysql (const char *host,
db_ops.row_get_column_int = mysql_db_row_get_column_int;
db_ops.row_get_column_int64 = mysql_db_row_get_column_int64;
db->pool = init_conn_pool_common (max_connections);
pthread_t tid;
int ret = pthread_create (&tid, NULL, mysql_conn_keepalive, db->pool);
if (ret != 0) {
ccnet_warning ("Failed to create mysql connection keepalive thread.\n");
return NULL;
}
pthread_detach (tid);
return db;
}
@ -527,6 +638,14 @@ typedef struct MySQLDBConnection {
MYSQL *db_conn;
} MySQLDBConnection;
static gboolean
mysql_db_connection_ping (DBConnection *vconn)
{
MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
return (mysql_ping (conn->db_conn) == 0);
}
static CcnetDB *
mysql_db_new (const char *host,
int port,