diff --git a/net/common/ccnet-db.c b/net/common/ccnet-db.c index d6ecd3c..19e3a68 100644 --- a/net/common/ccnet-db.c +++ b/net/common/ccnet-db.c @@ -12,12 +12,21 @@ #include #include +struct DBConnPool { + GPtrArray *connections; + pthread_mutex_t lock; + int max_connections; +}; +typedef struct DBConnPool DBConnPool; + struct CcnetDB { int type; + DBConnPool *pool; }; typedef struct DBConnection { - /* Empty */ + gboolean is_available; + DBConnPool *pool; } DBConnection; struct CcnetDBRow { @@ -77,6 +86,97 @@ static int mysql_db_row_get_column_int (CcnetDBRow *row, int idx); static gint64 mysql_db_row_get_column_int64 (CcnetDBRow *row, int idx); +static gboolean +mysql_db_connection_ping (DBConnection *vconn); +static DBConnPool * + +init_conn_pool_common (int max_connections) +{ + DBConnPool *pool = g_new0(DBConnPool, 1); + pool->connections = g_ptr_array_sized_new (max_connections); + pthread_mutex_init (&pool->lock, NULL); + pool->max_connections = max_connections; + + return pool; +} + +static DBConnection * +mysql_conn_pool_get_connection (CcnetDB *db) +{ + DBConnPool *pool = db->pool; + DBConnection *conn = NULL; + + if (pool->max_connections == 0) { + conn = mysql_db_get_connection (db); + conn->pool = pool; + return conn; + } + + pthread_mutex_lock (&pool->lock); + + guint i, size = pool->connections->len; + for (i = 0; i < size; ++i) { + conn = g_ptr_array_index (pool->connections, i); + if (conn->is_available && mysql_db_connection_ping (conn)) { + conn->is_available = FALSE; + goto out; + } + } + conn = NULL; + if (size < pool->max_connections) { + conn = mysql_db_get_connection (db); + if (conn) { + conn->pool = pool; + conn->is_available = FALSE; + g_ptr_array_add (pool->connections, conn); + } + } + +out: + pthread_mutex_unlock (&pool->lock); + return conn; +} + +static void +mysql_conn_pool_release_connection (DBConnection *conn) +{ + if (!conn) + return; + + if (conn->pool->max_connections == 0) { + mysql_db_release_connection (conn); + return; + } + + pthread_mutex_lock (&conn->pool->lock); + conn->is_available = TRUE; + pthread_mutex_unlock (&conn->pool->lock); +} + +#define KEEPALIVE_INTERVAL 30 +static void * +mysql_conn_keepalive (void *arg) +{ + DBConnPool *pool = arg; + DBConnection *conn = NULL; + + while (1) { + pthread_mutex_lock (&pool->lock); + + guint i, size = pool->connections->len; + for (i = 0; i < size; ++i) { + conn = g_ptr_array_index (pool->connections, i); + if (conn->is_available) { + mysql_db_connection_ping (conn); + } + } + pthread_mutex_unlock (&pool->lock); + + sleep (KEEPALIVE_INTERVAL); + } + + return NULL; +} CcnetDB * ccnet_db_new_mysql (const char *host, @@ -96,8 +196,8 @@ ccnet_db_new_mysql (const char *host, return NULL; db->type = CCNET_DB_TYPE_MYSQL; - db_ops.get_connection = mysql_db_get_connection; - db_ops.release_connection = mysql_db_release_connection; + db_ops.get_connection = mysql_conn_pool_get_connection; + db_ops.release_connection = mysql_conn_pool_release_connection; db_ops.execute_sql_no_stmt = mysql_db_execute_sql_no_stmt; db_ops.execute_sql = mysql_db_execute_sql; db_ops.query_foreach_row = mysql_db_query_foreach_row; @@ -106,6 +206,17 @@ ccnet_db_new_mysql (const char *host, db_ops.row_get_column_int = mysql_db_row_get_column_int; db_ops.row_get_column_int64 = mysql_db_row_get_column_int64; + db->pool = init_conn_pool_common (max_connections); + + pthread_t tid; + int ret = pthread_create (&tid, NULL, mysql_conn_keepalive, db->pool); + if (ret != 0) { + ccnet_warning ("Failed to create mysql connection keepalive thread.\n"); + return NULL; + } + pthread_detach (tid); + + return db; } @@ -527,6 +638,14 @@ typedef struct MySQLDBConnection { MYSQL *db_conn; } MySQLDBConnection; +static gboolean +mysql_db_connection_ping (DBConnection *vconn) +{ + MySQLDBConnection *conn = (MySQLDBConnection *)vconn; + + return (mysql_ping (conn->db_conn) == 0); +} + static CcnetDB * mysql_db_new (const char *host, int port,