Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 121 additions & 106 deletions db/osql_srs.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,138 +254,153 @@ int srs_tran_empty(struct sqlclntstate *clnt)
long long gbl_verify_tran_replays = 0;
int gbl_disttxn_random_retry_poll = 500;

/**
* Replay transaction using the current history
*
*/
static int srs_tran_replay_int(struct sqlclntstate *clnt, int(dispatch_fn)(struct sqlclntstate *))
int srs_tran_replay_prepare(struct sqlclntstate *clnt)
{
osqlstate_t *osql = &clnt->osql;
srs_tran_query_t *item = 0;
int rc = 0;
int nq = 0;
int tnq = 0;

clnt->verify_retries = 0;

if (!osql->history) {
if (!clnt->osql.history) {
logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n");
cheap_stack_trace();
return -1;
}

do {
/* resending writes do not repeat reads, preserve num_selected */
reset_query_effects(clnt, 0, 1); /* Reset it for each retry*/
if (!osql->history) {
logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n");
abort();
}
clnt->save_cb = clnt->done_cb;
clnt->done_cb = NULL;
clnt->verify_retries = 0;

clnt->verify_retries++;
gbl_verify_tran_replays++;
if (clnt->dist_timestamp > 0) {
int pval = gbl_disttxn_random_retry_poll;
if (pval > 1) {
poll(0, 0, rand() % pval);
}
}
int rc = dispatch_sql_query_no_wait(clnt);
if (rc == 0) {
return RC_INTERNAL_RETRY;
}

/* Replays for SERIAL or SNAPISOL will never have select or selectv */
if (clnt->dbtran.mode == TRANLEVEL_RECOM /* not for modsnap */) {
/* we need to free all the shadows but selectv table (recgenid) */
rc = osql_shadtbl_reset_for_selectv(clnt);
if (rc) {
logmsg(LOGMSG_ERROR,
"Failed to reset selectv in read committed\n");
abort();
cheap_stack_trace();
return -1;
}
} else {
osql_shadtbl_close(clnt);
}
// Dispatch failed - restore done_cb
clnt->done_cb = clnt->save_cb;
clnt->save_cb = NULL;

if (clnt->verify_retries == gbl_osql_verify_retries_max + 1) {
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST);
}
if (!clnt->query_rc) {
clnt->query_rc = clnt->osql.xerr.errval ? clnt->osql.xerr.errval : CDB2ERR_VERIFY_ERROR;
}

if (0 /*!bdb_am_i_coherent(thedb->bdb_env)*/) {
logmsg(LOGMSG_ERROR,
"Cannot replay, I am incoherent id=%d retries=%d\n",
clnt->queryid, clnt->verify_retries);
rc = CDB2ERR_VERIFY_ERROR;
break;
}
nq = 0;
clnt->start_gen = bdb_get_rep_gen(thedb->bdb_env);
LISTC_FOR_EACH(&osql->history->lst, item, lnk)
{
clnt->done = 0; /* reset done flag */
restore_stmt(clnt, item);
if ((rc = dispatch_fn(clnt)) != 0)
break;
if (!osql->history)
break;
nq++;
}
if (rc == 0)
tnq = nq;

/* don't repeat if we fail with unexplicable error, i.e. not a logical
* error */
if (rc < 0) {
if (osql->replay != OSQL_RETRY_NONE) {
logmsg(LOGMSG_ERROR,
"%p Replaying failed abnormally, calling abort, nq=%d tnq=%d\n",
clnt, nq, tnq);
if (debug_switch_osql_verbose_history_replay()) {
if (osql->history) {
LISTC_FOR_EACH(&osql->history->lst, item, lnk)
{
logmsg(LOGMSG_DEBUG, "\"%s\"\n", print_stmt(clnt, item));
}
}
}

int type = tran2req(clnt->dbtran.mode);
osql_sock_abort(clnt, type);
}
break;
return rc;
}

static int srs_tran_replay_once(struct sqlclntstate *clnt, int(dispatch_fn)(struct sqlclntstate *))
{
osqlstate_t *osql = &clnt->osql;
srs_tran_query_t *item = 0;
int rc = 0;
osql->num_queries = 0;

/* resending writes do not repeat reads, preserve num_selected */
reset_query_effects(clnt, 0, 1); /* Reset it for each retry*/
if (!osql->history) {
logmsg(LOGMSG_ERROR, "Trying to replay, but no history?\n");
abort();
}

clnt->verify_retries++;
gbl_verify_tran_replays++;
if (clnt->dist_timestamp > 0) {
int pval = gbl_disttxn_random_retry_poll;
if (pval > 1) {
poll(0, 0, rand() % pval);
}
} while (osql->replay == OSQL_RETRY_DO && clnt->verify_retries <= gbl_osql_verify_retries_max);
}

if (clnt->verify_retries >= gbl_osql_verify_retries_max && osql->xerr.errval) {
logmsg(LOGMSG_ERROR, "transaction from pid %d on origin host %s failed %d times with verify errors\n",
clnt->last_pid, clnt->origin, clnt->verify_retries);
/* Set to NONE to suppress the error from srs_tran_destroy(). */
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);
/* Replays for SERIAL or SNAPISOL will never have select or selectv */
if (clnt->dbtran.mode == TRANLEVEL_RECOM /* not for modsnap */) {
/* we need to free all the shadows but selectv table (recgenid) */
rc = osql_shadtbl_reset_for_selectv(clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "Failed to reset selectv in read committed\n");
abort();
cheap_stack_trace();
return -1;
}
} else {
osql_shadtbl_close(clnt);
}

/* replayed, free the session */
if (srs_tran_destroy(clnt)) {
logmsg(LOGMSG_ERROR, "%s Fail to destroy transaction replay session\n",
__func__);
if (clnt->verify_retries == gbl_osql_verify_retries_max + 1) {
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST);
}
if (rc && clnt->verify_retries < gbl_osql_verify_retries_max) {
logmsg(LOGMSG_ERROR, "Uncommittable transaction %d retried %d times, "
"rc=%d [global retr=%lld] nq=%d tnq=%d\n", clnt->queryid,
clnt->verify_retries, rc, gbl_verify_tran_replays, nq, tnq);

clnt->start_gen = bdb_get_rep_gen(thedb->bdb_env);
LISTC_FOR_EACH(&osql->history->lst, item, lnk)
{
clnt->done = 0; /* reset done flag */
restore_stmt(clnt, item);
if ((rc = dispatch_fn(clnt)) != 0)
return rc;
if (!osql->history)
return 0; // normal completion - inner commit consumed history
osql->num_queries++;
}

osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);
clnt->verify_retries = 0;
// Notice that this won't be updated upon a successful replay
// the history is cleared in `handle_sql_commitrollback` and
// the current function returns early.
osql->total_queries = osql->num_queries;

return rc;
}

static int run_sql_query(struct sqlclntstate *clnt)
{
clnt->osql.in_replay_nested = 1;
sqlengine_work_appsock(clnt->thd, clnt);
clnt->osql.in_replay_nested = 0;
return 0;
}

int srs_tran_replay_inline(struct sqlclntstate *clnt)
int srs_tran_replay(struct sqlclntstate *clnt)
{
return srs_tran_replay_int(clnt, run_sql_query);
osqlstate_t *osql = &clnt->osql;
int rc = 0;

// replay all the stmts
rc = srs_tran_replay_once(clnt, run_sql_query);

int more = (rc == 0 && osql->replay == OSQL_RETRY_DO && clnt->verify_retries <= gbl_osql_verify_retries_max);

int redispatched = 0;
if (more) {
redispatched = (dispatch_sql_query_no_wait(clnt) == 0);
}
if (redispatched) {
return RC_INTERNAL_RETRY;
}

int replay_succeeded = (osql->replay == OSQL_RETRY_NONE && rc == 0);

// Replay succeeded or there's an error and we need to stop replay

if (clnt->verify_retries >= gbl_osql_verify_retries_max && osql->xerr.errval) {
// Exhausted all retries
logmsg(LOGMSG_ERROR, "transaction from pid %d on origin host %s failed %d times with verify errors\n",
clnt->last_pid, clnt->origin, clnt->verify_retries);
}
if (rc && clnt->verify_retries < gbl_osql_verify_retries_max) {
logmsg(LOGMSG_ERROR,
"Uncommittable transaction %d retried %d times, "
"rc=%d [global retr=%lld] nq=%d tnq=%d\n",
clnt->queryid, clnt->verify_retries, rc, gbl_verify_tran_replays, osql->num_queries,
osql->total_queries);
}

/* Single point to surface a failure code to the client. */
if (!replay_succeeded && !clnt->query_rc) {
clnt->query_rc = osql->xerr.errval ? osql->xerr.errval : CDB2ERR_VERIFY_ERROR;
}

osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);

/* replayed, free the session */
if (srs_tran_destroy(clnt)) {
logmsg(LOGMSG_ERROR, "%s Fail to destroy transaction replay session\n", __func__);
}

clnt->done_cb = clnt->save_cb;
clnt->save_cb = NULL;
clnt->verify_retries = 0;

return rc;
}
9 changes: 7 additions & 2 deletions db/osql_srs.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ int srs_tran_del_last_query(struct sqlclntstate *clnt);
int srs_tran_empty(struct sqlclntstate *clnt);

/**
* Replay transaction using the current history
* Prepare and dispatch the replay
*/
int srs_tran_replay_prepare(struct sqlclntstate *clnt);

/**
* Replay transaction once using the current history
*
*/
int srs_tran_replay_inline(struct sqlclntstate *);
int srs_tran_replay(struct sqlclntstate *clnt);

void srs_tran_print_history(struct sqlclntstate *clnt, int indent);
#endif
4 changes: 4 additions & 0 deletions db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ typedef struct osqlstate {
/* verify handling */
/* keep the log of sql strings for the current transaction */
struct srs_tran *history;
int num_queries; /* statements dispatched in the current replay iteration */
int total_queries; /* statements dispatched in the most recent clean replay iteration */
int in_replay_nested;
int replay; /* set this when a session is replayed, used by sorese */
int sent_column_data; /* set this if we've already sent the column data */

Expand Down Expand Up @@ -785,6 +788,7 @@ struct sqlclntstate {
uint8_t no_more_heartbeats;
uint8_t done;
plugin_func *done_cb; /* newsql_done_evbuffer */
plugin_func *save_cb;
unsigned long long sqltick, sqltick_last_seen;

int using_case_insensitive_like;
Expand Down
36 changes: 25 additions & 11 deletions db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -4480,14 +4480,15 @@ int done_cb_evbuffer(struct sqlclntstate *clnt)
return -1;
}
if (clnt->osql.replay == OSQL_RETRY_DO) {
plugin_func *save_cb = clnt->done_cb;
clnt->done_cb = NULL;
int rc = srs_tran_replay_inline(clnt);
if (rc && !clnt->query_rc) {
clnt->query_rc = rc;
int rc = srs_tran_replay_prepare(clnt);
if (rc == RC_INTERNAL_RETRY) {
return rc;
}
clnt->done_cb = save_cb;
} else if (clnt->osql.history && clnt->ctrl_sqlengine == SQLENG_NORMAL_PROCESS) {
}
if (clnt->osql.replay != OSQL_RETRY_NONE) {
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_NONE);
}
if (clnt->osql.history && clnt->ctrl_sqlengine == SQLENG_NORMAL_PROCESS) {
srs_tran_destroy(clnt);
}
Pthread_mutex_lock(&lru_evbuffers_mtx); /* protect log_long_running_stmts_evbuffer() */
Expand Down Expand Up @@ -4854,6 +4855,17 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)

assert(clnt->dbtran.pStmt == NULL);

if (clnt->osql.replay == OSQL_RETRY_DO && !clnt->osql.in_replay_nested) {
if (srs_tran_replay(clnt) == RC_INTERNAL_RETRY) {
/* Another iteration was scheduled on a new worker.
* That worker now owns the clnt; do NOT signal_clnt_as_done
* here or it would race with the new worker's enqueue. */
thrman_setid(thrman_self(), "[done]");
return;
}
goto done;
}

/* everything going in is cursor based */
int rc = get_curtran(thedb->bdb_env, clnt);
if (rc) {
Expand Down Expand Up @@ -4895,10 +4907,6 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)
clnt->query_rc = execute_sql_query(thd, clnt);
}

if (clnt->sql_ref) {
put_ref(&clnt->sql_ref);
}

osql_shadtbl_done_query(thedb->bdb_env, clnt);
thrman_setfd(thd->thr_self, -1);

Expand All @@ -4911,6 +4919,12 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)
logmsg(LOGMSG_ERROR, "%s: unable to destroy a CURSOR transaction!\n", __func__);
}
clnt->osql.timings.query_finished = osql_log_time();

done:
if (clnt->sql_ref) {
put_ref(&clnt->sql_ref);
}

osql_log_time_done(clnt);
clnt_change_state(clnt, CONNECTION_IDLE);
debug_close_clnt(clnt);
Expand Down
5 changes: 3 additions & 2 deletions net/sqlwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,9 @@ int sql_peer_check(struct sqlwriter *writer)
int sql_done(struct sqlwriter *writer)
{
struct sqlclntstate *clnt = writer->clnt;
if (done_cb_evbuffer(clnt) != 0) {
return -1;
int rc = done_cb_evbuffer(clnt);
if (rc != 0) {
return rc;
}
Pthread_mutex_lock(&writer->wr_lock);
writer->done = 1;
Expand Down
5 changes: 3 additions & 2 deletions plugins/newsql/newsql_evbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,16 @@ static void newsql_reset_evbuffer(struct newsql_appdata_evbuffer *appdata)
static int newsql_done_cb(struct sqlclntstate *clnt)
{
struct newsql_appdata_evbuffer *appdata = clnt->appdata;
if (sql_done(appdata->writer) == 0) {
int rc = sql_done(appdata->writer);
if (rc == 0) {
if (clnt->added_to_hist) {
clnt->added_to_hist = 0;
} else {
free_pb_cdb2query(clnt, appdata->query);
}
appdata->query = NULL;
evtimer_once(appdata->base, rd_hdr, appdata);
} else {
} else if (rc < 0) {
appdata->cleanup_ev = event_new(appdata->base, -1, 0, newsql_cleanup, appdata);
event_active(appdata->cleanup_ev, 0, 0);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/replay_trans.test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ else
include $(TESTSROOTDIR)/testcase.mk
endif
ifeq ($(TEST_TIMEOUT),)
export TEST_TIMEOUT=1m
export TEST_TIMEOUT=3m
endif
Loading
Loading