#include "common.h" #include "log.h" #include "ccnet-db.h" #include #ifdef HAVE_MYSQL #include #endif #include #include struct CcnetDB { int type; }; typedef struct DBConnection { /* Empty */ } DBConnection; struct CcnetDBRow { /* Empty */ }; struct CcnetDBTrans { DBConnection *conn; }; typedef struct DBOperations { DBConnection* (*get_connection)(CcnetDB *db); void (*release_connection)(DBConnection *conn); int (*execute_sql_no_stmt)(DBConnection *conn, const char *sql); int (*execute_sql)(DBConnection *conn, const char *sql, int n, va_list args); int (*query_foreach_row)(DBConnection *conn, const char *sql, CcnetDBRowFunc callback, void *data, int n, va_list args); int (*row_get_column_count)(CcnetDBRow *row); const char* (*row_get_column_string)(CcnetDBRow *row, int idx); int (*row_get_column_int)(CcnetDBRow *row, int idx); gint64 (*row_get_column_int64)(CcnetDBRow *row, int idx); } DBOperations; static DBOperations db_ops; #ifdef HAVE_MYSQL /* MySQL Ops */ static CcnetDB * mysql_db_new (const char *host, int port, const char *user, const char *password, const char *db_name, const char *unix_socket, gboolean use_ssl, const char *charset); static DBConnection * mysql_db_get_connection (CcnetDB *db); static void mysql_db_release_connection (DBConnection *vconn); static int mysql_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql); static int mysql_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args); static int mysql_db_query_foreach_row (DBConnection *vconn, const char *sql, CcnetDBRowFunc callback, void *data, int n, va_list args); static int mysql_db_row_get_column_count (CcnetDBRow *row); static const char * mysql_db_row_get_column_string (CcnetDBRow *row, int idx); static int mysql_db_row_get_column_int (CcnetDBRow *row, int idx); static gint64 mysql_db_row_get_column_int64 (CcnetDBRow *row, int idx); CcnetDB * ccnet_db_new_mysql (const char *host, int port, const char *user, const char *passwd, const char *db_name, const char *unix_socket, gboolean use_ssl, const char *charset, int max_connections) { CcnetDB *db; db = mysql_db_new (host, port, user, passwd, db_name, unix_socket, use_ssl, charset); if (!db) return NULL; db->type = CCNET_DB_TYPE_MYSQL; db_ops.get_connection = mysql_db_get_connection; db_ops.release_connection = mysql_db_release_connection; db_ops.execute_sql_no_stmt = mysql_db_execute_sql_no_stmt; db_ops.execute_sql = mysql_db_execute_sql; db_ops.query_foreach_row = mysql_db_query_foreach_row; db_ops.row_get_column_count = mysql_db_row_get_column_count; db_ops.row_get_column_string = mysql_db_row_get_column_string; db_ops.row_get_column_int = mysql_db_row_get_column_int; db_ops.row_get_column_int64 = mysql_db_row_get_column_int64; return db; } #endif /* SQLite Ops */ static CcnetDB * sqlite_db_new (const char *db_path); static DBConnection * sqlite_db_get_connection (CcnetDB *db); static void sqlite_db_release_connection (DBConnection *vconn); static int sqlite_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql); static int sqlite_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args); static int sqlite_db_query_foreach_row (DBConnection *vconn, const char *sql, CcnetDBRowFunc callback, void *data, int n, va_list args); static int sqlite_db_row_get_column_count (CcnetDBRow *row); static const char * sqlite_db_row_get_column_string (CcnetDBRow *row, int idx); static int sqlite_db_row_get_column_int (CcnetDBRow *row, int idx); static gint64 sqlite_db_row_get_column_int64 (CcnetDBRow *row, int idx); CcnetDB * ccnet_db_new_sqlite (const char *db_path) { CcnetDB *db; db = sqlite_db_new (db_path); if (!db) return NULL; db->type = CCNET_DB_TYPE_SQLITE; db_ops.get_connection = sqlite_db_get_connection; db_ops.release_connection = sqlite_db_release_connection; db_ops.execute_sql_no_stmt = sqlite_db_execute_sql_no_stmt; db_ops.execute_sql = sqlite_db_execute_sql; db_ops.query_foreach_row = sqlite_db_query_foreach_row; db_ops.row_get_column_count = sqlite_db_row_get_column_count; db_ops.row_get_column_string = sqlite_db_row_get_column_string; db_ops.row_get_column_int = sqlite_db_row_get_column_int; db_ops.row_get_column_int64 = sqlite_db_row_get_column_int64; return db; } int ccnet_db_type (CcnetDB *db) { return db->type; } int ccnet_db_query (CcnetDB *db, const char *sql) { DBConnection *conn = db_ops.get_connection (db); if (!conn) return -1; int ret; ret = db_ops.execute_sql_no_stmt (conn, sql); db_ops.release_connection (conn); return ret; } gboolean ccnet_db_check_for_existence (CcnetDB *db, const char *sql, gboolean *db_err) { return ccnet_db_statement_exists (db, sql, db_err, 0); } int ccnet_db_foreach_selected_row (CcnetDB *db, const char *sql, CcnetDBRowFunc callback, void *data) { return ccnet_db_statement_foreach_row (db, sql, callback, data, 0); } const char * ccnet_db_row_get_column_text (CcnetDBRow *row, guint32 idx) { g_return_val_if_fail (idx < db_ops.row_get_column_count(row), NULL); return db_ops.row_get_column_string (row, idx); } int ccnet_db_row_get_column_int (CcnetDBRow *row, guint32 idx) { g_return_val_if_fail (idx < db_ops.row_get_column_count(row), -1); return db_ops.row_get_column_int (row, idx); } gint64 ccnet_db_row_get_column_int64 (CcnetDBRow *row, guint32 idx) { g_return_val_if_fail (idx < db_ops.row_get_column_count(row), -1); return db_ops.row_get_column_int64 (row, idx); } int ccnet_db_get_int (CcnetDB *db, const char *sql) { return ccnet_db_statement_get_int (db, sql, 0); } gint64 ccnet_db_get_int64 (CcnetDB *db, const char *sql) { return ccnet_db_statement_get_int64 (db, sql, 0); } char * ccnet_db_get_string (CcnetDB *db, const char *sql) { return ccnet_db_statement_get_string (db, sql, 0); } int ccnet_db_statement_query (CcnetDB *db, const char *sql, int n, ...) { int ret; DBConnection *conn = NULL; conn = db_ops.get_connection (db); if (!conn) return -1; va_list args; va_start (args, n); ret = db_ops.execute_sql (conn, sql, n, args); va_end (args); db_ops.release_connection (conn); return ret; } gboolean ccnet_db_statement_exists (CcnetDB *db, const char *sql, gboolean *db_err, int n, ...) { int n_rows; DBConnection *conn = NULL; conn = db_ops.get_connection(db); if (!conn) return FALSE; va_list args; va_start (args, n); n_rows = db_ops.query_foreach_row (conn, sql, NULL, NULL, n, args); va_end (args); db_ops.release_connection(conn); if (n_rows < 0) { *db_err = TRUE; return FALSE; } else { *db_err = FALSE; return (n_rows != 0); } } int ccnet_db_statement_foreach_row (CcnetDB *db, const char *sql, CcnetDBRowFunc callback, void *data, int n, ...) { int ret; DBConnection *conn = NULL; conn = db_ops.get_connection (db); if (!conn) return -1; va_list args; va_start (args, n); ret = db_ops.query_foreach_row (conn, sql, callback, data, n, args); va_end (args); db_ops.release_connection (conn); return ret; } static gboolean get_int_cb (CcnetDBRow *row, void *data) { int *pret = (int*)data; *pret = ccnet_db_row_get_column_int (row, 0); return FALSE; } int ccnet_db_statement_get_int (CcnetDB *db, const char *sql, int n, ...) { int ret = -1; int rc; DBConnection *conn = NULL; conn = db_ops.get_connection (db); if (!conn) return -1; va_list args; va_start (args, n); rc = db_ops.query_foreach_row (conn, sql, get_int_cb, &ret, n, args); va_end (args); db_ops.release_connection (conn); if (rc < 0) return -1; return ret; } static gboolean get_int64_cb (CcnetDBRow *row, void *data) { gint64 *pret = (gint64*)data; *pret = ccnet_db_row_get_column_int64 (row, 0); return FALSE; } gint64 ccnet_db_statement_get_int64 (CcnetDB *db, const char *sql, int n, ...) { gint64 ret = -1; int rc; DBConnection *conn = NULL; conn = db_ops.get_connection (db); if (!conn) return -1; va_list args; va_start (args, n); rc = db_ops.query_foreach_row (conn, sql, get_int64_cb, &ret, n, args); va_end(args); db_ops.release_connection (conn); if (rc < 0) return -1; return ret; } static gboolean get_string_cb (CcnetDBRow *row, void *data) { char **pret = (char**)data; *pret = g_strdup(ccnet_db_row_get_column_text (row, 0)); return FALSE; } char * ccnet_db_statement_get_string (CcnetDB *db, const char *sql, int n, ...) { char *ret = NULL; int rc; DBConnection *conn = NULL; conn = db_ops.get_connection (db); if (!conn) return NULL; va_list args; va_start (args, n); rc = db_ops.query_foreach_row (conn, sql, get_string_cb, &ret, n, args); va_end(args); db_ops.release_connection (conn); if (rc < 0) return NULL; return ret; } /* Transaction */ CcnetDBTrans * ccnet_db_begin_transaction (CcnetDB *db) { CcnetDBTrans *trans = NULL; DBConnection *conn = db_ops.get_connection(db); if (!conn) { return trans; } if (db_ops.execute_sql_no_stmt (conn, "BEGIN") < 0) { db_ops.release_connection (conn); return trans; } trans = g_new0 (CcnetDBTrans, 1); trans->conn = conn; return trans; } void ccnet_db_trans_close (CcnetDBTrans *trans) { db_ops.release_connection (trans->conn); g_free (trans); } int ccnet_db_commit (CcnetDBTrans *trans) { DBConnection *conn = trans->conn; if (db_ops.execute_sql_no_stmt (conn, "COMMIT") < 0) { return -1; } return 0; } int ccnet_db_rollback (CcnetDBTrans *trans) { DBConnection *conn = trans->conn; if (db_ops.execute_sql_no_stmt (conn, "ROLLBACK") < 0) { return -1; } return 0; } int ccnet_db_trans_query (CcnetDBTrans *trans, const char *sql, int n, ...) { int ret; va_list args; va_start (args, n); ret = db_ops.execute_sql (trans->conn, sql, n, args); va_end (args); return ret; } gboolean ccnet_db_trans_check_for_existence (CcnetDBTrans *trans, const char *sql, gboolean *db_err, int n, ...) { int n_rows; va_list args; va_start (args, n); n_rows = db_ops.query_foreach_row (trans->conn, sql, NULL, NULL, n, args); va_end (args); if (n_rows < 0) { *db_err = TRUE; return FALSE; } else { *db_err = FALSE; return (n_rows != 0); } } int ccnet_db_trans_foreach_selected_row (CcnetDBTrans *trans, const char *sql, CcnetDBRowFunc callback, void *data, int n, ...) { int ret; va_list args; va_start (args, n); ret = db_ops.query_foreach_row (trans->conn, sql, callback, data, n, args); va_end (args); return ret; } #ifdef HAVE_MYSQL /* MySQL DB */ typedef struct MySQLDB { struct CcnetDB parent; char *host; char *user; char *password; unsigned int port; char *db_name; char *unix_socket; gboolean use_ssl; char *charset; } MySQLDB; typedef struct MySQLDBConnection { struct DBConnection parent; MYSQL *db_conn; } MySQLDBConnection; static CcnetDB * mysql_db_new (const char *host, int port, const char *user, const char *password, const char *db_name, const char *unix_socket, gboolean use_ssl, const char *charset) { MySQLDB *db = g_new0 (MySQLDB, 1); db->host = g_strdup (host); db->user = g_strdup (user); db->password = g_strdup (password); db->port = port; db->db_name = g_strdup(db_name); db->unix_socket = g_strdup(unix_socket); db->use_ssl = use_ssl; db->charset = g_strdup(charset); mysql_library_init (0, NULL, NULL); return (CcnetDB *)db; } static DBConnection * mysql_db_get_connection (CcnetDB *vdb) { MySQLDB *db = (MySQLDB *)vdb; my_bool yes = 1; int conn_timeout = 1; MYSQL *db_conn; MySQLDBConnection *conn = NULL; db_conn = mysql_init (NULL); if (!db_conn) { ccnet_warning ("Failed to init mysql connection object.\n"); return NULL; } if (db->use_ssl) mysql_ssl_set(db_conn, 0,0,0,0,0); if (db->charset) mysql_options(db_conn, MYSQL_SET_CHARSET_NAME, db->charset); mysql_options(db_conn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&conn_timeout); mysql_options(db_conn, MYSQL_OPT_RECONNECT, (const char*)&yes); if (!mysql_real_connect(db_conn, db->host, db->user, db->password, db->db_name, db->port, db->unix_socket, CLIENT_MULTI_STATEMENTS)) { ccnet_warning ("Failed to connect to MySQL: %s\n", mysql_error(db_conn)); mysql_close (db_conn); return NULL; } conn = g_new0 (MySQLDBConnection, 1); conn->db_conn = db_conn; return (DBConnection *)conn; } static void mysql_db_release_connection (DBConnection *vconn) { if (!vconn) return; MySQLDBConnection *conn = (MySQLDBConnection *)vconn; mysql_close (conn->db_conn); g_free (conn); } static int mysql_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql) { MySQLDBConnection *conn = (MySQLDBConnection *)vconn; if (mysql_query (conn->db_conn, sql) != 0) { ccnet_warning ("Failed to execute sql %s: %s\n", sql, mysql_error(conn->db_conn)); return -1; } return 0; } static MYSQL_STMT * _prepare_stmt_mysql (MYSQL *db, const char *sql) { MYSQL_STMT *stmt; stmt = mysql_stmt_init (db); if (!stmt) { ccnet_warning ("mysql_stmt_init failed.\n"); return NULL; } if (mysql_stmt_prepare (stmt, sql, strlen(sql)) != 0) { ccnet_warning ("Failed to prepare sql %s: %s\n", sql, mysql_stmt_error(stmt)); mysql_stmt_close (stmt); return NULL; } return stmt; } static int _bind_params_mysql (MYSQL_STMT *stmt, MYSQL_BIND *params, int n, va_list args) { int i; const char *type; for (i = 0; i < n; ++i) { type = va_arg (args, const char *); if (strcmp(type, "int") == 0) { int x = va_arg (args, int); int *pval = g_new (int, 1); *pval = x; params[i].buffer_type = MYSQL_TYPE_LONG; params[i].buffer = pval; params[i].is_null = 0; } else if (strcmp (type, "int64") == 0) { gint64 x = va_arg (args, gint64); gint64 *pval = g_new (gint64, 1); *pval = x; params[i].buffer_type = MYSQL_TYPE_LONGLONG; params[i].buffer = pval; params[i].is_null = 0; } else if (strcmp (type, "string") == 0) { const char *s = va_arg (args, const char *); static my_bool yes = TRUE; params[i].buffer_type = MYSQL_TYPE_STRING; params[i].buffer = g_strdup(s); unsigned long *plen = g_new (unsigned long, 1); params[i].length = plen; if (!s) { *plen = 0; params[i].buffer_length = 0; params[i].is_null = &yes; } else { *plen = strlen(s); params[i].buffer_length = *plen + 1; params[i].is_null = 0; } } else { ccnet_warning ("BUG: invalid prep stmt parameter type %s.\n", type); g_return_val_if_reached (-1); } } if (mysql_stmt_bind_param (stmt, params) != 0) { return -1; } return 0; } static int mysql_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args) { MySQLDBConnection *conn = (MySQLDBConnection *)vconn; MYSQL *db = conn->db_conn; MYSQL_STMT *stmt = NULL; MYSQL_BIND *params = NULL; int ret = 0; stmt = _prepare_stmt_mysql (db, sql); if (!stmt) { return -1; } if (n > 0) { params = g_new0 (MYSQL_BIND, n); if (_bind_params_mysql (stmt, params, n, args) < 0) { ccnet_warning ("Failed to bind parameters for %s: %s.\n", sql, mysql_stmt_error(stmt)); ret = -1; goto out; } } if (mysql_stmt_execute (stmt) != 0) { ccnet_warning ("Failed to execute sql %s: %s\n", sql, mysql_stmt_error(stmt)); ret = -1; goto out; } out: if (stmt) mysql_stmt_close (stmt); if (params) { int i; for (i = 0; i < n; ++i) { g_free (params[i].buffer); g_free (params[i].length); } g_free (params); } return ret; } typedef struct MySQLDBRow { CcnetDBRow parent; int column_count; MYSQL_STMT *stmt; MYSQL_BIND *results; /* Used when returned columns are truncated. */ MYSQL_BIND *new_binds; } MySQLDBRow; #define DEFAULT_MYSQL_COLUMN_SIZE 1024 static int mysql_db_query_foreach_row (DBConnection *vconn, const char *sql, CcnetDBRowFunc callback, void *data, int n, va_list args) { MySQLDBConnection *conn = (MySQLDBConnection *)vconn; MYSQL *db = conn->db_conn; MYSQL_STMT *stmt = NULL; MYSQL_BIND *params = NULL; MySQLDBRow row; int nrows = 0; int i; memset (&row, 0, sizeof(row)); stmt = _prepare_stmt_mysql (db, sql); if (!stmt) { return -1; } if (n > 0) { params = g_new0 (MYSQL_BIND, n); if (_bind_params_mysql (stmt, params, n, args) < 0) { nrows = -1; goto out; } } if (mysql_stmt_execute (stmt) != 0) { ccnet_warning ("Failed to execute sql %s: %s\n", sql, mysql_stmt_error(stmt)); nrows = -1; goto out; } row.column_count = mysql_stmt_field_count (stmt); row.stmt = stmt; row.results = g_new0 (MYSQL_BIND, row.column_count); for (i = 0; i < row.column_count; ++i) { row.results[i].buffer = g_malloc (DEFAULT_MYSQL_COLUMN_SIZE + 1); /* Ask MySQL to convert fields to string, to avoid the trouble of * checking field types. */ row.results[i].buffer_type = MYSQL_TYPE_STRING; row.results[i].buffer_length = DEFAULT_MYSQL_COLUMN_SIZE; row.results[i].length = g_new0 (unsigned long, 1); row.results[i].is_null = g_new0 (my_bool, 1); } row.new_binds = g_new0 (MYSQL_BIND, row.column_count); if (mysql_stmt_bind_result (stmt, row.results) != 0) { ccnet_warning ("Failed to bind result for sql %s: %s\n", sql, mysql_stmt_error(stmt)); nrows = -1; goto out; } int rc; gboolean next_row = TRUE; while (1) { rc = mysql_stmt_fetch (stmt); if (rc == 1) { ccnet_warning ("Failed to fetch result for sql %s: %s\n", sql, mysql_stmt_error(stmt)); nrows = -1; goto out; } if (rc == MYSQL_NO_DATA) break; /* rc == 0 or rc == MYSQL_DATA_TRUNCATED */ ++nrows; if (callback) next_row = callback ((CcnetDBRow *)&row, data); for (i = 0; i < row.column_count; ++i) { g_free (row.new_binds[i].buffer); g_free (row.new_binds[i].length); g_free (row.new_binds[i].is_null); memset (&row.new_binds[i], 0, sizeof(MYSQL_BIND)); } if (!next_row) break; } out: if (stmt) { mysql_stmt_free_result (stmt); mysql_stmt_close (stmt); } if (params) { for (i = 0; i < n; ++i) { g_free (params[i].buffer); g_free (params[i].length); } g_free (params); } if (row.results) { for (i = 0; i < row.column_count; ++i) { g_free (row.results[i].buffer); g_free (row.results[i].length); g_free (row.results[i].is_null); } g_free (row.results); } if (row.new_binds) { for (i = 0; i < row.column_count; ++i) { g_free (row.new_binds[i].buffer); g_free (row.new_binds[i].length); g_free (row.new_binds[i].is_null); } g_free (row.new_binds); } return nrows; } static int mysql_db_row_get_column_count (CcnetDBRow *vrow) { MySQLDBRow *row = (MySQLDBRow *)vrow; return row->column_count; } static const char * mysql_db_row_get_column_string (CcnetDBRow *vrow, int i) { MySQLDBRow *row = (MySQLDBRow *)vrow; if (*(row->results[i].is_null)) { return NULL; } char *ret = NULL; unsigned long real_length = *(row->results[i].length); /* If column size is larger then allocated buffer size, re-allocate a new buffer * and fetch the column directly. */ if (real_length > row->results[i].buffer_length) { row->new_binds[i].buffer = g_malloc (real_length + 1); row->new_binds[i].buffer_type = MYSQL_TYPE_STRING; row->new_binds[i].buffer_length = real_length; row->new_binds[i].length = g_new0 (unsigned long, 1); row->new_binds[i].is_null = g_new0 (my_bool, 1); if (mysql_stmt_fetch_column (row->stmt, &row->new_binds[i], i, 0) != 0) { ccnet_warning ("Faield to fetch column: %s\n", mysql_stmt_error(row->stmt)); return NULL; } ret = row->new_binds[i].buffer; } else { ret = row->results[i].buffer; } ret[real_length] = 0; return ret; } static int mysql_db_row_get_column_int (CcnetDBRow *vrow, int idx) { const char *str; char *e; int ret; str = mysql_db_row_get_column_string (vrow, idx); if (!str) { return 0; } errno = 0; ret = strtol (str, &e, 10); if (errno || (e == str)) { ccnet_warning ("Number conversion failed.\n"); return -1; } return ret; } static gint64 mysql_db_row_get_column_int64 (CcnetDBRow *vrow, int idx) { const char *str; char *e; gint64 ret; str = mysql_db_row_get_column_string (vrow, idx); if (!str) { return 0; } errno = 0; ret = strtoll (str, &e, 10); if (errno || (e == str)) { ccnet_warning ("Number conversion failed.\n"); return -1; } return ret; } #endif /* HAVE_MYSQL */ /* SQLite DB */ /* SQLite thread synchronization rountines. * See https://www.sqlite.org/unlock_notify.html */ typedef struct UnlockNotification { int fired; pthread_cond_t cond; pthread_mutex_t mutex; } UnlockNotification; static void unlock_notify_cb(void **ap_arg, int n_arg) { int i; for (i = 0; i < n_arg; i++) { UnlockNotification *p = (UnlockNotification *)ap_arg[i]; pthread_mutex_lock (&p->mutex); p->fired = 1; pthread_cond_signal (&p->cond); pthread_mutex_unlock (&p->mutex); } } static int wait_for_unlock_notify(sqlite3 *db) { UnlockNotification un; un.fired = 0; pthread_mutex_init (&un.mutex, NULL); pthread_cond_init (&un.cond, NULL); int rc = sqlite3_unlock_notify(db, unlock_notify_cb, (void *)&un); if (rc == SQLITE_OK) { pthread_mutex_lock(&un.mutex); if (!un.fired) pthread_cond_wait (&un.cond, &un.mutex); pthread_mutex_unlock(&un.mutex); } pthread_cond_destroy (&un.cond); pthread_mutex_destroy (&un.mutex); return rc; } static int sqlite3_blocking_step(sqlite3_stmt *stmt) { int rc; while (SQLITE_LOCKED == (rc = sqlite3_step(stmt))) { rc = wait_for_unlock_notify(sqlite3_db_handle(stmt)); if (rc != SQLITE_OK) break; sqlite3_reset(stmt); } return rc; } static int sqlite3_blocking_prepare_v2(sqlite3 *db, const char *sql, int sql_len, sqlite3_stmt **pstmt, const char **pz) { int rc; while (SQLITE_LOCKED == (rc = sqlite3_prepare_v2(db, sql, sql_len, pstmt, pz))) { rc = wait_for_unlock_notify(db); if (rc != SQLITE_OK) break; } return rc; } static int sqlite3_blocking_exec(sqlite3 *db, const char *sql, int (*callback)(void *, int, char **, char **), void *arg, char **errmsg) { int rc; while (SQLITE_LOCKED == (rc = sqlite3_exec(db, sql, callback, arg, errmsg))) { rc = wait_for_unlock_notify(db); if (rc != SQLITE_OK) break; } return rc; } typedef struct SQLiteDB { CcnetDB parent; char *db_path; } SQLiteDB; typedef struct SQLiteDBConnection { DBConnection parent; sqlite3 *db_conn; } SQLiteDBConnection; static CcnetDB * sqlite_db_new (const char *db_path) { SQLiteDB *db = g_new0 (SQLiteDB, 1); db->db_path = g_strdup(db_path); return (CcnetDB *)db; } static DBConnection * sqlite_db_get_connection (CcnetDB *vdb) { SQLiteDB *db = (SQLiteDB *)vdb; sqlite3 *db_conn; int result; const char *errmsg; SQLiteDBConnection *conn; result = sqlite3_open_v2 (db->db_path, &db_conn, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_SHAREDCACHE, NULL); if (result != SQLITE_OK) { errmsg = sqlite3_errmsg(db_conn); ccnet_warning ("Failed to open sqlite db: %s\n", errmsg ? errmsg : "no error given"); return NULL; } conn = g_new0 (SQLiteDBConnection, 1); conn->db_conn = db_conn; return (DBConnection *)conn; } static void sqlite_db_release_connection (DBConnection *vconn) { if (!vconn) return; SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn; sqlite3_close (conn->db_conn); g_free (conn); } static int sqlite_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql) { SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn; char *errmsg = NULL; int rc; rc = sqlite3_blocking_exec (conn->db_conn, sql, NULL, NULL, &errmsg); if (rc != SQLITE_OK) { ccnet_warning ("sqlite3_exec failed %s: %s", sql, errmsg ? errmsg : "no error given"); if (errmsg) sqlite3_free (errmsg); return -1; } return 0; } static int _bind_parameters_sqlite (sqlite3 *db, sqlite3_stmt *stmt, int n, va_list args) { int i; const char *type; for (i = 0; i < n; ++i) { type = va_arg (args, const char *); if (strcmp(type, "int") == 0) { int x = va_arg (args, int); if (sqlite3_bind_int (stmt, i+1, x) != SQLITE_OK) { ccnet_warning ("sqlite3_bind_int failed: %s\n", sqlite3_errmsg(db)); return -1; } } else if (strcmp (type, "int64") == 0) { gint64 x = va_arg (args, gint64); if (sqlite3_bind_int64 (stmt, i+1, x) != SQLITE_OK) { ccnet_warning ("sqlite3_bind_int64 failed: %s\n", sqlite3_errmsg(db)); return -1; } } else if (strcmp (type, "string") == 0) { const char *s = va_arg (args, const char *); if (sqlite3_bind_text (stmt, i+1, s, -1, SQLITE_TRANSIENT) != SQLITE_OK) { ccnet_warning ("sqlite3_bind_text failed: %s\n", sqlite3_errmsg(db)); return -1; } } else { ccnet_warning ("BUG: invalid prep stmt parameter type %s.\n", type); g_return_val_if_reached (-1); } } return 0; } static int sqlite_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args) { SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn; sqlite3 *db = conn->db_conn; sqlite3_stmt *stmt; int rc; int ret = 0; rc = sqlite3_blocking_prepare_v2 (db, sql, -1, &stmt, NULL); if (rc != SQLITE_OK) { ccnet_warning ("sqlite3_prepare_v2 failed %s: %s", sql, sqlite3_errmsg(db)); return -1; } if (_bind_parameters_sqlite (db, stmt, n, args) < 0) { ccnet_warning ("Failed to bind parameters for sql %s\n", sql); ret = -1; goto out; } rc = sqlite3_blocking_step (stmt); if (rc != SQLITE_DONE) { ccnet_warning ("sqlite3_step failed %s: %s", sql, sqlite3_errmsg(db)); ret = -1; goto out; } out: sqlite3_finalize (stmt); return ret; } typedef struct SQLiteDBRow { CcnetDBRow parent; int column_count; sqlite3 *db; sqlite3_stmt *stmt; } SQLiteDBRow; static int sqlite_db_query_foreach_row (DBConnection *vconn, const char *sql, CcnetDBRowFunc callback, void *data, int n, va_list args) { SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn; sqlite3 *db = conn->db_conn; sqlite3_stmt *stmt; int rc; int nrows = 0; rc = sqlite3_blocking_prepare_v2 (db, sql, -1, &stmt, NULL); if (rc != SQLITE_OK) { ccnet_warning ("sqlite3_prepare_v2 failed %s: %s", sql, sqlite3_errmsg(db)); return -1; } if (_bind_parameters_sqlite (db, stmt, n, args) < 0) { ccnet_warning ("Failed to bind parameters for sql %s\n", sql); nrows = -1; goto out; } SQLiteDBRow row; memset (&row, 0, sizeof(row)); row.db = db; row.stmt = stmt; row.column_count = sqlite3_column_count (stmt); while (1) { rc = sqlite3_blocking_step (stmt); if (rc == SQLITE_ROW) { ++nrows; if (callback && !callback ((CcnetDBRow *)&row, data)) break; } else if (rc == SQLITE_DONE) { break; } else { ccnet_warning ("sqlite3_step failed %s: %s\n", sql, sqlite3_errmsg(db)); nrows = -1; goto out; } } out: sqlite3_finalize (stmt); return nrows; } static int sqlite_db_row_get_column_count (CcnetDBRow *vrow) { SQLiteDBRow *row = (SQLiteDBRow *)vrow; return row->column_count; } static const char * sqlite_db_row_get_column_string (CcnetDBRow *vrow, int idx) { SQLiteDBRow *row = (SQLiteDBRow *)vrow; return (const char *)sqlite3_column_text (row->stmt, idx); } static int sqlite_db_row_get_column_int (CcnetDBRow *vrow, int idx) { SQLiteDBRow *row = (SQLiteDBRow *)vrow; return sqlite3_column_int (row->stmt, idx); } static gint64 sqlite_db_row_get_column_int64 (CcnetDBRow *vrow, int idx) { SQLiteDBRow *row = (SQLiteDBRow *)vrow; return sqlite3_column_int64 (row->stmt, idx); }