Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ extern int gbl_fdb_io_error_retries_phase_2_poll;
extern int gbl_fdb_auth_enabled;
extern int gbl_fdb_auth_error;
extern int gbl_debug_invalid_genid;
extern int gbl_debug_fail_replay_dispatch;

/* Tranlog */
extern int gbl_tranlog_incoherent_timeout;
Expand Down
3 changes: 3 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,9 @@ REGISTER_TUNABLE("debug.invalid_genid",
"Deliberately introduce an invalid genid, FOR TESTING PURPOSE (Default: off)",
TUNABLE_BOOLEAN, &gbl_debug_invalid_genid,
NOARG | EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("debug.fail_replay_dispatch",
"Force replay dispatch to fail when verify_retries >= N, FOR TESTING PURPOSE (Default: 0)",
TUNABLE_INTEGER, &gbl_debug_fail_replay_dispatch, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE(
"query_plan_percentage",
"Alarm if the average cost per row of current query plan is n percent above the cost for different query plan."
Expand Down
234 changes: 130 additions & 104 deletions db/osql_srs.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,138 +254,164 @@ int srs_tran_empty(struct sqlclntstate *clnt)
long long gbl_verify_tran_replays = 0;
int gbl_disttxn_random_retry_poll = 500;

/**
* Replay transaction using the current history
*
*/
static int srs_tran_replay_int(struct sqlclntstate *clnt, int(dispatch_fn)(struct sqlclntstate *))
int srs_tran_replay_prepare(struct sqlclntstate *clnt)
{
clnt->verify_retries = 0;
if (!clnt->osql.history) {
logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n");
cheap_stack_trace();
return -1;
}
clnt->save_cb = clnt->done_cb;
clnt->done_cb = srs_tran_replay_async;
int rc = srs_tran_replay_begin(clnt);
if (rc) {
clnt->done_cb = clnt->save_cb;
clnt->save_cb = NULL;
clnt->query_rc = rc;
}
return rc;
}

int srs_tran_replay_begin(struct sqlclntstate *clnt)
{
osqlstate_t *osql = &clnt->osql;
srs_tran_query_t *item = 0;
int rc = 0;
int nq = 0;
int tnq = 0;

clnt->verify_retries = 0;

reset_query_effects(clnt, 0, 1);
if (!osql->history) {
logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n");
cheap_stack_trace();
return -1;
abort();
}
clnt->verify_retries++;
gbl_verify_tran_replays++;
if (clnt->dist_timestamp > 0) {
int pval = gbl_disttxn_random_retry_poll;
if (pval > 1) {
poll(0, 0, rand() % pval);
}
}

do {
/* resending writes do not repeat reads, preserve num_selected */
reset_query_effects(clnt, 0, 1); /* Reset it for each retry*/
if (!osql->history) {
logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n");
// Replays for SERIAL or SNAPISOL will never have select or selectv
if (clnt->dbtran.mode == TRANLEVEL_RECOM /* not for modsnap */) {
// we need to free all the shadows but selectv table (recgenid)
rc = osql_shadtbl_reset_for_selectv(clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "Failed to reset selectv in read committed\n");
abort();
cheap_stack_trace();
return -1;
}
} else {
osql_shadtbl_close(clnt);
}

clnt->verify_retries++;
gbl_verify_tran_replays++;
if (clnt->dist_timestamp > 0) {
int pval = gbl_disttxn_random_retry_poll;
if (pval > 1) {
poll(0, 0, rand() % pval);
}
}
if (clnt->verify_retries == gbl_osql_verify_retries_max) {
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST);
}

/* Replays for SERIAL or SNAPISOL will never have select or selectv */
if (clnt->dbtran.mode == TRANLEVEL_RECOM /* not for modsnap */) {
/* we need to free all the shadows but selectv table (recgenid) */
rc = osql_shadtbl_reset_for_selectv(clnt);
if (rc) {
logmsg(LOGMSG_ERROR,
"Failed to reset selectv in read committed\n");
abort();
cheap_stack_trace();
return -1;
}
} else {
osql_shadtbl_close(clnt);
}
osql->num_queries = 0;
clnt->start_gen = bdb_get_rep_gen(thedb->bdb_env);

if (clnt->verify_retries == gbl_osql_verify_retries_max + 1) {
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST);
}
// Schedule the first query
osql->replay_cursor = LISTC_TOP(&osql->history->lst);
clnt->done = 0;

restore_stmt(clnt, osql->replay_cursor);
rc = dispatch_sql_query_no_wait(clnt);

if (0 /*!bdb_am_i_coherent(thedb->bdb_env)*/) {
logmsg(LOGMSG_ERROR,
"Cannot replay, I am incoherent id=%d retries=%d\n",
clnt->queryid, clnt->verify_retries);
rc = CDB2ERR_VERIFY_ERROR;
break;
return rc;
}

int srs_tran_replay_async(struct sqlclntstate *clnt)
{
osqlstate_t *osql = &clnt->osql;
int rc = 0;

// Set query_rc so that `newsql_done_cb` can clean up properly
if (peer_dropped_connection(clnt)) {
clnt->query_rc = CDB2ERR_IO_ERROR;
}
if (clnt->query_rc == CDB2ERR_IO_ERROR) {
logmsg(LOGMSG_ERROR, "%s: client disconnected during async replay, aborting\n", __func__);
goto done;
}

osql->num_queries++;

// Schedule the next stmt or finish if the last retry succeed at commit
if (!osql->history) {
goto done;
}
osql->replay_cursor = LISTC_NEXT(osql->replay_cursor, lnk);
if (osql->replay_cursor != 0) {
clnt->done = 0;
restore_stmt(clnt, osql->replay_cursor);
rc = dispatch_sql_query_no_wait(clnt);
if (rc == 0) {
return 0;
}
nq = 0;
clnt->start_gen = bdb_get_rep_gen(thedb->bdb_env);
LISTC_FOR_EACH(&osql->history->lst, item, lnk)
{
clnt->done = 0; /* reset done flag */
restore_stmt(clnt, item);
if ((rc = dispatch_fn(clnt)) != 0)
break;
if (!osql->history)
break;
nq++;
if (osql->replay != OSQL_RETRY_NONE) {
logmsg(LOGMSG_ERROR, "%p Replaying failed abnormally in dispatch, calling abort, nq=%d, rc=%d\n", clnt,
osql->num_queries, rc);
osql_sock_abort(clnt, tran2req(clnt->dbtran.mode));
}
if (rc == 0)
tnq = nq;

/* don't repeat if we fail with unexplicable error, i.e. not a logical
* error */
if (rc < 0) {
if (osql->replay != OSQL_RETRY_NONE) {
logmsg(LOGMSG_ERROR,
"%p Replaying failed abnormally, calling abort, nq=%d tnq=%d\n",
clnt, nq, tnq);
if (debug_switch_osql_verbose_history_replay()) {
if (osql->history) {
LISTC_FOR_EACH(&osql->history->lst, item, lnk)
{
logmsg(LOGMSG_DEBUG, "\"%s\"\n", print_stmt(clnt, item));
}
}
clnt->query_rc = CDB2ERR_INTERNAL;
}
// No more stmt in this txn
else {
osql->total_queries = osql->num_queries;

// Not yet reached the retry max
if (clnt->verify_retries < gbl_osql_verify_retries_max) {
// Start another pass of replay
if (clnt->osql.replay == OSQL_RETRY_DO) {
rc = srs_tran_replay_begin(clnt);
if (rc == 0) {
return 0;
}

int type = tran2req(clnt->dbtran.mode);
osql_sock_abort(clnt, type);
if (osql->replay != OSQL_RETRY_NONE) {
logmsg(LOGMSG_ERROR, "%p Replaying failed abnormally in dispatch, calling abort, nq=%d, rc=%d\n",
clnt, osql->num_queries, rc);
osql_sock_abort(clnt, tran2req(clnt->dbtran.mode));
}
clnt->query_rc = CDB2ERR_INTERNAL;
}
break;
}
} while (osql->replay == OSQL_RETRY_DO && clnt->verify_retries <= gbl_osql_verify_retries_max);

if (clnt->verify_retries >= gbl_osql_verify_retries_max && osql->xerr.errval) {
logmsg(LOGMSG_ERROR, "transaction from pid %d on origin host %s failed %d times with verify errors\n",
clnt->last_pid, clnt->origin, clnt->verify_retries);
/* Set to NONE to suppress the error from srs_tran_destroy(). */
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);
// Too many retry failures
else if (osql->xerr.errval) {
logmsg(LOGMSG_ERROR, "transaction from pid %d on origin host %s failed %d times with verify errors\n",
clnt->last_pid, clnt->origin, clnt->verify_retries);
clnt->query_rc = CDB2ERR_VERIFY_ERROR;
}
}

/* replayed, free the session */
done:
// Set to NONE to suppress the error from srs_tran_destroy().
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);

// Finish replay, free the session
if (srs_tran_destroy(clnt)) {
logmsg(LOGMSG_ERROR, "%s Fail to destroy transaction replay session\n",
__func__);
logmsg(LOGMSG_ERROR, "%s Fail to destroy transaction replay session\n", __func__);
}
if (rc && clnt->verify_retries < gbl_osql_verify_retries_max) {
logmsg(LOGMSG_ERROR, "Uncommittable transaction %d retried %d times, "
"rc=%d [global retr=%lld] nq=%d tnq=%d\n", clnt->queryid,
clnt->verify_retries, rc, gbl_verify_tran_replays, nq, tnq);
logmsg(LOGMSG_ERROR,
"Uncommittable transaction %d retried %d times, "
"rc=%d [global retr=%lld] nq=%d tnq=%d\n",
clnt->queryid, clnt->verify_retries, rc, gbl_verify_tran_replays, osql->num_queries,
osql->total_queries);
}

osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);
clnt->verify_retries = 0;

return rc;
}
if (rc && !clnt->query_rc) {
clnt->query_rc = rc;
}
clnt->done_cb = clnt->save_cb;
clnt->save_cb = NULL;

static int run_sql_query(struct sqlclntstate *clnt)
{
sqlengine_work_appsock(clnt->thd, clnt);
return 0;
}
// Clean up the residual states
clnt->done_cb(clnt);

int srs_tran_replay_inline(struct sqlclntstate *clnt)
{
return srs_tran_replay_int(clnt, run_sql_query);
return 0;
}
15 changes: 12 additions & 3 deletions db/osql_srs.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,19 @@ int srs_tran_del_last_query(struct sqlclntstate *clnt);
int srs_tran_empty(struct sqlclntstate *clnt);

/**
* Replay transaction using the current history
*
* Prepare the transaction for replay
*/
int srs_tran_replay_prepare(struct sqlclntstate *);

/**
* Begin the replay of the transaction
*/
int srs_tran_replay_begin(struct sqlclntstate *);

/**
* Schedule a query during replay
*/
int srs_tran_replay_inline(struct sqlclntstate *);
int srs_tran_replay_async(struct sqlclntstate *);

void srs_tran_print_history(struct sqlclntstate *clnt, int indent);
#endif
4 changes: 4 additions & 0 deletions db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ typedef struct osqlstate {

/* verify handling */
/* keep the log of sql strings for the current transaction */
struct srs_tran_query *replay_cursor;
struct srs_tran *history;
int num_queries;
int total_queries;
int replay; /* set this when a session is replayed, used by sorese */
int sent_column_data; /* set this if we've already sent the column data */

Expand Down Expand Up @@ -785,6 +788,7 @@ struct sqlclntstate {
uint8_t no_more_heartbeats;
uint8_t done;
plugin_func *done_cb; /* newsql_done_evbuffer */
plugin_func *save_cb;
unsigned long long sqltick, sqltick_last_seen;

int using_case_insensitive_like;
Expand Down
19 changes: 12 additions & 7 deletions db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -4480,14 +4480,13 @@ int done_cb_evbuffer(struct sqlclntstate *clnt)
return -1;
}
if (clnt->osql.replay == OSQL_RETRY_DO) {
plugin_func *save_cb = clnt->done_cb;
clnt->done_cb = NULL;
int rc = srs_tran_replay_inline(clnt);
if (rc && !clnt->query_rc) {
clnt->query_rc = rc;
if (srs_tran_replay_prepare(clnt) == 0) {
return RC_INTERNAL_RETRY;
}
clnt->done_cb = save_cb;
} else if (clnt->osql.history && clnt->ctrl_sqlengine == SQLENG_NORMAL_PROCESS) {
}
/* Set to NONE to suppress the error from srs_tran_destroy(). */
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);
if (clnt->osql.history && clnt->ctrl_sqlengine == SQLENG_NORMAL_PROCESS) {
srs_tran_destroy(clnt);
}
Pthread_mutex_lock(&lru_evbuffers_mtx); /* protect log_long_running_stmts_evbuffer() */
Expand Down Expand Up @@ -4601,6 +4600,7 @@ static void sqlengine_work_lua_thread(void *thddata, void *work)
}

int gbl_debug_sqlthd_failures;
int gbl_debug_fail_replay_dispatch;
int gbl_enable_internal_sql_stmt_caching = 1;

static int execute_verify_indexes(struct sqlthdstate *thd, struct sqlclntstate *clnt)
Expand Down Expand Up @@ -5055,6 +5055,11 @@ static int enqueue_sql_query(struct sqlclntstate *clnt, int force_dispatch)
}

struct string_ref *sr = get_ref(clnt->sql_ref);
if (gbl_debug_fail_replay_dispatch && clnt->osql.replay != OSQL_RETRY_NONE &&
clnt->verify_retries >= gbl_debug_fail_replay_dispatch) {
put_ref(&sr);
return -1;
}
if ((rc = thdpool_enqueue(pool, sqlengine_work_appsock_pp,
clnt, clnt->queue_me, sr, flags)) != 0) {
if ((in_client_trans(clnt) || clnt->osql.replay == OSQL_RETRY_DO) &&
Expand Down
5 changes: 3 additions & 2 deletions net/sqlwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,9 @@ int sql_peer_check(struct sqlwriter *writer)
int sql_done(struct sqlwriter *writer)
{
struct sqlclntstate *clnt = writer->clnt;
if (done_cb_evbuffer(clnt) != 0) {
return -1;
int rc = done_cb_evbuffer(clnt);
if (rc != 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle rc RC_INTERNAL_RETRY here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled in newsql_done_cb by returning without doing anything. I still think the failure could be a timing issue but I'm looking into it more.

return rc;
}
Pthread_mutex_lock(&writer->wr_lock);
writer->done = 1;
Expand Down
Loading
Loading