From 48c3f5edefc53a4b8435c06ef7b6b4f430464560 Mon Sep 17 00:00:00 2001 From: Jiaqiang Xu Date: Sat, 20 Aug 2016 15:12:17 +0800 Subject: [PATCH] Add connection pool for db. --- net/common/ccnet-db.c | 14 ++-- net/common/ccnet-db.h | 3 +- net/common/db-wrapper/db-wrapper.c | 99 ++++++++++++++++++++++++--- net/common/db-wrapper/db-wrapper.h | 11 ++- net/common/db-wrapper/mysql-db-ops.c | 13 ++-- net/common/db-wrapper/mysql-db-ops.h | 6 +- net/common/db-wrapper/pgsql-db-ops.c | 9 ++- net/common/db-wrapper/pgsql-db-ops.h | 3 + net/common/db-wrapper/sqlite-db-ops.c | 10 ++- net/common/db-wrapper/sqlite-db-ops.h | 5 +- net/server/server-session.c | 3 +- 11 files changed, 146 insertions(+), 30 deletions(-) diff --git a/net/common/ccnet-db.c b/net/common/ccnet-db.c index 4844024..e36c2b1 100644 --- a/net/common/ccnet-db.c +++ b/net/common/ccnet-db.c @@ -50,10 +50,11 @@ ccnet_db_new_mysql (const char *host, CcnetDB * ccnet_db_new_pgsql (const char *host, - const char *user, - const char *passwd, - const char *db_name, - const char *unix_socket) + const char *user, + const char *passwd, + const char *db_name, + const char *unix_socket, + int max_connections) { CcnetDB *db; @@ -65,7 +66,8 @@ ccnet_db_new_pgsql (const char *host, db->type = CCNET_DB_TYPE_PGSQL; - db->pool = db_conn_pool_new_pgsql (host, user, passwd, db_name, unix_socket); + db->pool = db_conn_pool_new_pgsql (host, user, passwd, db_name, unix_socket, + max_connections); return db; } @@ -83,7 +85,7 @@ ccnet_db_new_sqlite (const char *db_path) db->type = CCNET_DB_TYPE_SQLITE; - db->pool = db_conn_pool_new_sqlite (db_path, 0); + db->pool = db_conn_pool_new_sqlite (db_path, 10); return db; } diff --git a/net/common/ccnet-db.h b/net/common/ccnet-db.h index 025b30e..fd9cf0e 100644 --- a/net/common/ccnet-db.h +++ b/net/common/ccnet-db.h @@ -33,7 +33,8 @@ ccnet_db_new_pgsql (const char *host, const char *user, const char *passwd, const char *db_name, - const char *unix_socket); + const char *unix_socket, + int max_connections); CcnetDB * ccnet_db_new_sqlite (const char *db_path); diff --git a/net/common/db-wrapper/db-wrapper.c b/net/common/db-wrapper/db-wrapper.c index 16d7083..c601e45 100644 --- a/net/common/db-wrapper/db-wrapper.c +++ b/net/common/db-wrapper/db-wrapper.c @@ -9,6 +9,7 @@ typedef struct DBOperations { void (*db_conn_pool_free) (DBConnPool *); DBConnection* (*get_db_connection) (DBConnPool *, GError **); void (*db_connection_close) (DBConnection *); + gboolean (*db_connection_ping) (DBConnection *); gboolean (*db_connection_execute) (DBConnection *, const char *, GError **); ResultSet* (*db_connection_execute_query) (DBConnection *, const char *, GError **); gboolean (*result_set_next) (ResultSet *, GError **); @@ -31,6 +32,14 @@ static DBOperations db_ops; /* DB Connection Pool. */ +static void +init_conn_pool_common (DBConnPool *pool, int max_connections) +{ + pool->connections = g_ptr_array_sized_new (max_connections); + pthread_mutex_init (&pool->lock, NULL); + pool->max_connections = max_connections; +} + DBConnPool * db_conn_pool_new_mysql (const char *host, const char *user, @@ -45,6 +54,7 @@ db_conn_pool_new_mysql (const char *host, db_ops.db_conn_pool_free = mysql_db_conn_pool_free; db_ops.get_db_connection = mysql_get_db_connection; db_ops.db_connection_close = mysql_db_connection_close; + db_ops.db_connection_ping = mysql_db_connection_ping; db_ops.db_connection_execute = mysql_db_connection_execute; db_ops.db_connection_execute_query = mysql_execute_query; db_ops.result_set_next = mysql_result_set_next; @@ -62,8 +72,13 @@ db_conn_pool_new_mysql (const char *host, db_ops.db_connection_commit = mysql_db_commit; db_ops.db_connection_rollback = mysql_db_rollback; - return mysql_db_conn_pool_new (host, user, password, port, db_name, unix_socket, - use_ssl, charset, max_connections); + DBConnPool *pool; + + pool = mysql_db_conn_pool_new (host, user, password, port, db_name, unix_socket, + use_ssl, charset); + init_conn_pool_common (pool, max_connections); + + return pool; } DBConnPool * @@ -71,11 +86,13 @@ db_conn_pool_new_pgsql (const char *host, const char *user, const char *password, const char *db_name, - const char *unix_socket) + const char *unix_socket, + int max_connections) { db_ops.db_conn_pool_free = pgsql_db_conn_pool_free; db_ops.get_db_connection = pgsql_get_db_connection; db_ops.db_connection_close = pgsql_db_connection_close; + db_ops.db_connection_ping = pgsql_db_connection_ping; db_ops.db_connection_execute = pgsql_db_connection_execute; db_ops.db_connection_execute_query = pgsql_execute_query; db_ops.result_set_next = pgsql_result_set_next; @@ -93,7 +110,12 @@ db_conn_pool_new_pgsql (const char *host, db_ops.db_connection_commit = pgsql_db_commit; db_ops.db_connection_rollback = pgsql_db_rollback; - return pgsql_db_conn_pool_new (host, user, password, db_name, unix_socket); + DBConnPool *pool; + + pool = pgsql_db_conn_pool_new (host, user, password, db_name, unix_socket); + init_conn_pool_common (pool, max_connections); + + return pool; } DBConnPool * @@ -102,6 +124,7 @@ db_conn_pool_new_sqlite (const char *db_path, int max_connections) db_ops.db_conn_pool_free = sqlite_db_conn_pool_free; db_ops.get_db_connection = sqlite_get_db_connection; db_ops.db_connection_close = sqlite_db_connection_close; + db_ops.db_connection_ping = sqlite_db_connection_ping; db_ops.db_connection_execute = sqlite_db_connection_execute; db_ops.db_connection_execute_query = sqlite_execute_query; db_ops.result_set_next = sqlite_result_set_next; @@ -119,12 +142,20 @@ db_conn_pool_new_sqlite (const char *db_path, int max_connections) db_ops.db_connection_commit = sqlite_db_commit; db_ops.db_connection_rollback = sqlite_db_rollback; - return sqlite_db_conn_pool_new (db_path, max_connections); + DBConnPool *pool; + + pool = sqlite_db_conn_pool_new (db_path); + init_conn_pool_common (pool, max_connections); + + return pool; } void db_conn_pool_free (DBConnPool *pool) { + g_ptr_array_free (pool->connections, TRUE); + pthread_mutex_destroy (&pool->lock); + return db_ops.db_conn_pool_free (pool); } @@ -133,15 +164,40 @@ db_conn_pool_free (DBConnPool *pool) DBConnection * db_conn_pool_get_connection (DBConnPool *pool, GError **error) { - return db_ops.get_db_connection (pool, error); + DBConnection *conn = NULL; + + pthread_mutex_lock (&pool->lock); + + guint i, size = pool->connections->len; + for (i = 0; i < size; ++i) { + conn = g_ptr_array_index (pool->connections, i); + if (conn->is_available && db_connection_ping (conn)) { + conn->is_available = FALSE; + goto out; + } + } + conn = NULL; + if (size < pool->max_connections) { + conn = db_ops.get_db_connection (pool, error); + if (conn) { + conn->is_available = TRUE; + conn->pool = pool; + g_ptr_array_add (pool->connections, conn); + } + } + +out: + pthread_mutex_unlock (&pool->lock); + return conn; } static void db_connection_clear (DBConnection *conn) { result_set_free (conn->result_set); - db_stmt_free (conn->stmt); + conn->result_set = NULL; + conn->stmt = NULL; } void @@ -150,9 +206,14 @@ db_connection_close (DBConnection *conn) if (!conn) return; + if (conn->in_transaction) + db_connection_rollback (conn, NULL); + db_connection_clear (conn); - db_ops.db_connection_close (conn); + pthread_mutex_lock (&conn->pool->lock); + conn->is_available = TRUE; + pthread_mutex_unlock (&conn->pool->lock); } gboolean @@ -161,6 +222,12 @@ db_connection_execute (DBConnection *conn, const char *sql, GError **error) return db_ops.db_connection_execute (conn, sql, error); } +gboolean +db_connection_ping (DBConnection *conn) +{ + return db_ops.db_connection_ping (conn); +} + /* Result Sets. */ void @@ -341,19 +408,31 @@ db_stmt_free (DBStmt *stmt) gboolean db_connection_begin_transaction (DBConnection *conn, GError **error) { - return db_ops.db_connection_begin_transaction (conn, error); + gboolean ret; + + ret = db_ops.db_connection_begin_transaction (conn, error); + if (ret) + conn->in_transaction++; + + return ret; } gboolean db_connection_commit (DBConnection *conn, GError **error) { + if (conn->in_transaction) + conn->in_transaction = 0; + return db_ops.db_connection_commit (conn, error); } gboolean db_connection_rollback (DBConnection *conn, GError **error) { - db_connection_clear (conn); + if (conn->in_transaction) { + db_connection_clear (conn); + conn->in_transaction = 0; + } return db_ops.db_connection_rollback (conn, error); } diff --git a/net/common/db-wrapper/db-wrapper.h b/net/common/db-wrapper/db-wrapper.h index ad897d6..4d3fdc8 100644 --- a/net/common/db-wrapper/db-wrapper.h +++ b/net/common/db-wrapper/db-wrapper.h @@ -2,6 +2,7 @@ #define DB_WARPPER_H #include +#include #define SEAF_DB_ERROR_DOMAIN g_quark_from_string("SEAF_DB") #define SEAF_DB_ERROR_CODE 0 @@ -9,6 +10,8 @@ /* DB Connection Pool. */ struct DBConnPool { + GPtrArray *connections; + pthread_mutex_t lock; int max_connections; }; typedef struct DBConnPool DBConnPool; @@ -29,7 +32,8 @@ db_conn_pool_new_pgsql (const char *host, const char *user, const char *password, const char *db_name, - const char *unix_socket); + const char *unix_socket, + int max_connections); DBConnPool * db_conn_pool_new_sqlite (const char *db_path, int max_connections); @@ -46,6 +50,8 @@ struct DBStmt; typedef struct DBStmt DBStmt; struct DBConnection { + gboolean is_available; + int in_transaction; DBConnPool *pool; ResultSet *result_set; DBStmt *stmt; @@ -58,6 +64,9 @@ db_conn_pool_get_connection (DBConnPool *pool, GError **error); void db_connection_close (DBConnection *conn); +gboolean +db_connection_ping (DBConnection *conn); + gboolean db_connection_execute (DBConnection *conn, const char *sql, GError **error); diff --git a/net/common/db-wrapper/mysql-db-ops.c b/net/common/db-wrapper/mysql-db-ops.c index f630f7c..a66324a 100644 --- a/net/common/db-wrapper/mysql-db-ops.c +++ b/net/common/db-wrapper/mysql-db-ops.c @@ -27,12 +27,10 @@ mysql_db_conn_pool_new (const char *host, const char *db_name, const char *unix_socket, gboolean use_ssl, - const char *charset, - int max_connections) + const char *charset) { MySQLDBConnPool *pool = g_new0 (MySQLDBConnPool, 1); - pool->parent.max_connections = max_connections; pool->host = g_strdup (host); pool->user = g_strdup (user); pool->password = g_strdup (password); @@ -117,7 +115,6 @@ mysql_get_db_connection (DBConnPool *vpool, GError **error) return NULL; conn = g_new0 (MySQLDBConnection, 1); conn->db = db; - conn->parent.pool = vpool; return (DBConnection *)conn; } @@ -134,6 +131,14 @@ mysql_db_connection_close (DBConnection *vconn) g_free (conn); } +gboolean +mysql_db_connection_ping (DBConnection *vconn) +{ + MySQLDBConnection *conn = (MySQLDBConnection *)vconn; + + return (mysql_ping (conn->db) == 0); +} + gboolean mysql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error) { diff --git a/net/common/db-wrapper/mysql-db-ops.h b/net/common/db-wrapper/mysql-db-ops.h index 4a869d4..8c70f52 100644 --- a/net/common/db-wrapper/mysql-db-ops.h +++ b/net/common/db-wrapper/mysql-db-ops.h @@ -9,8 +9,7 @@ mysql_db_conn_pool_new (const char *host, const char *db_name, const char *unix_socket, gboolean use_ssl, - const char *charset, - int max_connections); + const char *charset); void mysql_db_conn_pool_free (DBConnPool *vpool); @@ -21,6 +20,9 @@ mysql_get_db_connection (DBConnPool *vpool, GError **error); void mysql_db_connection_close (DBConnection *vconn); +gboolean +mysql_db_connection_ping (DBConnection *vconn); + gboolean mysql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error); diff --git a/net/common/db-wrapper/pgsql-db-ops.c b/net/common/db-wrapper/pgsql-db-ops.c index 3a7fed2..7d37755 100644 --- a/net/common/db-wrapper/pgsql-db-ops.c +++ b/net/common/db-wrapper/pgsql-db-ops.c @@ -114,7 +114,6 @@ pgsql_get_db_connection (DBConnPool *vpool, GError **error) conn = g_new0 (PGDBConnection, 1); conn->db = db; - conn->parent.pool = vpool; return (DBConnection *)conn; } @@ -132,6 +131,14 @@ pgsql_db_connection_close (DBConnection *vconn) g_free (conn); } +gboolean +pgsql_db_connection_ping (DBConnection *vconn) +{ + PGDBConnection *conn = (PGDBConnection *)vconn; + + return (PQstatus(conn->db) == CONNECTION_OK); +} + gboolean pgsql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error) { diff --git a/net/common/db-wrapper/pgsql-db-ops.h b/net/common/db-wrapper/pgsql-db-ops.h index 2d87c7d..c8e884e 100644 --- a/net/common/db-wrapper/pgsql-db-ops.h +++ b/net/common/db-wrapper/pgsql-db-ops.h @@ -17,6 +17,9 @@ pgsql_get_db_connection (DBConnPool *vpool, GError **error); void pgsql_db_connection_close (DBConnection *vconn); +gboolean +pgsql_db_connection_ping (DBConnection *vconn); + gboolean pgsql_db_connection_execute (DBConnection *vconn, const char *sql, GError **error); diff --git a/net/common/db-wrapper/sqlite-db-ops.c b/net/common/db-wrapper/sqlite-db-ops.c index 7de7151..e746aa1 100644 --- a/net/common/db-wrapper/sqlite-db-ops.c +++ b/net/common/db-wrapper/sqlite-db-ops.c @@ -91,15 +91,13 @@ sqlite3_blocking_exec(sqlite3 *db, const char *sql, int (*callback)(void *, int, typedef struct SQLiteDBConnPool { DBConnPool parent; char *db_path; - int max_connections; } SQLiteDBConnPool; DBConnPool * -sqlite_db_conn_pool_new (const char *db_path, int max_connections) +sqlite_db_conn_pool_new (const char *db_path) { SQLiteDBConnPool *pool = g_new0 (SQLiteDBConnPool, 1); pool->db_path = g_strdup(db_path); - pool->max_connections = max_connections; return (DBConnPool *)pool; } @@ -158,6 +156,12 @@ sqlite_db_connection_close (DBConnection *vconn) g_free (conn); } +gboolean +sqlite_db_connection_ping (DBConnection *vconn) +{ + return TRUE; +} + gboolean sqlite_db_connection_execute (DBConnection *vconn, const char *sql, GError **error) { diff --git a/net/common/db-wrapper/sqlite-db-ops.h b/net/common/db-wrapper/sqlite-db-ops.h index 13f9835..9fb1bd8 100644 --- a/net/common/db-wrapper/sqlite-db-ops.h +++ b/net/common/db-wrapper/sqlite-db-ops.h @@ -2,7 +2,7 @@ #define SQLITE_DB_OPS_H DBConnPool * -sqlite_db_conn_pool_new (const char *db_path, int max_connections); +sqlite_db_conn_pool_new (const char *db_path); void sqlite_db_conn_pool_free (DBConnPool *vpool); @@ -13,6 +13,9 @@ sqlite_get_db_connection (DBConnPool *vpool, GError **error); void sqlite_db_connection_close (DBConnection *vconn); +gboolean +sqlite_db_connection_ping (DBConnection *vconn); + gboolean sqlite_db_connection_execute (DBConnection *vconn, const char *sql, GError **error); diff --git a/net/server/server-session.c b/net/server/server-session.c index 24cd4aa..d08e265 100644 --- a/net/server/server-session.c +++ b/net/server/server-session.c @@ -216,7 +216,8 @@ static int init_pgsql_database (CcnetSession *session) unix_socket = ccnet_key_file_get_string (session->keyf, "Database", "UNIX_SOCKET"); - session->db = ccnet_db_new_pgsql (host, user, passwd, db, unix_socket); + session->db = ccnet_db_new_pgsql (host, user, passwd, db, unix_socket, + DEFAULT_MAX_CONNECTIONS); if (!session->db) { g_warning ("Failed to open database.\n"); return -1;