1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-04-28 02:30:08 +00:00
ccnet-server/net/common/ccnet-db.c
2019-07-23 15:24:28 +08:00

1262 lines
31 KiB
C

#include "common.h"
#include "log.h"
#include "ccnet-db.h"
#include <stdarg.h>
#ifdef HAVE_MYSQL
#include <mysql.h>
#endif
#include <sqlite3.h>
#include <pthread.h>
struct CcnetDB {
int type;
};
typedef struct DBConnection {
/* Empty */
} DBConnection;
struct CcnetDBRow {
/* Empty */
};
struct CcnetDBTrans {
DBConnection *conn;
};
typedef struct DBOperations {
DBConnection* (*get_connection)(CcnetDB *db);
void (*release_connection)(DBConnection *conn);
int (*execute_sql_no_stmt)(DBConnection *conn, const char *sql);
int (*execute_sql)(DBConnection *conn, const char *sql,
int n, va_list args);
int (*query_foreach_row)(DBConnection *conn,
const char *sql, CcnetDBRowFunc callback, void *data,
int n, va_list args);
int (*row_get_column_count)(CcnetDBRow *row);
const char* (*row_get_column_string)(CcnetDBRow *row, int idx);
int (*row_get_column_int)(CcnetDBRow *row, int idx);
gint64 (*row_get_column_int64)(CcnetDBRow *row, int idx);
} DBOperations;
static DBOperations db_ops;
#ifdef HAVE_MYSQL
/* MySQL Ops */
static CcnetDB *
mysql_db_new (const char *host,
int port,
const char *user,
const char *password,
const char *db_name,
const char *unix_socket,
gboolean use_ssl,
const char *charset);
static DBConnection *
mysql_db_get_connection (CcnetDB *db);
static void
mysql_db_release_connection (DBConnection *vconn);
static int
mysql_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql);
static int
mysql_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args);
static int
mysql_db_query_foreach_row (DBConnection *vconn, const char *sql,
CcnetDBRowFunc callback, void *data,
int n, va_list args);
static int
mysql_db_row_get_column_count (CcnetDBRow *row);
static const char *
mysql_db_row_get_column_string (CcnetDBRow *row, int idx);
static int
mysql_db_row_get_column_int (CcnetDBRow *row, int idx);
static gint64
mysql_db_row_get_column_int64 (CcnetDBRow *row, int idx);
CcnetDB *
ccnet_db_new_mysql (const char *host,
int port,
const char *user,
const char *passwd,
const char *db_name,
const char *unix_socket,
gboolean use_ssl,
const char *charset,
int max_connections)
{
CcnetDB *db;
db = mysql_db_new (host, port, user, passwd, db_name, unix_socket, use_ssl, charset);
if (!db)
return NULL;
db->type = CCNET_DB_TYPE_MYSQL;
db_ops.get_connection = mysql_db_get_connection;
db_ops.release_connection = mysql_db_release_connection;
db_ops.execute_sql_no_stmt = mysql_db_execute_sql_no_stmt;
db_ops.execute_sql = mysql_db_execute_sql;
db_ops.query_foreach_row = mysql_db_query_foreach_row;
db_ops.row_get_column_count = mysql_db_row_get_column_count;
db_ops.row_get_column_string = mysql_db_row_get_column_string;
db_ops.row_get_column_int = mysql_db_row_get_column_int;
db_ops.row_get_column_int64 = mysql_db_row_get_column_int64;
return db;
}
#endif
/* SQLite Ops */
static CcnetDB *
sqlite_db_new (const char *db_path);
static DBConnection *
sqlite_db_get_connection (CcnetDB *db);
static void
sqlite_db_release_connection (DBConnection *vconn);
static int
sqlite_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql);
static int
sqlite_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args);
static int
sqlite_db_query_foreach_row (DBConnection *vconn, const char *sql,
CcnetDBRowFunc callback, void *data,
int n, va_list args);
static int
sqlite_db_row_get_column_count (CcnetDBRow *row);
static const char *
sqlite_db_row_get_column_string (CcnetDBRow *row, int idx);
static int
sqlite_db_row_get_column_int (CcnetDBRow *row, int idx);
static gint64
sqlite_db_row_get_column_int64 (CcnetDBRow *row, int idx);
CcnetDB *
ccnet_db_new_sqlite (const char *db_path)
{
CcnetDB *db;
db = sqlite_db_new (db_path);
if (!db)
return NULL;
db->type = CCNET_DB_TYPE_SQLITE;
db_ops.get_connection = sqlite_db_get_connection;
db_ops.release_connection = sqlite_db_release_connection;
db_ops.execute_sql_no_stmt = sqlite_db_execute_sql_no_stmt;
db_ops.execute_sql = sqlite_db_execute_sql;
db_ops.query_foreach_row = sqlite_db_query_foreach_row;
db_ops.row_get_column_count = sqlite_db_row_get_column_count;
db_ops.row_get_column_string = sqlite_db_row_get_column_string;
db_ops.row_get_column_int = sqlite_db_row_get_column_int;
db_ops.row_get_column_int64 = sqlite_db_row_get_column_int64;
return db;
}
int
ccnet_db_type (CcnetDB *db)
{
return db->type;
}
int
ccnet_db_query (CcnetDB *db, const char *sql)
{
DBConnection *conn = db_ops.get_connection (db);
if (!conn)
return -1;
int ret;
ret = db_ops.execute_sql_no_stmt (conn, sql);
db_ops.release_connection (conn);
return ret;
}
gboolean
ccnet_db_check_for_existence (CcnetDB *db, const char *sql, gboolean *db_err)
{
return ccnet_db_statement_exists (db, sql, db_err, 0);
}
int
ccnet_db_foreach_selected_row (CcnetDB *db, const char *sql,
CcnetDBRowFunc callback, void *data)
{
return ccnet_db_statement_foreach_row (db, sql, callback, data, 0);
}
const char *
ccnet_db_row_get_column_text (CcnetDBRow *row, guint32 idx)
{
g_return_val_if_fail (idx < db_ops.row_get_column_count(row), NULL);
return db_ops.row_get_column_string (row, idx);
}
int
ccnet_db_row_get_column_int (CcnetDBRow *row, guint32 idx)
{
g_return_val_if_fail (idx < db_ops.row_get_column_count(row), -1);
return db_ops.row_get_column_int (row, idx);
}
gint64
ccnet_db_row_get_column_int64 (CcnetDBRow *row, guint32 idx)
{
g_return_val_if_fail (idx < db_ops.row_get_column_count(row), -1);
return db_ops.row_get_column_int64 (row, idx);
}
int
ccnet_db_get_int (CcnetDB *db, const char *sql)
{
return ccnet_db_statement_get_int (db, sql, 0);
}
gint64
ccnet_db_get_int64 (CcnetDB *db, const char *sql)
{
return ccnet_db_statement_get_int64 (db, sql, 0);
}
char *
ccnet_db_get_string (CcnetDB *db, const char *sql)
{
return ccnet_db_statement_get_string (db, sql, 0);
}
int
ccnet_db_statement_query (CcnetDB *db, const char *sql, int n, ...)
{
int ret;
DBConnection *conn = NULL;
conn = db_ops.get_connection (db);
if (!conn)
return -1;
va_list args;
va_start (args, n);
ret = db_ops.execute_sql (conn, sql, n, args);
va_end (args);
db_ops.release_connection (conn);
return ret;
}
gboolean
ccnet_db_statement_exists (CcnetDB *db, const char *sql, gboolean *db_err, int n, ...)
{
int n_rows;
DBConnection *conn = NULL;
conn = db_ops.get_connection(db);
if (!conn)
return FALSE;
va_list args;
va_start (args, n);
n_rows = db_ops.query_foreach_row (conn, sql, NULL, NULL, n, args);
va_end (args);
db_ops.release_connection(conn);
if (n_rows < 0) {
*db_err = TRUE;
return FALSE;
} else {
*db_err = FALSE;
return (n_rows != 0);
}
}
int
ccnet_db_statement_foreach_row (CcnetDB *db, const char *sql,
CcnetDBRowFunc callback, void *data,
int n, ...)
{
int ret;
DBConnection *conn = NULL;
conn = db_ops.get_connection (db);
if (!conn)
return -1;
va_list args;
va_start (args, n);
ret = db_ops.query_foreach_row (conn, sql, callback, data, n, args);
va_end (args);
db_ops.release_connection (conn);
return ret;
}
static gboolean
get_int_cb (CcnetDBRow *row, void *data)
{
int *pret = (int*)data;
*pret = ccnet_db_row_get_column_int (row, 0);
return FALSE;
}
int
ccnet_db_statement_get_int (CcnetDB *db, const char *sql, int n, ...)
{
int ret = -1;
int rc;
DBConnection *conn = NULL;
conn = db_ops.get_connection (db);
if (!conn)
return -1;
va_list args;
va_start (args, n);
rc = db_ops.query_foreach_row (conn, sql, get_int_cb, &ret, n, args);
va_end (args);
db_ops.release_connection (conn);
if (rc < 0)
return -1;
return ret;
}
static gboolean
get_int64_cb (CcnetDBRow *row, void *data)
{
gint64 *pret = (gint64*)data;
*pret = ccnet_db_row_get_column_int64 (row, 0);
return FALSE;
}
gint64
ccnet_db_statement_get_int64 (CcnetDB *db, const char *sql, int n, ...)
{
gint64 ret = -1;
int rc;
DBConnection *conn = NULL;
conn = db_ops.get_connection (db);
if (!conn)
return -1;
va_list args;
va_start (args, n);
rc = db_ops.query_foreach_row (conn, sql, get_int64_cb, &ret, n, args);
va_end(args);
db_ops.release_connection (conn);
if (rc < 0)
return -1;
return ret;
}
static gboolean
get_string_cb (CcnetDBRow *row, void *data)
{
char **pret = (char**)data;
*pret = g_strdup(ccnet_db_row_get_column_text (row, 0));
return FALSE;
}
char *
ccnet_db_statement_get_string (CcnetDB *db, const char *sql, int n, ...)
{
char *ret = NULL;
int rc;
DBConnection *conn = NULL;
conn = db_ops.get_connection (db);
if (!conn)
return NULL;
va_list args;
va_start (args, n);
rc = db_ops.query_foreach_row (conn, sql, get_string_cb, &ret, n, args);
va_end(args);
db_ops.release_connection (conn);
if (rc < 0)
return NULL;
return ret;
}
/* Transaction */
CcnetDBTrans *
ccnet_db_begin_transaction (CcnetDB *db)
{
CcnetDBTrans *trans = NULL;
DBConnection *conn = db_ops.get_connection(db);
if (!conn) {
return trans;
}
if (db_ops.execute_sql_no_stmt (conn, "BEGIN") < 0) {
db_ops.release_connection (conn);
return trans;
}
trans = g_new0 (CcnetDBTrans, 1);
trans->conn = conn;
return trans;
}
void
ccnet_db_trans_close (CcnetDBTrans *trans)
{
db_ops.release_connection (trans->conn);
g_free (trans);
}
int
ccnet_db_commit (CcnetDBTrans *trans)
{
DBConnection *conn = trans->conn;
if (db_ops.execute_sql_no_stmt (conn, "COMMIT") < 0) {
return -1;
}
return 0;
}
int
ccnet_db_rollback (CcnetDBTrans *trans)
{
DBConnection *conn = trans->conn;
if (db_ops.execute_sql_no_stmt (conn, "ROLLBACK") < 0) {
return -1;
}
return 0;
}
int
ccnet_db_trans_query (CcnetDBTrans *trans, const char *sql, int n, ...)
{
int ret;
va_list args;
va_start (args, n);
ret = db_ops.execute_sql (trans->conn, sql, n, args);
va_end (args);
return ret;
}
gboolean
ccnet_db_trans_check_for_existence (CcnetDBTrans *trans,
const char *sql,
gboolean *db_err,
int n, ...)
{
int n_rows;
va_list args;
va_start (args, n);
n_rows = db_ops.query_foreach_row (trans->conn, sql, NULL, NULL, n, args);
va_end (args);
if (n_rows < 0) {
*db_err = TRUE;
return FALSE;
} else {
*db_err = FALSE;
return (n_rows != 0);
}
}
int
ccnet_db_trans_foreach_selected_row (CcnetDBTrans *trans, const char *sql,
CcnetDBRowFunc callback, void *data,
int n, ...)
{
int ret;
va_list args;
va_start (args, n);
ret = db_ops.query_foreach_row (trans->conn, sql, callback, data, n, args);
va_end (args);
return ret;
}
#ifdef HAVE_MYSQL
/* MySQL DB */
typedef struct MySQLDB {
struct CcnetDB parent;
char *host;
char *user;
char *password;
unsigned int port;
char *db_name;
char *unix_socket;
gboolean use_ssl;
char *charset;
} MySQLDB;
typedef struct MySQLDBConnection {
struct DBConnection parent;
MYSQL *db_conn;
} MySQLDBConnection;
static CcnetDB *
mysql_db_new (const char *host,
int port,
const char *user,
const char *password,
const char *db_name,
const char *unix_socket,
gboolean use_ssl,
const char *charset)
{
MySQLDB *db = g_new0 (MySQLDB, 1);
db->host = g_strdup (host);
db->user = g_strdup (user);
db->password = g_strdup (password);
db->port = port;
db->db_name = g_strdup(db_name);
db->unix_socket = g_strdup(unix_socket);
db->use_ssl = use_ssl;
db->charset = g_strdup(charset);
mysql_library_init (0, NULL, NULL);
return (CcnetDB *)db;
}
static DBConnection *
mysql_db_get_connection (CcnetDB *vdb)
{
MySQLDB *db = (MySQLDB *)vdb;
my_bool yes = 1;
int conn_timeout = 1;
MYSQL *db_conn;
MySQLDBConnection *conn = NULL;
db_conn = mysql_init (NULL);
if (!db_conn) {
ccnet_warning ("Failed to init mysql connection object.\n");
return NULL;
}
if (db->use_ssl)
mysql_ssl_set(db_conn, 0,0,0,0,0);
if (db->charset)
mysql_options(db_conn, MYSQL_SET_CHARSET_NAME, db->charset);
mysql_options(db_conn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&conn_timeout);
mysql_options(db_conn, MYSQL_OPT_RECONNECT, (const char*)&yes);
if (!mysql_real_connect(db_conn, db->host, db->user, db->password,
db->db_name, db->port,
db->unix_socket, CLIENT_MULTI_STATEMENTS)) {
ccnet_warning ("Failed to connect to MySQL: %s\n", mysql_error(db_conn));
mysql_close (db_conn);
return NULL;
}
conn = g_new0 (MySQLDBConnection, 1);
conn->db_conn = db_conn;
return (DBConnection *)conn;
}
static void
mysql_db_release_connection (DBConnection *vconn)
{
if (!vconn)
return;
MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
mysql_close (conn->db_conn);
g_free (conn);
}
static int
mysql_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql)
{
MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
if (mysql_query (conn->db_conn, sql) != 0) {
ccnet_warning ("Failed to execute sql %s: %s\n", sql, mysql_error(conn->db_conn));
return -1;
}
return 0;
}
static MYSQL_STMT *
_prepare_stmt_mysql (MYSQL *db, const char *sql)
{
MYSQL_STMT *stmt;
stmt = mysql_stmt_init (db);
if (!stmt) {
ccnet_warning ("mysql_stmt_init failed.\n");
return NULL;
}
if (mysql_stmt_prepare (stmt, sql, strlen(sql)) != 0) {
ccnet_warning ("Failed to prepare sql %s: %s\n", sql, mysql_stmt_error(stmt));
mysql_stmt_close (stmt);
return NULL;
}
return stmt;
}
static int
_bind_params_mysql (MYSQL_STMT *stmt, MYSQL_BIND *params, int n, va_list args)
{
int i;
const char *type;
for (i = 0; i < n; ++i) {
type = va_arg (args, const char *);
if (strcmp(type, "int") == 0) {
int x = va_arg (args, int);
int *pval = g_new (int, 1);
*pval = x;
params[i].buffer_type = MYSQL_TYPE_LONG;
params[i].buffer = pval;
params[i].is_null = 0;
} else if (strcmp (type, "int64") == 0) {
gint64 x = va_arg (args, gint64);
gint64 *pval = g_new (gint64, 1);
*pval = x;
params[i].buffer_type = MYSQL_TYPE_LONGLONG;
params[i].buffer = pval;
params[i].is_null = 0;
} else if (strcmp (type, "string") == 0) {
const char *s = va_arg (args, const char *);
static my_bool yes = TRUE;
params[i].buffer_type = MYSQL_TYPE_STRING;
params[i].buffer = g_strdup(s);
unsigned long *plen = g_new (unsigned long, 1);
params[i].length = plen;
if (!s) {
*plen = 0;
params[i].buffer_length = 0;
params[i].is_null = &yes;
} else {
*plen = strlen(s);
params[i].buffer_length = *plen + 1;
params[i].is_null = 0;
}
} else {
ccnet_warning ("BUG: invalid prep stmt parameter type %s.\n", type);
g_return_val_if_reached (-1);
}
}
if (mysql_stmt_bind_param (stmt, params) != 0) {
return -1;
}
return 0;
}
static int
mysql_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args)
{
MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
MYSQL *db = conn->db_conn;
MYSQL_STMT *stmt = NULL;
MYSQL_BIND *params = NULL;
int ret = 0;
stmt = _prepare_stmt_mysql (db, sql);
if (!stmt) {
return -1;
}
if (n > 0) {
params = g_new0 (MYSQL_BIND, n);
if (_bind_params_mysql (stmt, params, n, args) < 0) {
ccnet_warning ("Failed to bind parameters for %s: %s.\n",
sql, mysql_stmt_error(stmt));
ret = -1;
goto out;
}
}
if (mysql_stmt_execute (stmt) != 0) {
ccnet_warning ("Failed to execute sql %s: %s\n", sql, mysql_stmt_error(stmt));
ret = -1;
goto out;
}
out:
if (stmt)
mysql_stmt_close (stmt);
if (params) {
int i;
for (i = 0; i < n; ++i) {
g_free (params[i].buffer);
g_free (params[i].length);
}
g_free (params);
}
return ret;
}
typedef struct MySQLDBRow {
CcnetDBRow parent;
int column_count;
MYSQL_STMT *stmt;
MYSQL_BIND *results;
/* Used when returned columns are truncated. */
MYSQL_BIND *new_binds;
} MySQLDBRow;
#define DEFAULT_MYSQL_COLUMN_SIZE 1024
static int
mysql_db_query_foreach_row (DBConnection *vconn, const char *sql,
CcnetDBRowFunc callback, void *data,
int n, va_list args)
{
MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
MYSQL *db = conn->db_conn;
MYSQL_STMT *stmt = NULL;
MYSQL_BIND *params = NULL;
MySQLDBRow row;
int nrows = 0;
int i;
memset (&row, 0, sizeof(row));
stmt = _prepare_stmt_mysql (db, sql);
if (!stmt) {
return -1;
}
if (n > 0) {
params = g_new0 (MYSQL_BIND, n);
if (_bind_params_mysql (stmt, params, n, args) < 0) {
nrows = -1;
goto out;
}
}
if (mysql_stmt_execute (stmt) != 0) {
ccnet_warning ("Failed to execute sql %s: %s\n", sql, mysql_stmt_error(stmt));
nrows = -1;
goto out;
}
row.column_count = mysql_stmt_field_count (stmt);
row.stmt = stmt;
row.results = g_new0 (MYSQL_BIND, row.column_count);
for (i = 0; i < row.column_count; ++i) {
row.results[i].buffer = g_malloc (DEFAULT_MYSQL_COLUMN_SIZE + 1);
/* Ask MySQL to convert fields to string, to avoid the trouble of
* checking field types.
*/
row.results[i].buffer_type = MYSQL_TYPE_STRING;
row.results[i].buffer_length = DEFAULT_MYSQL_COLUMN_SIZE;
row.results[i].length = g_new0 (unsigned long, 1);
row.results[i].is_null = g_new0 (my_bool, 1);
}
row.new_binds = g_new0 (MYSQL_BIND, row.column_count);
if (mysql_stmt_bind_result (stmt, row.results) != 0) {
ccnet_warning ("Failed to bind result for sql %s: %s\n", sql, mysql_stmt_error(stmt));
nrows = -1;
goto out;
}
int rc;
gboolean next_row = TRUE;
while (1) {
rc = mysql_stmt_fetch (stmt);
if (rc == 1) {
ccnet_warning ("Failed to fetch result for sql %s: %s\n",
sql, mysql_stmt_error(stmt));
nrows = -1;
goto out;
}
if (rc == MYSQL_NO_DATA)
break;
/* rc == 0 or rc == MYSQL_DATA_TRUNCATED */
++nrows;
if (callback)
next_row = callback ((CcnetDBRow *)&row, data);
for (i = 0; i < row.column_count; ++i) {
g_free (row.new_binds[i].buffer);
g_free (row.new_binds[i].length);
g_free (row.new_binds[i].is_null);
memset (&row.new_binds[i], 0, sizeof(MYSQL_BIND));
}
if (!next_row)
break;
}
out:
if (stmt) {
mysql_stmt_free_result (stmt);
mysql_stmt_close (stmt);
}
if (params) {
for (i = 0; i < n; ++i) {
g_free (params[i].buffer);
g_free (params[i].length);
}
g_free (params);
}
if (row.results) {
for (i = 0; i < row.column_count; ++i) {
g_free (row.results[i].buffer);
g_free (row.results[i].length);
g_free (row.results[i].is_null);
}
g_free (row.results);
}
if (row.new_binds) {
for (i = 0; i < row.column_count; ++i) {
g_free (row.new_binds[i].buffer);
g_free (row.new_binds[i].length);
g_free (row.new_binds[i].is_null);
}
g_free (row.new_binds);
}
return nrows;
}
static int
mysql_db_row_get_column_count (CcnetDBRow *vrow)
{
MySQLDBRow *row = (MySQLDBRow *)vrow;
return row->column_count;
}
static const char *
mysql_db_row_get_column_string (CcnetDBRow *vrow, int i)
{
MySQLDBRow *row = (MySQLDBRow *)vrow;
if (*(row->results[i].is_null)) {
return NULL;
}
char *ret = NULL;
unsigned long real_length = *(row->results[i].length);
/* If column size is larger then allocated buffer size, re-allocate a new buffer
* and fetch the column directly.
*/
if (real_length > row->results[i].buffer_length) {
row->new_binds[i].buffer = g_malloc (real_length + 1);
row->new_binds[i].buffer_type = MYSQL_TYPE_STRING;
row->new_binds[i].buffer_length = real_length;
row->new_binds[i].length = g_new0 (unsigned long, 1);
row->new_binds[i].is_null = g_new0 (my_bool, 1);
if (mysql_stmt_fetch_column (row->stmt, &row->new_binds[i], i, 0) != 0) {
ccnet_warning ("Faield to fetch column: %s\n", mysql_stmt_error(row->stmt));
return NULL;
}
ret = row->new_binds[i].buffer;
} else {
ret = row->results[i].buffer;
}
ret[real_length] = 0;
return ret;
}
static int
mysql_db_row_get_column_int (CcnetDBRow *vrow, int idx)
{
const char *str;
char *e;
int ret;
str = mysql_db_row_get_column_string (vrow, idx);
if (!str) {
return 0;
}
errno = 0;
ret = strtol (str, &e, 10);
if (errno || (e == str)) {
ccnet_warning ("Number conversion failed.\n");
return -1;
}
return ret;
}
static gint64
mysql_db_row_get_column_int64 (CcnetDBRow *vrow, int idx)
{
const char *str;
char *e;
gint64 ret;
str = mysql_db_row_get_column_string (vrow, idx);
if (!str) {
return 0;
}
errno = 0;
ret = strtoll (str, &e, 10);
if (errno || (e == str)) {
ccnet_warning ("Number conversion failed.\n");
return -1;
}
return ret;
}
#endif /* HAVE_MYSQL */
/* SQLite DB */
/* SQLite thread synchronization rountines.
* See https://www.sqlite.org/unlock_notify.html
*/
typedef struct UnlockNotification {
int fired;
pthread_cond_t cond;
pthread_mutex_t mutex;
} UnlockNotification;
static void
unlock_notify_cb(void **ap_arg, int n_arg)
{
int i;
for (i = 0; i < n_arg; i++) {
UnlockNotification *p = (UnlockNotification *)ap_arg[i];
pthread_mutex_lock (&p->mutex);
p->fired = 1;
pthread_cond_signal (&p->cond);
pthread_mutex_unlock (&p->mutex);
}
}
static int
wait_for_unlock_notify(sqlite3 *db)
{
UnlockNotification un;
un.fired = 0;
pthread_mutex_init (&un.mutex, NULL);
pthread_cond_init (&un.cond, NULL);
int rc = sqlite3_unlock_notify(db, unlock_notify_cb, (void *)&un);
if (rc == SQLITE_OK) {
pthread_mutex_lock(&un.mutex);
if (!un.fired)
pthread_cond_wait (&un.cond, &un.mutex);
pthread_mutex_unlock(&un.mutex);
}
pthread_cond_destroy (&un.cond);
pthread_mutex_destroy (&un.mutex);
return rc;
}
static int
sqlite3_blocking_step(sqlite3_stmt *stmt)
{
int rc;
while (SQLITE_LOCKED == (rc = sqlite3_step(stmt))) {
rc = wait_for_unlock_notify(sqlite3_db_handle(stmt));
if (rc != SQLITE_OK)
break;
sqlite3_reset(stmt);
}
return rc;
}
static int
sqlite3_blocking_prepare_v2(sqlite3 *db, const char *sql, int sql_len, sqlite3_stmt **pstmt, const char **pz)
{
int rc;
while (SQLITE_LOCKED == (rc = sqlite3_prepare_v2(db, sql, sql_len, pstmt, pz))) {
rc = wait_for_unlock_notify(db);
if (rc != SQLITE_OK)
break;
}
return rc;
}
static int
sqlite3_blocking_exec(sqlite3 *db, const char *sql, int (*callback)(void *, int, char **, char **), void *arg, char **errmsg)
{
int rc;
while (SQLITE_LOCKED == (rc = sqlite3_exec(db, sql, callback, arg, errmsg))) {
rc = wait_for_unlock_notify(db);
if (rc != SQLITE_OK)
break;
}
return rc;
}
typedef struct SQLiteDB {
CcnetDB parent;
char *db_path;
} SQLiteDB;
typedef struct SQLiteDBConnection {
DBConnection parent;
sqlite3 *db_conn;
} SQLiteDBConnection;
static CcnetDB *
sqlite_db_new (const char *db_path)
{
SQLiteDB *db = g_new0 (SQLiteDB, 1);
db->db_path = g_strdup(db_path);
return (CcnetDB *)db;
}
static DBConnection *
sqlite_db_get_connection (CcnetDB *vdb)
{
SQLiteDB *db = (SQLiteDB *)vdb;
sqlite3 *db_conn;
int result;
const char *errmsg;
SQLiteDBConnection *conn;
result = sqlite3_open_v2 (db->db_path, &db_conn, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_SHAREDCACHE, NULL);
if (result != SQLITE_OK) {
errmsg = sqlite3_errmsg(db_conn);
ccnet_warning ("Failed to open sqlite db: %s\n", errmsg ? errmsg : "no error given");
return NULL;
}
conn = g_new0 (SQLiteDBConnection, 1);
conn->db_conn = db_conn;
return (DBConnection *)conn;
}
static void
sqlite_db_release_connection (DBConnection *vconn)
{
if (!vconn)
return;
SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
sqlite3_close (conn->db_conn);
g_free (conn);
}
static int
sqlite_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql)
{
SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
char *errmsg = NULL;
int rc;
rc = sqlite3_blocking_exec (conn->db_conn, sql, NULL, NULL, &errmsg);
if (rc != SQLITE_OK) {
ccnet_warning ("sqlite3_exec failed %s: %s", sql, errmsg ? errmsg : "no error given");
if (errmsg)
sqlite3_free (errmsg);
return -1;
}
return 0;
}
static int
_bind_parameters_sqlite (sqlite3 *db, sqlite3_stmt *stmt, int n, va_list args)
{
int i;
const char *type;
for (i = 0; i < n; ++i) {
type = va_arg (args, const char *);
if (strcmp(type, "int") == 0) {
int x = va_arg (args, int);
if (sqlite3_bind_int (stmt, i+1, x) != SQLITE_OK) {
ccnet_warning ("sqlite3_bind_int failed: %s\n", sqlite3_errmsg(db));
return -1;
}
} else if (strcmp (type, "int64") == 0) {
gint64 x = va_arg (args, gint64);
if (sqlite3_bind_int64 (stmt, i+1, x) != SQLITE_OK) {
ccnet_warning ("sqlite3_bind_int64 failed: %s\n", sqlite3_errmsg(db));
return -1;
}
} else if (strcmp (type, "string") == 0) {
const char *s = va_arg (args, const char *);
if (sqlite3_bind_text (stmt, i+1, s, -1, SQLITE_TRANSIENT) != SQLITE_OK) {
ccnet_warning ("sqlite3_bind_text failed: %s\n", sqlite3_errmsg(db));
return -1;
}
} else {
ccnet_warning ("BUG: invalid prep stmt parameter type %s.\n", type);
g_return_val_if_reached (-1);
}
}
return 0;
}
static int
sqlite_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args)
{
SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
sqlite3 *db = conn->db_conn;
sqlite3_stmt *stmt;
int rc;
int ret = 0;
rc = sqlite3_blocking_prepare_v2 (db, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
ccnet_warning ("sqlite3_prepare_v2 failed %s: %s", sql, sqlite3_errmsg(db));
return -1;
}
if (_bind_parameters_sqlite (db, stmt, n, args) < 0) {
ccnet_warning ("Failed to bind parameters for sql %s\n", sql);
ret = -1;
goto out;
}
rc = sqlite3_blocking_step (stmt);
if (rc != SQLITE_DONE) {
ccnet_warning ("sqlite3_step failed %s: %s", sql, sqlite3_errmsg(db));
ret = -1;
goto out;
}
out:
sqlite3_finalize (stmt);
return ret;
}
typedef struct SQLiteDBRow {
CcnetDBRow parent;
int column_count;
sqlite3 *db;
sqlite3_stmt *stmt;
} SQLiteDBRow;
static int
sqlite_db_query_foreach_row (DBConnection *vconn, const char *sql,
CcnetDBRowFunc callback, void *data,
int n, va_list args)
{
SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
sqlite3 *db = conn->db_conn;
sqlite3_stmt *stmt;
int rc;
int nrows = 0;
rc = sqlite3_blocking_prepare_v2 (db, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
ccnet_warning ("sqlite3_prepare_v2 failed %s: %s", sql, sqlite3_errmsg(db));
return -1;
}
if (_bind_parameters_sqlite (db, stmt, n, args) < 0) {
ccnet_warning ("Failed to bind parameters for sql %s\n", sql);
nrows = -1;
goto out;
}
SQLiteDBRow row;
memset (&row, 0, sizeof(row));
row.db = db;
row.stmt = stmt;
row.column_count = sqlite3_column_count (stmt);
while (1) {
rc = sqlite3_blocking_step (stmt);
if (rc == SQLITE_ROW) {
++nrows;
if (callback && !callback ((CcnetDBRow *)&row, data))
break;
} else if (rc == SQLITE_DONE) {
break;
} else {
ccnet_warning ("sqlite3_step failed %s: %s\n", sql, sqlite3_errmsg(db));
nrows = -1;
goto out;
}
}
out:
sqlite3_finalize (stmt);
return nrows;
}
static int
sqlite_db_row_get_column_count (CcnetDBRow *vrow)
{
SQLiteDBRow *row = (SQLiteDBRow *)vrow;
return row->column_count;
}
static const char *
sqlite_db_row_get_column_string (CcnetDBRow *vrow, int idx)
{
SQLiteDBRow *row = (SQLiteDBRow *)vrow;
return (const char *)sqlite3_column_text (row->stmt, idx);
}
static int
sqlite_db_row_get_column_int (CcnetDBRow *vrow, int idx)
{
SQLiteDBRow *row = (SQLiteDBRow *)vrow;
return sqlite3_column_int (row->stmt, idx);
}
static gint64
sqlite_db_row_get_column_int64 (CcnetDBRow *vrow, int idx)
{
SQLiteDBRow *row = (SQLiteDBRow *)vrow;
return sqlite3_column_int64 (row->stmt, idx);
}