diff --git a/db/osql_srs.c b/db/osql_srs.c index 84bafee5c9..41e490cb6f 100644 --- a/db/osql_srs.c +++ b/db/osql_srs.c @@ -254,138 +254,153 @@ int srs_tran_empty(struct sqlclntstate *clnt) long long gbl_verify_tran_replays = 0; int gbl_disttxn_random_retry_poll = 500; -/** - * Replay transaction using the current history - * - */ -static int srs_tran_replay_int(struct sqlclntstate *clnt, int(dispatch_fn)(struct sqlclntstate *)) +int srs_tran_replay_prepare(struct sqlclntstate *clnt) { - osqlstate_t *osql = &clnt->osql; - srs_tran_query_t *item = 0; - int rc = 0; - int nq = 0; - int tnq = 0; - - clnt->verify_retries = 0; - - if (!osql->history) { + if (!clnt->osql.history) { logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n"); cheap_stack_trace(); return -1; } - do { - /* resending writes do not repeat reads, preserve num_selected */ - reset_query_effects(clnt, 0, 1); /* Reset it for each retry*/ - if (!osql->history) { - logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n"); - abort(); - } + clnt->save_cb = clnt->done_cb; + clnt->done_cb = NULL; + clnt->verify_retries = 0; - clnt->verify_retries++; - gbl_verify_tran_replays++; - if (clnt->dist_timestamp > 0) { - int pval = gbl_disttxn_random_retry_poll; - if (pval > 1) { - poll(0, 0, rand() % pval); - } - } + int rc = dispatch_sql_query_no_wait(clnt); + if (rc == 0) { + return RC_INTERNAL_RETRY; + } - /* Replays for SERIAL or SNAPISOL will never have select or selectv */ - if (clnt->dbtran.mode == TRANLEVEL_RECOM /* not for modsnap */) { - /* we need to free all the shadows but selectv table (recgenid) */ - rc = osql_shadtbl_reset_for_selectv(clnt); - if (rc) { - logmsg(LOGMSG_ERROR, - "Failed to reset selectv in read committed\n"); - abort(); - cheap_stack_trace(); - return -1; - } - } else { - osql_shadtbl_close(clnt); - } + // Dispatch failed - restore done_cb + clnt->done_cb = clnt->save_cb; + clnt->save_cb = NULL; - if (clnt->verify_retries == gbl_osql_verify_retries_max + 1) { - osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST); - } + if (!clnt->query_rc) { + clnt->query_rc = clnt->osql.xerr.errval ? clnt->osql.xerr.errval : CDB2ERR_VERIFY_ERROR; + } - if (0 /*!bdb_am_i_coherent(thedb->bdb_env)*/) { - logmsg(LOGMSG_ERROR, - "Cannot replay, I am incoherent id=%d retries=%d\n", - clnt->queryid, clnt->verify_retries); - rc = CDB2ERR_VERIFY_ERROR; - break; - } - nq = 0; - clnt->start_gen = bdb_get_rep_gen(thedb->bdb_env); - LISTC_FOR_EACH(&osql->history->lst, item, lnk) - { - clnt->done = 0; /* reset done flag */ - restore_stmt(clnt, item); - if ((rc = dispatch_fn(clnt)) != 0) - break; - if (!osql->history) - break; - nq++; - } - if (rc == 0) - tnq = nq; - - /* don't repeat if we fail with unexplicable error, i.e. not a logical - * error */ - if (rc < 0) { - if (osql->replay != OSQL_RETRY_NONE) { - logmsg(LOGMSG_ERROR, - "%p Replaying failed abnormally, calling abort, nq=%d tnq=%d\n", - clnt, nq, tnq); - if (debug_switch_osql_verbose_history_replay()) { - if (osql->history) { - LISTC_FOR_EACH(&osql->history->lst, item, lnk) - { - logmsg(LOGMSG_DEBUG, "\"%s\"\n", print_stmt(clnt, item)); - } - } - } - - int type = tran2req(clnt->dbtran.mode); - osql_sock_abort(clnt, type); - } - break; + return rc; +} + +static int srs_tran_replay_once(struct sqlclntstate *clnt, int(dispatch_fn)(struct sqlclntstate *)) +{ + osqlstate_t *osql = &clnt->osql; + srs_tran_query_t *item = 0; + int rc = 0; + osql->num_queries = 0; + + /* resending writes do not repeat reads, preserve num_selected */ + reset_query_effects(clnt, 0, 1); /* Reset it for each retry*/ + if (!osql->history) { + logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n"); + abort(); + } + + clnt->verify_retries++; + gbl_verify_tran_replays++; + if (clnt->dist_timestamp > 0) { + int pval = gbl_disttxn_random_retry_poll; + if (pval > 1) { + poll(0, 0, rand() % pval); } - } while (osql->replay == OSQL_RETRY_DO && clnt->verify_retries <= gbl_osql_verify_retries_max); + } - if (clnt->verify_retries >= gbl_osql_verify_retries_max && osql->xerr.errval) { - logmsg(LOGMSG_ERROR, "transaction from pid %d on origin host %s failed %d times with verify errors\n", - clnt->last_pid, clnt->origin, clnt->verify_retries); - /* Set to NONE to suppress the error from srs_tran_destroy(). */ - osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE); + /* Replays for SERIAL or SNAPISOL will never have select or selectv */ + if (clnt->dbtran.mode == TRANLEVEL_RECOM /* not for modsnap */) { + /* we need to free all the shadows but selectv table (recgenid) */ + rc = osql_shadtbl_reset_for_selectv(clnt); + if (rc) { + logmsg(LOGMSG_ERROR, "Failed to reset selectv in read committed\n"); + abort(); + cheap_stack_trace(); + return -1; + } + } else { + osql_shadtbl_close(clnt); } - /* replayed, free the session */ - if (srs_tran_destroy(clnt)) { - logmsg(LOGMSG_ERROR, "%s Fail to destroy transaction replay session\n", - __func__); + if (clnt->verify_retries == gbl_osql_verify_retries_max + 1) { + osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST); } - if (rc && clnt->verify_retries < gbl_osql_verify_retries_max) { - logmsg(LOGMSG_ERROR, "Uncommittable transaction %d retried %d times, " - "rc=%d [global retr=%lld] nq=%d tnq=%d\n", clnt->queryid, - clnt->verify_retries, rc, gbl_verify_tran_replays, nq, tnq); + + clnt->start_gen = bdb_get_rep_gen(thedb->bdb_env); + LISTC_FOR_EACH(&osql->history->lst, item, lnk) + { + clnt->done = 0; /* reset done flag */ + restore_stmt(clnt, item); + if ((rc = dispatch_fn(clnt)) != 0) + return rc; + if (!osql->history) + return 0; // normal completion - inner commit consumed history + osql->num_queries++; } - osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE); - clnt->verify_retries = 0; + // Notice that this won't be updated upon a successful replay + // the history is cleared in `handle_sql_commitrollback` and + // the current function returns early. + osql->total_queries = osql->num_queries; return rc; } static int run_sql_query(struct sqlclntstate *clnt) { + clnt->osql.in_replay_nested = 1; sqlengine_work_appsock(clnt->thd, clnt); + clnt->osql.in_replay_nested = 0; return 0; } -int srs_tran_replay_inline(struct sqlclntstate *clnt) +int srs_tran_replay(struct sqlclntstate *clnt) { - return srs_tran_replay_int(clnt, run_sql_query); + osqlstate_t *osql = &clnt->osql; + int rc = 0; + + // replay all the stmts + rc = srs_tran_replay_once(clnt, run_sql_query); + + int more = (rc == 0 && osql->replay == OSQL_RETRY_DO && clnt->verify_retries <= gbl_osql_verify_retries_max); + + int redispatched = 0; + if (more) { + redispatched = (dispatch_sql_query_no_wait(clnt) == 0); + } + if (redispatched) { + return RC_INTERNAL_RETRY; + } + + int replay_succeeded = (osql->replay == OSQL_RETRY_NONE && rc == 0); + + // Replay succeeded or there's an error and we need to stop replay + + if (clnt->verify_retries >= gbl_osql_verify_retries_max && osql->xerr.errval) { + // Exhausted all retries + logmsg(LOGMSG_ERROR, "transaction from pid %d on origin host %s failed %d times with verify errors\n", + clnt->last_pid, clnt->origin, clnt->verify_retries); + } + if (rc && clnt->verify_retries < gbl_osql_verify_retries_max) { + logmsg(LOGMSG_ERROR, + "Uncommittable transaction %d retried %d times, " + "rc=%d [global retr=%lld] nq=%d tnq=%d\n", + clnt->queryid, clnt->verify_retries, rc, gbl_verify_tran_replays, osql->num_queries, + osql->total_queries); + } + + /* Single point to surface a failure code to the client. */ + if (!replay_succeeded && !clnt->query_rc) { + clnt->query_rc = osql->xerr.errval ? osql->xerr.errval : CDB2ERR_VERIFY_ERROR; + } + + osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE); + + /* replayed, free the session */ + if (srs_tran_destroy(clnt)) { + logmsg(LOGMSG_ERROR, "%s Fail to destroy transaction replay session\n", __func__); + } + + clnt->done_cb = clnt->save_cb; + clnt->save_cb = NULL; + clnt->verify_retries = 0; + + return rc; } diff --git a/db/osql_srs.h b/db/osql_srs.h index b3218ce6bb..0006a0a067 100644 --- a/db/osql_srs.h +++ b/db/osql_srs.h @@ -58,10 +58,15 @@ int srs_tran_del_last_query(struct sqlclntstate *clnt); int srs_tran_empty(struct sqlclntstate *clnt); /** - * Replay transaction using the current history + * Prepare and dispatch the replay + */ +int srs_tran_replay_prepare(struct sqlclntstate *clnt); + +/** + * Replay transaction once using the current history * */ -int srs_tran_replay_inline(struct sqlclntstate *); +int srs_tran_replay(struct sqlclntstate *clnt); void srs_tran_print_history(struct sqlclntstate *clnt, int indent); #endif diff --git a/db/sql.h b/db/sql.h index c4e04c306a..3e03f08067 100644 --- a/db/sql.h +++ b/db/sql.h @@ -183,6 +183,9 @@ typedef struct osqlstate { /* verify handling */ /* keep the log of sql strings for the current transaction */ struct srs_tran *history; + int num_queries; /* statements dispatched in the current replay iteration */ + int total_queries; /* statements dispatched in the most recent clean replay iteration */ + int in_replay_nested; int replay; /* set this when a session is replayed, used by sorese */ int sent_column_data; /* set this if we've already sent the column data */ @@ -785,6 +788,7 @@ struct sqlclntstate { uint8_t no_more_heartbeats; uint8_t done; plugin_func *done_cb; /* newsql_done_evbuffer */ + plugin_func *save_cb; unsigned long long sqltick, sqltick_last_seen; int using_case_insensitive_like; diff --git a/db/sqlinterfaces.c b/db/sqlinterfaces.c index 8cbffaea59..396819f7d9 100644 --- a/db/sqlinterfaces.c +++ b/db/sqlinterfaces.c @@ -4483,14 +4483,15 @@ int done_cb_evbuffer(struct sqlclntstate *clnt) return -1; } if (clnt->osql.replay == OSQL_RETRY_DO) { - plugin_func *save_cb = clnt->done_cb; - clnt->done_cb = NULL; - int rc = srs_tran_replay_inline(clnt); - if (rc && !clnt->query_rc) { - clnt->query_rc = rc; + int rc = srs_tran_replay_prepare(clnt); + if (rc == RC_INTERNAL_RETRY) { + return rc; } - clnt->done_cb = save_cb; - } else if (clnt->osql.history && clnt->ctrl_sqlengine == SQLENG_NORMAL_PROCESS) { + } + if (clnt->osql.replay != OSQL_RETRY_NONE) { + osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE); + } + if (clnt->osql.history && clnt->ctrl_sqlengine == SQLENG_NORMAL_PROCESS) { srs_tran_destroy(clnt); } Pthread_mutex_lock(&lru_evbuffers_mtx); /* protect log_long_running_stmts_evbuffer() */ @@ -4857,6 +4858,17 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt) assert(clnt->dbtran.pStmt == NULL); + if (clnt->osql.replay == OSQL_RETRY_DO && !clnt->osql.in_replay_nested) { + if (srs_tran_replay(clnt) == RC_INTERNAL_RETRY) { + /* Another iteration was scheduled on a new worker. + * That worker now owns the clnt; do NOT signal_clnt_as_done + * here or it would race with the new worker's enqueue. */ + thrman_setid(thrman_self(), "[done]"); + return; + } + goto done; + } + /* everything going in is cursor based */ int rc = get_curtran(thedb->bdb_env, clnt); if (rc) { @@ -4898,10 +4910,6 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt) clnt->query_rc = execute_sql_query(thd, clnt); } - if (clnt->sql_ref) { - put_ref(&clnt->sql_ref); - } - osql_shadtbl_done_query(thedb->bdb_env, clnt); thrman_setfd(thd->thr_self, -1); @@ -4914,6 +4922,12 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt) logmsg(LOGMSG_ERROR, "%s: unable to destroy a CURSOR transaction!\n", __func__); } clnt->osql.timings.query_finished = osql_log_time(); + +done: + if (clnt->sql_ref) { + put_ref(&clnt->sql_ref); + } + osql_log_time_done(clnt); clnt_change_state(clnt, CONNECTION_IDLE); debug_close_clnt(clnt); diff --git a/net/sqlwriter.c b/net/sqlwriter.c index 38e4173daa..17c00d444b 100644 --- a/net/sqlwriter.c +++ b/net/sqlwriter.c @@ -455,8 +455,9 @@ int sql_peer_check(struct sqlwriter *writer) int sql_done(struct sqlwriter *writer) { struct sqlclntstate *clnt = writer->clnt; - if (done_cb_evbuffer(clnt) != 0) { - return -1; + int rc = done_cb_evbuffer(clnt); + if (rc != 0) { + return rc; } Pthread_mutex_lock(&writer->wr_lock); writer->done = 1; diff --git a/plugins/newsql/newsql_evbuffer.c b/plugins/newsql/newsql_evbuffer.c index d573b29389..dcbc6f78d3 100644 --- a/plugins/newsql/newsql_evbuffer.c +++ b/plugins/newsql/newsql_evbuffer.c @@ -176,7 +176,8 @@ static void newsql_reset_evbuffer(struct newsql_appdata_evbuffer *appdata) static int newsql_done_cb(struct sqlclntstate *clnt) { struct newsql_appdata_evbuffer *appdata = clnt->appdata; - if (sql_done(appdata->writer) == 0) { + int rc = sql_done(appdata->writer); + if (rc == 0) { if (clnt->added_to_hist) { clnt->added_to_hist = 0; } else { @@ -184,7 +185,7 @@ static int newsql_done_cb(struct sqlclntstate *clnt) } appdata->query = NULL; evtimer_once(appdata->base, rd_hdr, appdata); - } else { + } else if (rc < 0) { appdata->cleanup_ev = event_new(appdata->base, -1, 0, newsql_cleanup, appdata); event_active(appdata->cleanup_ev, 0, 0); } diff --git a/tests/replay_trans.test/Makefile b/tests/replay_trans.test/Makefile index e05866ba3a..2d39e80074 100644 --- a/tests/replay_trans.test/Makefile +++ b/tests/replay_trans.test/Makefile @@ -4,5 +4,5 @@ else include $(TESTSROOTDIR)/testcase.mk endif ifeq ($(TEST_TIMEOUT),) - export TEST_TIMEOUT=1m + export TEST_TIMEOUT=3m endif diff --git a/tests/replay_trans.test/runit b/tests/replay_trans.test/runit index 010db35088..4d67ca0ad4 100755 --- a/tests/replay_trans.test/runit +++ b/tests/replay_trans.test/runit @@ -1,5 +1,5 @@ #!/usr/bin/env bash -bash -n "$0" | exit 1 +bash -n "$0" || exit 1 set -x source ${TESTSROOTDIR}/tools/runit_common.sh @@ -23,9 +23,26 @@ function init() { cdb2sql ${CDB2_OPTIONS} $dbnm default "INSERT INTO t1 VALUES(1, NOW())" master=`getmaster` + if [ -n "$CLUSTER" ] ; then + logfile="$TESTDIR/logs/$dbnm.$master.db" + else + logfile="$TESTDIR/logs/$dbnm.db" + fi cdb2sql --tabs ${CDB2_OPTIONS} $dbnm --host $master "PUT TUNABLE 'debug.invalid_genid' 1" } +function set_all_nodes_tunable() { + local tunable=$1 + local value=$2 + if [ -n "$CLUSTER" ]; then + for node in $CLUSTER; do + cdb2sql --tabs ${CDB2_OPTIONS} $dbnm --host $node "PUT TUNABLE '$tunable' $value" + done + else + cdb2sql --tabs ${CDB2_OPTIONS} $dbnm --host $master "PUT TUNABLE '$tunable' $value" + fi +} + function cleanup() { cdb2sql ${CDB2_OPTIONS} $dbnm default "DROP TABLE t1" } @@ -56,8 +73,90 @@ EOF assert_fail $? } +function run_client_timeout_during_retry() { + echo "Test: client timeout during retry" + # Raise retry max on all nodes so retries are still in flight when + # the client-side api timeout fires. + set_all_nodes_tunable osql_verify_retry_max 100000 + + # Client-side api_call_timeout expires while the server is still retrying. + # The server should clean up the replay state without "state is wrong" errors. + COMDB2_CONFIG_API_CALL_TIMEOUT=1000 cdb2sql ${CDB2_OPTIONS} $dbnm default - <&1 || true +BEGIN +UPDATE t1 SET j = NOW() WHERE i = 1; +COMMIT +EOF + + sleep 2 + + set_all_nodes_tunable osql_verify_retry_max 499 + + # Server should still be healthy + out=$(cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "SELECT i FROM t1 WHERE i = 1") + if [[ "$out" != "1" ]]; then + failexit "server unhealthy after client timeout during retry, got: $out" + fi + + # The replay state must be cleaned up without "state is wrong" errors. + found_state_wrong=0 + if [ -n "$CLUSTER" ]; then + for node in $CLUSTER; do + if grep -q "state is wrong" "$TESTDIR/logs/$dbnm.$node.db"; then + found_state_wrong=1 + break + fi + done + else + if grep -q "state is wrong" "$logfile"; then + found_state_wrong=1 + fi + fi + if [[ $found_state_wrong -eq 1 ]]; then + failexit "replay state not cleaned up on client disconnect" + fi + + echo "client timeout during retry: passed" +} + +function run_queue_full_during_retry() { + echo "Test: queue full during retry" + # Shrink the SQL engine pool so it's easy to saturate. + # Set on all nodes since the client may connect to any of them. + set_all_nodes_tunable sqlenginepool.maxt 4 + set_all_nodes_tunable sqlenginepool.maxq 2 + + # Saturate the pool with long-running queries + for i in $(seq 1 6); do + cdb2sql ${CDB2_OPTIONS} $dbnm default "SELECT SLEEP(5)" &>/dev/null & + done + sleep 1 + + # Trigger a tx that will need to retry — pool is full, dispatch may fail + cdb2sql ${CDB2_OPTIONS} $dbnm default - <&1 || true +BEGIN +UPDATE t1 SET j = NOW() WHERE i = 1; +COMMIT +EOF + + # Wait for SLEEP queries to finish and pool to drain + wait + + # Restore pool settings + set_all_nodes_tunable sqlenginepool.maxt 48 + set_all_nodes_tunable sqlenginepool.maxq 0 + + # Server should still be healthy + out=$(cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "SELECT i FROM t1 WHERE i = 1") + if [[ "$out" != "1" ]]; then + failexit "server unhealthy after queue-full retry, got: $out" + fi + echo "queue full during retry: passed" +} + init run_bad_tx_with_intransres run_bad_tx_without_intransres +run_client_timeout_during_retry +run_queue_full_during_retry verify cleanup