From 95a7c32bc5fb0b764b7a8fa12794ef194f2db4af Mon Sep 17 00:00:00 2001 From: Johnny Miller <163300+millerjp@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:12:16 +0200 Subject: [PATCH 1/4] Add optional target consistency level override Introduce ZDM_TARGET_CONSISTENCY_LEVEL config option that overrides the consistency level for all requests forwarded to the target cluster. The origin cluster always receives the original client-requested consistency level, preserving the consistency contract on the source of truth. This is useful during migration when the target is being populated via dual writes. A weaker CL such as LOCAL_ONE on the target reduces the risk of write failures caused by target-side instability (node outages, streaming, compaction pressure). Target data can be repaired after migration, so temporary under-replication is acceptable. The feature is strictly opt-in: when the config is absent, empty, or unset, the proxy forwards requests with the original client CL (existing behaviour preserved). Invalid values are rejected at startup. A WARN log is emitted when the override is active. Verified end-to-end against Cassandra 5.0.6 via system_traces: inline Query, prepared Execute, and Batch writes all show the overridden CL on the target while origin retains the client-requested CL. 
--- .gitignore | 5 +- docs/assets/zdm-config-reference.yml | 19 + .../target_write_consistency_test.go | 429 ++++++++++++++++++ proxy/launch.go | 7 + proxy/pkg/config/config.go | 51 +++ .../pkg/config/config_target_write_cl_test.go | 157 +++++++ proxy/pkg/zdmproxy/clienthandler.go | 62 +++ 7 files changed, 729 insertions(+), 1 deletion(-) create mode 100644 integration-tests/target_write_consistency_test.go create mode 100644 proxy/pkg/config/config_target_write_cl_test.go diff --git a/.gitignore b/.gitignore index 5d4c8caa..bad788bd 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,7 @@ vendor __debug_bin .vscode /proxy/proxy -/proxy/*.prof \ No newline at end of file +/proxy/*.prof + +# Claude Code +CLAUDE.md \ No newline at end of file diff --git a/docs/assets/zdm-config-reference.yml b/docs/assets/zdm-config-reference.yml index e9d239f0..6b771f4a 100644 --- a/docs/assets/zdm-config-reference.yml +++ b/docs/assets/zdm-config-reference.yml @@ -180,3 +180,22 @@ proxy_listen_port: 14002 # Control connection failure threshold. If threshold is exceeded, # readiness probe of ZDM will report failure and pod will be recreated. # heartbeat_failure_threshold: 1 + +# Override the consistency level used for all requests forwarded to the target cluster. +# When this property is set, the proxy replaces the client-requested consistency level with the +# specified value on every request sent to the target cluster (reads and writes). The origin cluster +# always receives the original client-requested consistency level, preserving the consistency +# contract on the source of truth. +# +# This is useful during migration when the target cluster is being populated via dual writes. Using +# a weaker consistency level such as LOCAL_ONE on the target reduces the risk of write failures +# caused by target-side instability (e.g. node outages, streaming, or compaction pressure). 
Because +# the target data can be repaired after migration is complete, temporary under-replication is +# acceptable and preferable to failing writes that would otherwise succeed on origin. +# +# When this property is absent, empty, or not set, the proxy forwards requests to the target with the +# original client-requested consistency level (default behavior, no override). +# +# Valid values: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE +# (case-insensitive). Serial consistency levels (SERIAL, LOCAL_SERIAL) are not valid here. +# target_consistency_level: LOCAL_ONE diff --git a/integration-tests/target_write_consistency_test.go b/integration-tests/target_write_consistency_test.go new file mode 100644 index 00000000..ecb6e530 --- /dev/null +++ b/integration-tests/target_write_consistency_test.go @@ -0,0 +1,429 @@ +package integration_tests + +import ( + "context" + "encoding/json" + "testing" + + "github.com/datastax/go-cassandra-native-protocol/client" + "github.com/datastax/go-cassandra-native-protocol/frame" + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/datastax/zdm-proxy/integration-tests/env" + "github.com/datastax/zdm-proxy/integration-tests/setup" + "github.com/datastax/zdm-proxy/integration-tests/simulacron" + "github.com/stretchr/testify/require" +) + +// getWriteQueries returns QUERY-type log entries for a given cluster. +func getWriteQueries(t *testing.T, cluster *simulacron.Cluster) []*simulacron.RequestLogEntry { + logs, err := cluster.GetLogsByType(simulacron.QueryTypeQuery) + require.NoError(t, err) + var queries []*simulacron.RequestLogEntry + for _, dc := range logs.Datacenters { + for _, node := range dc.Nodes { + queries = append(queries, node.Queries...) 
+ } + } + return queries +} + +// TestTargetConsistencyOverride_Disabled verifies that when the override config is NOT set, +// both origin and target receive the client-requested consistency level unchanged. +func TestTargetConsistencyOverride_Disabled(t *testing.T) { + // Default config — no override + testSetup, err := setup.NewSimulacronTestSetup(t) + require.NoError(t, err) + defer testSetup.Cleanup() + + queryPrime := + simulacron.WhenQuery( + "INSERT INTO myks.users (name) VALUES ('alice')", + simulacron.NewWhenQueryOptions()). + ThenSuccess() + + err = testSetup.Origin.Prime(queryPrime) + require.NoError(t, err) + err = testSetup.Target.Prime(queryPrime) + require.NoError(t, err) + + // Clear logs before test + err = testSetup.Origin.DeleteLogs() + require.NoError(t, err) + err = testSetup.Target.DeleteLogs() + require.NoError(t, err) + + // Send a write with LOCAL_QUORUM using low-level client + cqlClient := client.NewCqlClient("127.0.0.1:14002", nil) + cqlConn, err := cqlClient.ConnectAndInit(context.Background(), env.DefaultProtocolVersionSimulacron, 0) + require.NoError(t, err) + defer cqlConn.Close() + + queryMsg := &message.Query{ + Query: "INSERT INTO myks.users (name) VALUES ('alice')", + Options: &message.QueryOptions{ + Consistency: primitive.ConsistencyLevelLocalQuorum, + }, + } + + rsp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, queryMsg)) + require.NoError(t, err) + require.Equal(t, primitive.OpCodeResult, rsp.Header.OpCode) + + // Verify origin received LOCAL_QUORUM + originQueries := getWriteQueries(t, testSetup.Origin) + require.GreaterOrEqual(t, len(originQueries), 1, "expected at least 1 query on origin") + lastOriginQuery := originQueries[len(originQueries)-1] + require.Equal(t, "LOCAL_QUORUM", lastOriginQuery.ConsistencyLevel, + "origin should receive client-requested LOCAL_QUORUM") + + // Verify target also received LOCAL_QUORUM (no override) + targetQueries := getWriteQueries(t, 
testSetup.Target) + require.GreaterOrEqual(t, len(targetQueries), 1, "expected at least 1 query on target") + lastTargetQuery := targetQueries[len(targetQueries)-1] + require.Equal(t, "LOCAL_QUORUM", lastTargetQuery.ConsistencyLevel, + "target should receive client-requested LOCAL_QUORUM when override is disabled") +} + +// TestTargetConsistencyOverride_Enabled verifies that when the override is set to LOCAL_ONE, +// origin receives the original CL but target receives LOCAL_ONE. +func TestTargetConsistencyOverride_Enabled(t *testing.T) { + c := setup.NewTestConfig("", "") + c.TargetConsistencyLevel = "LOCAL_ONE" + + testSetup, err := setup.NewSimulacronTestSetupWithConfig(t, c) + require.NoError(t, err) + defer testSetup.Cleanup() + + queryPrime := + simulacron.WhenQuery( + "INSERT INTO myks.users (name) VALUES ('bob')", + simulacron.NewWhenQueryOptions()). + ThenSuccess() + + err = testSetup.Origin.Prime(queryPrime) + require.NoError(t, err) + err = testSetup.Target.Prime(queryPrime) + require.NoError(t, err) + + // Clear logs before test + err = testSetup.Origin.DeleteLogs() + require.NoError(t, err) + err = testSetup.Target.DeleteLogs() + require.NoError(t, err) + + // Send a write with LOCAL_QUORUM + cqlClient := client.NewCqlClient("127.0.0.1:14002", nil) + cqlConn, err := cqlClient.ConnectAndInit(context.Background(), env.DefaultProtocolVersionSimulacron, 0) + require.NoError(t, err) + defer cqlConn.Close() + + queryMsg := &message.Query{ + Query: "INSERT INTO myks.users (name) VALUES ('bob')", + Options: &message.QueryOptions{ + Consistency: primitive.ConsistencyLevelLocalQuorum, + }, + } + + rsp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, queryMsg)) + require.NoError(t, err) + require.Equal(t, primitive.OpCodeResult, rsp.Header.OpCode) + + // Verify origin still receives LOCAL_QUORUM (unchanged) + originQueries := getWriteQueries(t, testSetup.Origin) + require.GreaterOrEqual(t, len(originQueries), 1, "expected at 
least 1 query on origin") + lastOriginQuery := originQueries[len(originQueries)-1] + require.Equal(t, "LOCAL_QUORUM", lastOriginQuery.ConsistencyLevel, + "origin should always receive client-requested LOCAL_QUORUM") + + // Verify target receives LOCAL_ONE (overridden) + targetQueries := getWriteQueries(t, testSetup.Target) + require.GreaterOrEqual(t, len(targetQueries), 1, "expected at least 1 query on target") + lastTargetQuery := targetQueries[len(targetQueries)-1] + require.Equal(t, "LOCAL_ONE", lastTargetQuery.ConsistencyLevel, + "target should receive overridden LOCAL_ONE") +} + +// TestTargetConsistencyOverride_ReadAlsoAffected verifies that read requests +// routed to the target cluster are also affected by the consistency override. +// With default config (PrimaryCluster=ORIGIN), reads go to origin only, so +// the target does not receive them. This test uses PrimaryCluster=TARGET to +// route reads to target and verify the override applies. +func TestTargetConsistencyOverride_ReadAlsoAffected(t *testing.T) { + c := setup.NewTestConfig("", "") + c.TargetConsistencyLevel = "LOCAL_ONE" + c.PrimaryCluster = "TARGET" + + testSetup, err := setup.NewSimulacronTestSetupWithConfig(t, c) + require.NoError(t, err) + defer testSetup.Cleanup() + + expectedRows := simulacron.NewRowsResult( + map[string]simulacron.DataType{"name": simulacron.DataTypeText}). + WithRow(map[string]interface{}{"name": "alice"}) + + queryPrime := + simulacron.WhenQuery( + "SELECT name FROM myks.users", + simulacron.NewWhenQueryOptions()). 
+ ThenRowsSuccess(expectedRows) + + err = testSetup.Target.Prime(queryPrime) + require.NoError(t, err) + + // Clear logs before test + err = testSetup.Target.DeleteLogs() + require.NoError(t, err) + + // Send a read with LOCAL_QUORUM + cqlClient := client.NewCqlClient("127.0.0.1:14002", nil) + cqlConn, err := cqlClient.ConnectAndInit(context.Background(), env.DefaultProtocolVersionSimulacron, 0) + require.NoError(t, err) + defer cqlConn.Close() + + queryMsg := &message.Query{ + Query: "SELECT name FROM myks.users", + Options: &message.QueryOptions{ + Consistency: primitive.ConsistencyLevelLocalQuorum, + }, + } + + rsp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, queryMsg)) + require.NoError(t, err) + require.Equal(t, primitive.OpCodeResult, rsp.Header.OpCode) + + // Verify target received LOCAL_ONE (overridden), not LOCAL_QUORUM + targetQueries := getWriteQueries(t, testSetup.Target) + require.GreaterOrEqual(t, len(targetQueries), 1, "expected at least 1 query on target") + lastTargetQuery := targetQueries[len(targetQueries)-1] + require.Equal(t, "LOCAL_ONE", lastTargetQuery.ConsistencyLevel, + "read queries to target should also be affected by consistency override") +} + +// TestTargetConsistencyOverride_Enabled_ONE verifies override with a different CL value (ONE). +func TestTargetConsistencyOverride_Enabled_ONE(t *testing.T) { + c := setup.NewTestConfig("", "") + c.TargetConsistencyLevel = "ONE" + + testSetup, err := setup.NewSimulacronTestSetupWithConfig(t, c) + require.NoError(t, err) + defer testSetup.Cleanup() + + queryPrime := + simulacron.WhenQuery( + "INSERT INTO myks.users (name) VALUES ('charlie')", + simulacron.NewWhenQueryOptions()). 
+ ThenSuccess() + + err = testSetup.Origin.Prime(queryPrime) + require.NoError(t, err) + err = testSetup.Target.Prime(queryPrime) + require.NoError(t, err) + + err = testSetup.Origin.DeleteLogs() + require.NoError(t, err) + err = testSetup.Target.DeleteLogs() + require.NoError(t, err) + + cqlClient := client.NewCqlClient("127.0.0.1:14002", nil) + cqlConn, err := cqlClient.ConnectAndInit(context.Background(), env.DefaultProtocolVersionSimulacron, 0) + require.NoError(t, err) + defer cqlConn.Close() + + queryMsg := &message.Query{ + Query: "INSERT INTO myks.users (name) VALUES ('charlie')", + Options: &message.QueryOptions{ + Consistency: primitive.ConsistencyLevelAll, + }, + } + + rsp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, queryMsg)) + require.NoError(t, err) + require.Equal(t, primitive.OpCodeResult, rsp.Header.OpCode) + + // Origin gets ALL (client-requested) + originQueries := getWriteQueries(t, testSetup.Origin) + require.GreaterOrEqual(t, len(originQueries), 1) + require.Equal(t, "ALL", originQueries[len(originQueries)-1].ConsistencyLevel) + + // Target gets ONE (overridden) + targetQueries := getWriteQueries(t, testSetup.Target) + require.GreaterOrEqual(t, len(targetQueries), 1) + require.Equal(t, "ONE", targetQueries[len(targetQueries)-1].ConsistencyLevel) +} + +// TestTargetConsistencyOverride_PreparedStatement verifies that the override applies +// to EXECUTE messages (prepared statement execution), not just inline Query writes. +func TestTargetConsistencyOverride_PreparedStatement(t *testing.T) { + c := setup.NewTestConfig("", "") + c.TargetConsistencyLevel = "LOCAL_ONE" + + testSetup, err := setup.NewSimulacronTestSetupWithConfig(t, c) + require.NoError(t, err) + defer testSetup.Cleanup() + + // Prime the query for both clusters (simulacron needs this for PREPARE + EXECUTE) + queryPrime := + simulacron.WhenQuery( + "INSERT INTO myks.users (name) VALUES (?)", + simulacron.NewWhenQueryOptions(). 
+ WithPositionalParameter(simulacron.DataTypeText, "dave")). + ThenSuccess() + + err = testSetup.Origin.Prime(queryPrime) + require.NoError(t, err) + err = testSetup.Target.Prime(queryPrime) + require.NoError(t, err) + + // Connect with low-level client + cqlClient := client.NewCqlClient("127.0.0.1:14002", nil) + cqlConn, err := cqlClient.ConnectAndInit(context.Background(), env.DefaultProtocolVersionSimulacron, 0) + require.NoError(t, err) + defer cqlConn.Close() + + // Step 1: PREPARE + prepareMsg := &message.Prepare{ + Query: "INSERT INTO myks.users (name) VALUES (?)", + } + prepareResp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, prepareMsg)) + require.NoError(t, err) + + prepared, ok := prepareResp.Body.Message.(*message.PreparedResult) + require.True(t, ok, "expected PreparedResult but got %T", prepareResp.Body.Message) + + // Clear logs between PREPARE and EXECUTE + err = testSetup.Origin.DeleteLogs() + require.NoError(t, err) + err = testSetup.Target.DeleteLogs() + require.NoError(t, err) + + // Step 2: EXECUTE with LOCAL_QUORUM + executeMsg := &message.Execute{ + QueryId: prepared.PreparedQueryId, + ResultMetadataId: prepared.ResultMetadataId, + Options: &message.QueryOptions{ + Consistency: primitive.ConsistencyLevelLocalQuorum, + PositionalValues: []*primitive.Value{primitive.NewValue([]byte("dave"))}, + }, + } + execResp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, executeMsg)) + require.NoError(t, err) + require.Equal(t, primitive.OpCodeResult, execResp.Header.OpCode) + + // Check origin EXECUTE logs — should have LOCAL_QUORUM + originExecLogs, err := testSetup.Origin.GetLogsByType(simulacron.QueryTypeExecute) + require.NoError(t, err) + originExecQueries := originExecLogs.Datacenters[0].Nodes[0].Queries + require.GreaterOrEqual(t, len(originExecQueries), 1, "expected at least 1 EXECUTE on origin") + + lastOriginExec := originExecQueries[len(originExecQueries)-1] + var 
originExecMsg simulacron.ExecuteMessage + err = json.Unmarshal(lastOriginExec.Frame.Message, &originExecMsg) + require.NoError(t, err) + require.NotNil(t, originExecMsg.Options) + require.Equal(t, "LOCAL_QUORUM", originExecMsg.Options.Consistency, + "origin EXECUTE should retain client-requested LOCAL_QUORUM") + + // Check target EXECUTE logs — should have LOCAL_ONE (overridden) + targetExecLogs, err := testSetup.Target.GetLogsByType(simulacron.QueryTypeExecute) + require.NoError(t, err) + targetExecQueries := targetExecLogs.Datacenters[0].Nodes[0].Queries + require.GreaterOrEqual(t, len(targetExecQueries), 1, "expected at least 1 EXECUTE on target") + + lastTargetExec := targetExecQueries[len(targetExecQueries)-1] + var targetExecMsg simulacron.ExecuteMessage + err = json.Unmarshal(lastTargetExec.Frame.Message, &targetExecMsg) + require.NoError(t, err) + require.NotNil(t, targetExecMsg.Options) + require.Equal(t, "LOCAL_ONE", targetExecMsg.Options.Consistency, + "target EXECUTE should have overridden LOCAL_ONE") +} + +// TestTargetConsistencyOverride_Batch verifies that the override applies to BATCH messages. +func TestTargetConsistencyOverride_Batch(t *testing.T) { + c := setup.NewTestConfig("", "") + c.TargetConsistencyLevel = "LOCAL_ONE" + + testSetup, err := setup.NewSimulacronTestSetupWithConfig(t, c) + require.NoError(t, err) + defer testSetup.Cleanup() + + // Prime individual queries that will be part of the batch + queryPrime1 := + simulacron.WhenQuery( + "INSERT INTO myks.users (name) VALUES ('eve')", + simulacron.NewWhenQueryOptions()). + ThenSuccess() + queryPrime2 := + simulacron.WhenQuery( + "INSERT INTO myks.users (name) VALUES ('frank')", + simulacron.NewWhenQueryOptions()). 
+ ThenSuccess() + + for _, prime := range []simulacron.Then{queryPrime1, queryPrime2} { + err = testSetup.Origin.Prime(prime) + require.NoError(t, err) + err = testSetup.Target.Prime(prime) + require.NoError(t, err) + } + + cqlClient := client.NewCqlClient("127.0.0.1:14002", nil) + cqlConn, err := cqlClient.ConnectAndInit(context.Background(), env.DefaultProtocolVersionSimulacron, 0) + require.NoError(t, err) + defer cqlConn.Close() + + // Clear logs + err = testSetup.Origin.DeleteLogs() + require.NoError(t, err) + err = testSetup.Target.DeleteLogs() + require.NoError(t, err) + + // Send a BATCH with LOCAL_QUORUM + batchMsg := &message.Batch{ + Type: primitive.BatchTypeLogged, + Children: []*message.BatchChild{ + { + Query: "INSERT INTO myks.users (name) VALUES ('eve')", + }, + { + Query: "INSERT INTO myks.users (name) VALUES ('frank')", + }, + }, + Consistency: primitive.ConsistencyLevelLocalQuorum, + } + + batchResp, err := cqlConn.SendAndReceive(frame.NewFrame(env.DefaultProtocolVersionSimulacron, 0, batchMsg)) + require.NoError(t, err) + require.Equal(t, primitive.OpCodeResult, batchResp.Header.OpCode) + + // Helper to extract batch messages from logs + getBatchMessages := func(cluster *simulacron.Cluster) []*simulacron.BatchMessage { + logs, err := cluster.GetLogsByType(simulacron.QueryTypeBatch) + require.NoError(t, err) + var batches []*simulacron.BatchMessage + for _, dc := range logs.Datacenters { + for _, node := range dc.Nodes { + for _, entry := range node.Queries { + var bm simulacron.BatchMessage + err := json.Unmarshal(entry.Frame.Message, &bm) + if err == nil { + batches = append(batches, &bm) + } + } + } + } + return batches + } + + // Check origin BATCH — should have LOCAL_QUORUM + originBatches := getBatchMessages(testSetup.Origin) + require.GreaterOrEqual(t, len(originBatches), 1, "expected at least 1 BATCH on origin") + require.Equal(t, "LOCAL_QUORUM", originBatches[len(originBatches)-1].Consistency, + "origin BATCH should retain 
client-requested LOCAL_QUORUM") + + // Check target BATCH — should have LOCAL_ONE (overridden) + targetBatches := getBatchMessages(testSetup.Target) + require.GreaterOrEqual(t, len(targetBatches), 1, "expected at least 1 BATCH on target") + require.Equal(t, "LOCAL_ONE", targetBatches[len(targetBatches)-1].Consistency, + "target BATCH should have overridden LOCAL_ONE") +} diff --git a/proxy/launch.go b/proxy/launch.go index 9882457b..39f9d31d 100644 --- a/proxy/launch.go +++ b/proxy/launch.go @@ -56,6 +56,13 @@ func launchProxy(profilingSupported bool) { } log.SetLevel(logLevel) + targetCL, _ := conf.ParseTargetConsistencyLevel() + if targetCL != nil { + log.Warnf("Target consistency level override is ENABLED: all requests to the target cluster will use %v instead of the client-requested consistency level", *targetCL) + } else { + log.Infof("Target consistency level override: disabled") + } + if profilingSupported { log.Debugf("Proxy built with profiling support") } else { diff --git a/proxy/pkg/config/config.go b/proxy/pkg/config/config.go index fe3cebdb..a589ebde 100644 --- a/proxy/pkg/config/config.go +++ b/proxy/pkg/config/config.go @@ -105,6 +105,13 @@ type Config struct { HeartbeatRetryBackoffFactor float64 `default:"2" split_words:"true" yaml:"heartbeat_retry_backoff_factor"` HeartbeatFailureThreshold int `default:"1" split_words:"true" yaml:"heartbeat_failure_threshold"` + // Target consistency level override. + // When set, overrides the consistency level for ALL requests (reads and writes) sent to the target cluster. + // The origin/source cluster always uses the client-requested consistency level. + // Valid values: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (case-insensitive). + // Empty or unset means disabled (default behavior, no override). 
+ TargetConsistencyLevel string `default:"" split_words:"true" yaml:"target_consistency_level"` + ////////////////////////////////////////////////////////////////////// /// THE SETTINGS BELOW AREN'T SUPPORTED AND MAY CHANGE AT ANY TIME /// ////////////////////////////////////////////////////////////////////// @@ -338,6 +345,11 @@ func (c *Config) Validate() error { return err } + _, err = c.ParseTargetConsistencyLevel() + if err != nil { + return err + } + return nil } @@ -392,6 +404,45 @@ func (c *Config) ParseReadMode() (common.ReadMode, error) { } } +// consistencyLevelMap maps uppercase consistency level names to primitive.ConsistencyLevel values. +// Only non-serial consistency levels are valid for the target consistency level override. +var consistencyLevelMap = map[string]primitive.ConsistencyLevel{ + "ANY": primitive.ConsistencyLevelAny, + "ONE": primitive.ConsistencyLevelOne, + "TWO": primitive.ConsistencyLevelTwo, + "THREE": primitive.ConsistencyLevelThree, + "QUORUM": primitive.ConsistencyLevelQuorum, + "ALL": primitive.ConsistencyLevelAll, + "LOCAL_QUORUM": primitive.ConsistencyLevelLocalQuorum, + "EACH_QUORUM": primitive.ConsistencyLevelEachQuorum, + "LOCAL_ONE": primitive.ConsistencyLevelLocalOne, +} + +// ParseTargetConsistencyLevel parses the target consistency level override. +// Returns nil if the feature is disabled (empty/unset config value). +// Returns a non-nil pointer to the parsed consistency level if valid. +// Returns an error if the value is set but invalid. 
+func (c *Config) ParseTargetConsistencyLevel() (*primitive.ConsistencyLevel, error) { + trimmed := strings.TrimSpace(c.TargetConsistencyLevel) + if trimmed == "" { + return nil, nil + } + + upper := strings.ToUpper(trimmed) + if cl, ok := consistencyLevelMap[upper]; ok { + return &cl, nil + } + + validValues := make([]string, 0, len(consistencyLevelMap)) + for k := range consistencyLevelMap { + validValues = append(validValues, k) + } + slices.Sort(validValues) + return nil, fmt.Errorf( + "invalid value for ZDM_TARGET_CONSISTENCY_LEVEL: %q; valid values are: %v", + trimmed, strings.Join(validValues, ", ")) +} + func (c *Config) ParseControlConnMaxProtocolVersion() (primitive.ProtocolVersion, error) { if strings.EqualFold(c.ControlConnMaxProtocolVersion, "DseV2") { return primitive.ProtocolVersionDse2, nil diff --git a/proxy/pkg/config/config_target_write_cl_test.go b/proxy/pkg/config/config_target_write_cl_test.go new file mode 100644 index 00000000..a32a6bbe --- /dev/null +++ b/proxy/pkg/config/config_target_write_cl_test.go @@ -0,0 +1,157 @@ +package config + +import ( + "testing" + + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/stretchr/testify/require" +) + +func TestParseTargetConsistencyLevel_Empty(t *testing.T) { + c := &Config{TargetConsistencyLevel: ""} + cl, err := c.ParseTargetConsistencyLevel() + require.NoError(t, err) + require.Nil(t, cl, "empty value should return nil (disabled)") +} + +func TestParseTargetConsistencyLevel_Whitespace(t *testing.T) { + c := &Config{TargetConsistencyLevel: " "} + cl, err := c.ParseTargetConsistencyLevel() + require.NoError(t, err) + require.Nil(t, cl, "whitespace-only value should return nil (disabled)") +} + +func TestParseTargetConsistencyLevel_ValidValues(t *testing.T) { + tests := []struct { + input string + expected primitive.ConsistencyLevel + }{ + {"ANY", primitive.ConsistencyLevelAny}, + {"ONE", primitive.ConsistencyLevelOne}, + {"TWO", primitive.ConsistencyLevelTwo}, + {"THREE", 
primitive.ConsistencyLevelThree}, + {"QUORUM", primitive.ConsistencyLevelQuorum}, + {"ALL", primitive.ConsistencyLevelAll}, + {"LOCAL_QUORUM", primitive.ConsistencyLevelLocalQuorum}, + {"EACH_QUORUM", primitive.ConsistencyLevelEachQuorum}, + {"LOCAL_ONE", primitive.ConsistencyLevelLocalOne}, + // case-insensitive + {"local_one", primitive.ConsistencyLevelLocalOne}, + {"Local_Quorum", primitive.ConsistencyLevelLocalQuorum}, + {"one", primitive.ConsistencyLevelOne}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + c := &Config{TargetConsistencyLevel: tt.input} + cl, err := c.ParseTargetConsistencyLevel() + require.NoError(t, err) + require.NotNil(t, cl) + require.Equal(t, tt.expected, *cl) + }) + } +} + +func TestParseTargetConsistencyLevel_InvalidValues(t *testing.T) { + tests := []string{ + "SERIAL", + "LOCAL_SERIAL", + "INVALID", + "local_serial", + "QUOROM", + "12345", + } + + for _, input := range tests { + t.Run(input, func(t *testing.T) { + c := &Config{TargetConsistencyLevel: input} + cl, err := c.ParseTargetConsistencyLevel() + require.Error(t, err) + require.Nil(t, cl) + require.Contains(t, err.Error(), "ZDM_TARGET_CONSISTENCY_LEVEL") + }) + } +} + +func TestParseTargetConsistencyLevel_WithWhitespacePadding(t *testing.T) { + c := &Config{TargetConsistencyLevel: " LOCAL_ONE "} + cl, err := c.ParseTargetConsistencyLevel() + require.NoError(t, err) + require.NotNil(t, cl) + require.Equal(t, primitive.ConsistencyLevelLocalOne, *cl) +} + +func TestValidate_RejectsInvalidTargetConsistencyLevel(t *testing.T) { + defer clearAllEnvVars() + + clearAllEnvVars() + setOriginCredentialsEnvVars() + setTargetCredentialsEnvVars() + setOriginContactPointsAndPortEnvVars() + setTargetContactPointsAndPortEnvVars() + + setEnvVar("ZDM_TARGET_CONSISTENCY_LEVEL", "SERIAL") + + _, err := New().LoadConfig("") + require.Error(t, err) + require.Contains(t, err.Error(), "ZDM_TARGET_CONSISTENCY_LEVEL") +} + +func 
TestValidate_AcceptsEmptyTargetConsistencyLevel(t *testing.T) { + defer clearAllEnvVars() + + clearAllEnvVars() + setOriginCredentialsEnvVars() + setTargetCredentialsEnvVars() + setOriginContactPointsAndPortEnvVars() + setTargetContactPointsAndPortEnvVars() + + // do NOT set ZDM_TARGET_CONSISTENCY_LEVEL + + conf, err := New().LoadConfig("") + require.NoError(t, err) + require.Empty(t, conf.TargetConsistencyLevel) +} + +func TestValidate_AcceptsValidTargetConsistencyLevel(t *testing.T) { + defer clearAllEnvVars() + + clearAllEnvVars() + setOriginCredentialsEnvVars() + setTargetCredentialsEnvVars() + setOriginContactPointsAndPortEnvVars() + setTargetContactPointsAndPortEnvVars() + + setEnvVar("ZDM_TARGET_CONSISTENCY_LEVEL", "LOCAL_ONE") + + conf, err := New().LoadConfig("") + require.NoError(t, err) + require.Equal(t, "LOCAL_ONE", conf.TargetConsistencyLevel) +} + +func TestTargetConsistencyLevel_YamlConfig(t *testing.T) { + yamlContent := ` +origin_contact_points: "origin.hostname.com" +origin_port: 9042 +origin_username: "user" +origin_password: "pass" +target_contact_points: "target.hostname.com" +target_port: 9042 +target_username: "user" +target_password: "pass" +target_consistency_level: "LOCAL_ONE" +` + + f, err := createConfigFile(yamlContent) + require.NoError(t, err) + defer removeConfigFile(f) + + conf, err := New().LoadConfig(f.Name()) + require.NoError(t, err) + require.Equal(t, "LOCAL_ONE", conf.TargetConsistencyLevel) + + cl, err := conf.ParseTargetConsistencyLevel() + require.NoError(t, err) + require.NotNil(t, cl) + require.Equal(t, primitive.ConsistencyLevelLocalOne, *cl) +} diff --git a/proxy/pkg/zdmproxy/clienthandler.go b/proxy/pkg/zdmproxy/clienthandler.go index 26c4a654..0af1a0b7 100644 --- a/proxy/pkg/zdmproxy/clienthandler.go +++ b/proxy/pkg/zdmproxy/clienthandler.go @@ -120,6 +120,10 @@ type ClientHandler struct { timeUuidGenerator TimeUuidGenerator rateLimiters *RateLimiters + // targetConsistencyLevel is the optional override for 
target-side consistency. + // nil means disabled (default). When non-nil, all requests to the target cluster use this CL. + targetConsistencyLevel *primitive.ConsistencyLevel + // not used atm but should be used when a protocol error occurs after #68 has been addressed clientHandlerShutdownRequestCancelFn context.CancelFunc @@ -278,6 +282,13 @@ func NewClientHandler( forwardAuthToTarget, targetCredsOnClientRequest := forwardAuthToTarget( originControlConn, targetControlConn, conf.ForwardClientCredentialsToOrigin) + // Parse target consistency level override (nil = disabled) + targetWriteCL, err := conf.ParseTargetConsistencyLevel() + if err != nil { + clientHandlerCancelFunc() + return nil, fmt.Errorf("failed to parse target consistency level: %w", err) + } + return &ClientHandler{ clientConnector: NewClientConnector( clientTcpConn, @@ -345,6 +356,7 @@ func NewClientHandler( clientHandlerShutdownRequestCancelFn: clientHandlerShutdownRequestCancelFn, clientHandlerShutdownRequestContext: clientHandlerShutdownRequestContext, compression: compression, + targetConsistencyLevel: targetWriteCL, }, nil } @@ -1528,6 +1540,16 @@ func (ch *ClientHandler) executeRequest( return err } + // Override target consistency level for requests where origin and target share the same frame. + // EXECUTE and BATCH are handled in their respective handlers where the deep copy already exists. 
+ targetReceivesRequest := fwdDecision == forwardToBoth || fwdDecision == forwardToTarget + if ch.targetConsistencyLevel != nil && targetReceivesRequest && originRequest == targetRequest { + targetRequest, err = ch.overrideTargetConsistency(frameContext) + if err != nil { + return fmt.Errorf("could not override target consistency: %w", err) + } + } + if fwdDecision == forwardToNone { if clientResponse == nil { return fmt.Errorf("forwardDecision is NONE but client response is nil") @@ -1654,6 +1676,35 @@ func (ch *ClientHandler) handleRequestSendFailure(err error, frameContext *frame } } +// overrideTargetConsistency creates a deep copy of the frame with the consistency level overridden +// for target-side requests. This is called for generic Query messages (non-prepared, non-batch) where +// the origin and target requests share the same raw frame pointer. +func (ch *ClientHandler) overrideTargetConsistency(frameContext *frameDecodeContext) (*frame.RawFrame, error) { + decodedFrame, err := frameContext.GetOrDecodeFrame() + if err != nil { + return nil, fmt.Errorf("could not decode frame for target write CL override: %w", err) + } + + targetFrame := decodedFrame.DeepCopy() + + switch typedMsg := targetFrame.Body.Message.(type) { + case *message.Query: + if typedMsg.Options == nil { + typedMsg.Options = &message.QueryOptions{} + } + typedMsg.Options.Consistency = *ch.targetConsistencyLevel + default: + // For messages without a consistency field (e.g., PREPARE, STARTUP), return unchanged. 
+ return frameContext.GetRawFrame(), nil + } + + rawFrame, err := ch.getCodec().ConvertToRawFrame(targetFrame) + if err != nil { + return nil, fmt.Errorf("could not re-encode frame after target write CL override: %w", err) + } + return rawFrame, nil +} + func (ch *ClientHandler) handleInterceptedRequest( requestInfo RequestInfo, frameContext *frameDecodeContext, currentKeyspace string) (*frame.RawFrame, error) { @@ -1842,6 +1893,13 @@ func (ch *ClientHandler) handleExecuteRequest( log.Tracef("Replacing prepared ID %s with %s for target cluster.", hex.EncodeToString(originalQueryId), hex.EncodeToString(newTargetExecuteMsg.QueryId)) + if ch.targetConsistencyLevel != nil { + if newTargetExecuteMsg.Options == nil { + newTargetExecuteMsg.Options = &message.QueryOptions{} + } + newTargetExecuteMsg.Options.Consistency = *ch.targetConsistencyLevel + } + newTargetRequestRaw, err := ch.getCodec().ConvertToRawFrame(newTargetRequest) if err != nil { return nil, nil, nil, fmt.Errorf("could not convert target EXECUTE response to raw frame: %w", err) @@ -1901,6 +1959,10 @@ func (ch *ClientHandler) handleBatchRequest( hex.EncodeToString(originalQueryId), hex.EncodeToString(preparedData.GetTargetPreparedId())) } + if ch.targetConsistencyLevel != nil { + newTargetBatchMsg.Consistency = *ch.targetConsistencyLevel + } + if newOriginRequest != nil { originBatchRequest, err := ch.getCodec().ConvertToRawFrame(newOriginRequest) if err != nil { From b26a6c338ec1376433d78c9c5fe85e9f32108d4f Mon Sep 17 00:00:00 2001 From: Johnny Miller <163300+millerjp@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:15:59 +0200 Subject: [PATCH 2/4] Add target consistency level override to README quick start --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 70fa60fd..898ad37d 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ ZDM_PROXY_LISTEN_ADDRESS=127.0.0.1 ZDM_PRIMARY_CLUSTER=ORIGIN ZDM_READ_MODE=PRIMARY_ONLY ZDM_LOG_LEVEL=INFO +# 
ZDM_TARGET_CONSISTENCY_LEVEL=LOCAL_ONE #optional, overrides CL on target during migration ``` The environment variables (or YAM configuration file) must be set for the proxy to work. From fc58352656dba03be6b0458f46f6c5ddf3e3f054 Mon Sep 17 00:00:00 2001 From: Johnny Miller <163300+millerjp@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:29:37 +0200 Subject: [PATCH 3/4] Warn at startup if configured cluster users are superusers Query system_auth.roles on both origin and target control connections at startup to check if the configured user is a superuser. If so, log a WARN explaining that superuser authentication in Cassandra requires QUORUM consistency internally, which increases the risk of auth failures during node instability. The check is best-effort: if auth is not enabled, or the query fails for any reason (e.g. permission denied, Astra-specific behavior, table not present), it is silently skipped. This ensures no impact on platforms where system_auth.roles may not be accessible. Verified against Cassandra 5.0.6 with PasswordAuthenticator enabled: - superuser 'cassandra' triggers WARN on both ORIGIN and TARGET - non-superuser 'app_user' produces no warning (query fails silently because non-superusers cannot read system_auth.roles) - auth-disabled clusters produce no warning (check skipped) --- docs/assets/zdm-config-reference.yml | 10 ++++-- proxy/pkg/zdmproxy/controlconn.go | 47 ++++++++++++++++++++++++++++ proxy/pkg/zdmproxy/proxy.go | 2 ++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/docs/assets/zdm-config-reference.yml b/docs/assets/zdm-config-reference.yml index 6b771f4a..78a34b39 100644 --- a/docs/assets/zdm-config-reference.yml +++ b/docs/assets/zdm-config-reference.yml @@ -55,7 +55,10 @@ origin_port: 9042 # Local data center for origin cluster. # origin_local_datacenter: -# Origin cluster username. +# Origin cluster username. Avoid using a superuser account for application workloads. 
+# Superuser authentication in Cassandra requires QUORUM consistency internally, which +# increases the risk of auth failures during node instability. The proxy will log a +# warning at startup if the configured user is a superuser. origin_username: user1 # Origin cluster password. @@ -89,7 +92,10 @@ target_contact_points: 127.0.0.2 # Port used when connecting to nodes from target cluster. target_port: 9042 -# Target cluster username. +# Target cluster username. Avoid using a superuser account for application workloads. +# Superuser authentication in Cassandra requires QUORUM consistency internally, which +# increases the risk of auth failures during node instability. The proxy will log a +# warning at startup if the configured user is a superuser. target_username: user2 # Target cluster password. diff --git a/proxy/pkg/zdmproxy/controlconn.go b/proxy/pkg/zdmproxy/controlconn.go index 37edb54b..2d0b2632 100644 --- a/proxy/pkg/zdmproxy/controlconn.go +++ b/proxy/pkg/zdmproxy/controlconn.go @@ -235,6 +235,53 @@ func (cc *ControlConn) IsAuthEnabled() (bool, error) { "the control connection has not been initialized") } +// CheckSuperUserAndWarn queries system_auth.roles to determine if the configured user is a superuser. +// If the user is a superuser, a warning is logged advising against this practice. +// Any errors (e.g. table doesn't exist, permission denied, Astra-specific behavior) are silently +// ignored — the check is best-effort only. 
+func (cc *ControlConn) CheckSuperUserAndWarn() { + conn, _ := cc.GetConnAndContactPoint() + if conn == nil { + return + } + + authEnabled, err := cc.IsAuthEnabled() + if err != nil || !authEnabled { + return + } + + clusterType := cc.connConfig.GetClusterType() + query := fmt.Sprintf("SELECT is_superuser FROM system_auth.roles WHERE role = '%s'", cc.username) + result, err := conn.Query(query, GetDefaultGenericTypeCodec(), cc.context) + if err != nil { + log.Debugf("[%v] Could not query system_auth.roles to check superuser status (this is expected on some platforms): %v", + clusterType, err) + return + } + + if result == nil || len(result.Rows) == 0 { + return + } + + val, exists := result.Rows[0].GetByColumn("is_superuser") + if !exists || val == nil { + return + } + + isSuperUser, ok := val.(bool) + if !ok { + return + } + + if isSuperUser { + log.Warnf("[%v] The configured user '%s' is a superuser. This is not recommended for application "+ + "workloads because superuser authentication requires QUORUM consistency internally in Cassandra, "+ + "which increases the risk of authentication failures during node instability. 
Consider using a "+ + "regular user with only the necessary permissions for the keyspaces being migrated.", + clusterType, cc.username) + } +} + func (cc *ControlConn) IncrementFailureCounter() { cc.counterLock.Lock() defer cc.counterLock.Unlock() diff --git a/proxy/pkg/zdmproxy/proxy.go b/proxy/pkg/zdmproxy/proxy.go index ffd4f1c2..574982fb 100644 --- a/proxy/pkg/zdmproxy/proxy.go +++ b/proxy/pkg/zdmproxy/proxy.go @@ -264,6 +264,7 @@ func (p *ZdmProxy) initializeControlConnections(ctx context.Context) error { if err := originControlConn.Start(p.controlConnShutdownWg, ctx); err != nil { return fmt.Errorf("failed to initialize origin control connection: %w", err) } + originControlConn.CheckSuperUserAndWarn() p.lock.Lock() p.originControlConn = originControlConn @@ -276,6 +277,7 @@ func (p *ZdmProxy) initializeControlConnections(ctx context.Context) error { if err := targetControlConn.Start(p.controlConnShutdownWg, ctx); err != nil { return fmt.Errorf("failed to initialize target control connection: %w", err) } + targetControlConn.CheckSuperUserAndWarn() p.lock.Lock() p.targetControlConn = targetControlConn From c0a788edf2a4aa678185fb3452467669889ef7df Mon Sep 17 00:00:00 2001 From: Johnny Miller <163300+millerjp@users.noreply.github.com> Date: Sat, 4 Apr 2026 05:45:42 +0200 Subject: [PATCH 4/4] Add CCM integration test for target consistency level override Verifies via system_traces.sessions on real Cassandra clusters that: - Inline INSERT at QUORUM: origin trace shows QUORUM (unchanged), target trace shows LOCAL_ONE (overridden) - Prepared INSERT at QUORUM: same verification - Batch INSERT at QUORUM: same verification Skipped on Cassandra < 3.0 (system_traces parameters map format differs in older versions). 
--- .../target_consistency_override_ccm_test.go | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 integration-tests/target_consistency_override_ccm_test.go diff --git a/integration-tests/target_consistency_override_ccm_test.go b/integration-tests/target_consistency_override_ccm_test.go new file mode 100644 index 00000000..0a900fec --- /dev/null +++ b/integration-tests/target_consistency_override_ccm_test.go @@ -0,0 +1,188 @@ +package integration_tests + +import ( + "fmt" + "strings" + "testing" + "time" + + gocql "github.com/apache/cassandra-gocql-driver/v2" + "github.com/stretchr/testify/require" + + "github.com/datastax/zdm-proxy/integration-tests/env" + "github.com/datastax/zdm-proxy/integration-tests/setup" + "github.com/datastax/zdm-proxy/integration-tests/utils" +) + +// TestTargetConsistencyOverrideCCM verifies that the ZDM_TARGET_CONSISTENCY_LEVEL config +// overrides the consistency level on the target cluster while preserving the original +// client-requested CL on origin. Verified via Cassandra system_traces. 
+// +// Test matrix: +// - Inline INSERT at QUORUM → origin should see QUORUM, target should see LOCAL_ONE +// - Prepared INSERT at QUORUM → same verification via EXECUTE trace +// - Batch INSERT at QUORUM → same verification via BATCH trace +func TestTargetConsistencyOverrideCCM(t *testing.T) { + if env.CompareServerVersion("3.0.0") < 0 { + t.Skip("Skipping consistency override trace test: system_traces.sessions parameters map not available before Cassandra 3.0") + } + + originCluster, targetCluster, err := SetupOrGetGlobalCcmClusters(t) + require.Nil(t, err) + + originSession := originCluster.GetSession() + targetSession := targetCluster.GetSession() + + // Create a proxy with target consistency override set to LOCAL_ONE + conf := setup.NewTestConfig(originCluster.GetInitialContactPoint(), targetCluster.GetInitialContactPoint()) + conf.TargetConsistencyLevel = "LOCAL_ONE" + + proxyInstance, err := setup.NewProxyInstanceWithConfig(conf) + require.Nil(t, err) + defer proxyInstance.Shutdown() + + // Ensure system_traces has RF=1 on both single-node CCM clusters + // and create the test table + for _, s := range []*gocql.Session{originSession, targetSession} { + s.Query("ALTER KEYSPACE system_traces WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}").Exec() + s.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.cl_test (id uuid PRIMARY KEY, val text)", setup.TestKeyspace)).Exec() + } + + // Connect through the proxy + proxy, err := utils.ConnectToCluster("127.0.0.1", "", "", conf.ProxyListenPort) + require.Nil(t, err) + defer proxy.Close() + + // ================================================================ + // TEST 1: Inline INSERT + // Client sends QUORUM, origin should see QUORUM, target should see LOCAL_ONE + // ================================================================ + t.Run("inline_insert", func(t *testing.T) { + clearTraces(originSession, targetSession) + + q := proxy.Query(fmt.Sprintf( + "INSERT INTO %s.cl_test (id, val) VALUES 
(d1b05da0-8c20-11ea-9fc6-6d2c86545d91, 'inline_cl_test')", + setup.TestKeyspace)) + q.Consistency(gocql.Quorum) + q.Trace(noopTracer{}) + err = q.Exec() + require.Nil(t, err, "inline INSERT through proxy failed") + + originCL := findTraceCL(t, originSession, "inline_cl_test") + require.Equal(t, "QUORUM", originCL, "origin should receive client-requested QUORUM") + + targetCL := findTraceCL(t, targetSession, "inline_cl_test") + require.Equal(t, "LOCAL_ONE", targetCL, "target should receive overridden LOCAL_ONE") + }) + + // ================================================================ + // TEST 2: Prepared INSERT + // gocql auto-prepares when bind params are used. The EXECUTE trace + // contains the original query string (e.g. "INSERT INTO ks.cl_test (id, val) VALUES (?, ?)") + // so we search for the table name as the marker. + // ================================================================ + t.Run("prepared_insert", func(t *testing.T) { + clearTraces(originSession, targetSession) + + q := proxy.Query(fmt.Sprintf( + "INSERT INTO %s.cl_test (id, val) VALUES (?, ?)", setup.TestKeyspace)) + q.Bind("eed574b0-8c20-11ea-9fc6-6d2c86545d91", "prepared_cl_test") + q.Consistency(gocql.Quorum) + q.Trace(noopTracer{}) + err = q.Exec() + require.Nil(t, err, "prepared INSERT through proxy failed") + + // For prepared statements, the trace query field contains the CQL with ? markers, + // not the bound values. Search for the table name instead. 
+ originCL := findTraceCL(t, originSession, "cl_test") + require.Equal(t, "QUORUM", originCL, "origin should receive client-requested QUORUM for prepared statement") + + targetCL := findTraceCL(t, targetSession, "cl_test") + require.Equal(t, "LOCAL_ONE", targetCL, "target should receive overridden LOCAL_ONE for prepared statement") + }) + + // ================================================================ + // TEST 3: Batch INSERT + // Batch traces don't include the query text in the parameters map, + // so we check the first trace found after clearing. + // ================================================================ + t.Run("batch_insert", func(t *testing.T) { + clearTraces(originSession, targetSession) + + batch := proxy.NewBatch(gocql.LoggedBatch) + batch.Query(fmt.Sprintf( + "INSERT INTO %s.cl_test (id, val) VALUES (cf0f4cf0-8c20-11ea-9fc6-6d2c86545d91, 'batch_cl_test')", + setup.TestKeyspace)) + batch.SetConsistency(gocql.Quorum) + batch.Trace(noopTracer{}) + err = proxy.ExecuteBatch(batch) + require.Nil(t, err, "batch INSERT through proxy failed") + + originCL := findAnyTraceCL(t, originSession) + require.Equal(t, "QUORUM", originCL, "origin should receive client-requested QUORUM for batch") + + targetCL := findAnyTraceCL(t, targetSession) + require.Equal(t, "LOCAL_ONE", targetCL, "target should receive overridden LOCAL_ONE for batch") + }) +} + +// clearTraces truncates system_traces.sessions on both clusters. +func clearTraces(origin *gocql.Session, target *gocql.Session) { + origin.Query("TRUNCATE system_traces.sessions").Consistency(gocql.One).Exec() + origin.Query("TRUNCATE system_traces.events").Consistency(gocql.One).Exec() + target.Query("TRUNCATE system_traces.sessions").Consistency(gocql.One).Exec() + target.Query("TRUNCATE system_traces.events").Consistency(gocql.One).Exec() +} + +// findTraceCL searches system_traces.sessions for a trace whose query parameter contains +// the given marker, and returns the consistency_level. 
Retries for up to 10 seconds. +func findTraceCL(t *testing.T, session *gocql.Session, marker string) string { + t.Helper() + for attempt := 0; attempt < 20; attempt++ { + q := session.Query("SELECT parameters FROM system_traces.sessions") + q.Consistency(gocql.One) + iter := q.Iter() + var params map[string]string + for iter.Scan(&params) { + if query, ok := params["query"]; ok && strings.Contains(query, marker) { + if cl, ok := params["consistency_level"]; ok { + iter.Close() + return cl + } + } + } + iter.Close() + time.Sleep(500 * time.Millisecond) + } + t.Fatalf("no trace found containing marker %q after 10s of retries", marker) + return "" +} + +// findAnyTraceCL returns the consistency_level from the first trace session found. +// Retries for up to 10 seconds. +func findAnyTraceCL(t *testing.T, session *gocql.Session) string { + t.Helper() + for attempt := 0; attempt < 20; attempt++ { + q := session.Query("SELECT parameters FROM system_traces.sessions") + q.Consistency(gocql.One) + iter := q.Iter() + var params map[string]string + for iter.Scan(&params) { + if cl, ok := params["consistency_level"]; ok { + iter.Close() + return cl + } + } + iter.Close() + time.Sleep(500 * time.Millisecond) + } + t.Fatalf("no trace sessions found after 10s of retries") + return "" +} + +// noopTracer enables the tracing flag on the CQL protocol frame without +// fetching trace results. This avoids trace-fetch queries going through +// the proxy and interfering with the test. +type noopTracer struct{} + +func (noopTracer) Trace(_ []byte) {}