From 49c1cec0aefa7521b0a829d9858fc523f6aeab7f Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Sat, 16 May 2026 16:34:53 -0700 Subject: [PATCH 1/4] PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward compatible with existing ZK based client --- ...taleClusterRoleRecordVersionException.java | 34 ++ .../phoenix/jdbc/ClusterRoleRecord.java | 68 ++- .../phoenix/jdbc/HAGroupStoreClient.java | 256 ++++++++- .../apache/phoenix/jdbc/PhoenixHAAdmin.java | 95 +++- .../apache/phoenix/query/QueryServices.java | 10 + .../phoenix/query/QueryServicesOptions.java | 14 +- .../apache/phoenix/schema/MetaDataClient.java | 6 +- .../phoenix/jdbc/HAGroupStoreClientIT.java | 511 ++++++++++++++++++ .../phoenix/jdbc/ClusterRoleRecordTest.java | 104 ++++ .../json/test_role_record_explicit_rpc.json | 10 + .../json/test_role_record_explicit_zk.json | 10 + .../test_role_record_no_registry_type.json | 9 + 12 files changed, 1092 insertions(+), 35 deletions(-) create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java create mode 100644 phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json create mode 100644 phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json create mode 100644 phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java new file mode 100644 index 00000000000..0b20a53a7bc --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.exception; + +/** + * CAS write to the legacy {@code /phoenix/ha} CRR znode failed (BadVersion or NodeExists); the + * caller can re-read and retry if needed. Analog of {@link StaleHAGroupStoreRecordVersionException}. + */ +public class StaleClusterRoleRecordVersionException extends Exception { + private static final long serialVersionUID = 1L; + + public StaleClusterRoleRecordVersionException(String msg) { + super(msg); + } + + public StaleClusterRoleRecordVersionException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index 0ba6b312d7b..327db3eaa5e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Objects; import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -126,31 +127,50 @@ public enum RegistryType { private final long version; /** - * To handle backward compatibility with old ClusterRoleRecords which had zk1 and zk2 as keys for - * zk urls, This constructor is only being used {@link ClusterRoleRecord#fromJson} when we - * deserialize Cluster Role Record read from ZooKeeper ZNode. If CRR is in old format we will read - * zk1 and zk2 and url1 and url2 will be null and if it is in new format zk1 and zk2 will be null - * in both cases final url is being stored in url1 and url2 url will be stored in normalized forms - * which looks like zk1\\:port1,zk2\\:port2,zk3\\:port3, zk4\\:port4,zk5\\:port5::znode or - * master1\\:port1,master2\\:port2,master3\\:port3, master4\\:port4,master5\\:port5 + * Convenience constructor: defaults {@code registryType} to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Use the explicit overload below to write + * {@link RegistryType#ZK} records for the legacy {@code /phoenix/ha} znode. * @param haGroupName HighAvailability Group name / CRR name - * @param policy Policy used by give CRR - * @param url1 ZK/HMaster url based on registry type for first cluster - * @param role1 {@link ClusterRole} which describes the current state of first cluster - * @param url2 ZK/HMaster url based on registry type for second cluster - * @param role2 {@link ClusterRole} which describes the current state of second cluster - * @param version version of a given CRR + * @param policy Policy used by the given CRR + * @param url1 ZK/HMaster url for the first cluster + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 ZK/HMaster url for the second cluster + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR + */ + public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, String url1, + ClusterRole role1, String url2, ClusterRole role2, long version) { + this(haGroupName, policy, DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE, url1, role1, url2, role2, + version); + } + + /** + * Canonical constructor; also the {@code @JsonCreator} entry point so the persisted + * {@code registryType} round-trips correctly. Records persisted before {@code registryType} was + * added as a JSON field pass {@code null} here and default to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). URLs are normalized to the + * {@code registryType}-specific canonical form for accurate comparisons; the resulting + * {@code url1}/{@code url2} are stored as {@code zk1\:port1,zk2\:port2,...::znode} for ZK or + * {@code master1\:port1,master2\:port2,...} for RPC/MASTER. + * @param haGroupName HighAvailability Group name / CRR name + * @param policy Policy used by the given CRR + * @param registryType {@link RegistryType} for URL normalization; {@code null} defaults to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC) + * @param url1 ZK/HMaster url for the first cluster + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 ZK/HMaster url for the second cluster + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR */ @JsonCreator public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName, - @JsonProperty("policy") HighAvailabilityPolicy policy, @JsonProperty("url1") String url1, + @JsonProperty("policy") HighAvailabilityPolicy policy, + @JsonProperty("registryType") RegistryType registryType, @JsonProperty("url1") String url1, @JsonProperty("role1") ClusterRole role1, @JsonProperty("url2") String url2, @JsonProperty("role2") ClusterRole role2, @JsonProperty("version") long version) { this.haGroupName = haGroupName; this.policy = policy; - - // Default registry type is RPC from Consistent Cluster Failover onwards - this.registryType = DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; + this.registryType = registryType != null ? registryType : DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; // Do we really need to normalize here ? // We are normalizing to have urls in specific formats for each registryType for getting @@ -221,6 +241,20 @@ public boolean hasSameInfo(ClusterRoleRecord other) { return haGroupName.equals(other.haGroupName) && policy.equals(other.policy); } + /** + * Equality on the six identity/role fields ({@code haGroupName, policy, url1, url2, role1, + * role2}); ignores {@code version} (always bumps) and {@code registryType} (avoids RPC->ZK + * thrash). Returns {@code false} if {@code other} is {@code null}. + */ + public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord other) { + if (other == null) { + return false; + } + return Objects.equals(haGroupName, other.haGroupName) && Objects.equals(policy, other.policy) + && Objects.equals(url1, other.url1) && Objects.equals(url2, other.url2) + && role1 == other.role1 && role2 == other.role2; + } + /** Returns true if CRR has any url in UNKNOWN role/state. */ public boolean hasUnknownRole() { return role1 == ClusterRole.UNKNOWN || role2 == ClusterRole.UNKNOWN; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index fe299d14fbb..9f9afbd3dab 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1; @@ -34,7 +35,11 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; @@ -62,11 +67,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType; @@ -80,6 +87,7 @@ import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors; /** * Main implementation of HAGroupStoreClient with peer support. Write-through cache for HAGroupStore @@ -98,8 +106,13 @@ public class HAGroupStoreClient implements Closeable { public static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1; // Maximum jitter in seconds for sync job start time (10 seconds) private static final long SYNC_JOB_MAX_JITTER_SECONDS = 10; + // Exclusive upper bound for initial-delay jitter on the periodic reconciler (0..30s). + private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31; private PhoenixHAAdmin phoenixHaAdmin; private PhoenixHAAdmin peerPhoenixHaAdmin; + // Admin + NodeCache on /phoenix/ha; null when feature disabled. + private volatile PhoenixHAAdmin legacyHaAdmin; + private volatile NodeCache legacyCrrNodeCache; private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); // Map of > private static final Map> instances = @@ -132,6 +145,12 @@ public class HAGroupStoreClient implements Closeable { CopyOnWriteArraySet> targetStateSubscribers = new ConcurrentHashMap<>(); // Scheduled executor for periodic sync job private ScheduledExecutorService syncExecutor; + // Legacy CRR sync state. All invocations of syncLegacyCRRIfRoleChanged go through the + // single-threaded legacyCrrSyncExecutor (initial sync, periodic, and event-driven), so + // calls are naturally serialized; close races are handled via local snapshots of mutable + // fields inside the method. No additional lock is needed. + private final boolean legacyCrrSyncEnabled; + private volatile ScheduledExecutorService legacyCrrSyncExecutor; public static HAGroupStoreClient getInstance(Configuration conf, String haGroupName) throws SQLException { @@ -224,6 +243,8 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { conf.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT) * ZK_SESSION_TIMEOUT_MULTIPLIER); this.rotationTimeMs = conf.getLong(QueryServices.REPLICATION_LOG_ROTATION_TIME_MS_KEY, QueryServicesOptions.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS); + this.legacyCrrSyncEnabled = conf.getBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, + DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED); // Custom Event Listener this.peerCustomPathChildrenCacheListener = peerPathChildrenCacheListener; try { @@ -246,6 +267,12 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { // Start periodic sync job startPeriodicSyncJob(); + // Opt-in legacy /phoenix/ha sync. Setup failures propagate so the client is marked + // unhealthy rather than silently dropping the legacy znode out of sync. + if (legacyCrrSyncEnabled && this.isHealthy) { + setupLegacyCrrSync(); + } + } catch (Exception e) { this.isHealthy = false; close(); @@ -886,9 +913,16 @@ private PathChildrenCacheListener createCacheListener(CountDownLatch latch, if (cacheType == ClusterType.LOCAL) { maybeInitializePeerPathChildrenCache(); } + // Offload the legacy CRR sync (it does ZK + JDBC I/O) so we don't block + // Curator's per-namespace event dispatcher. + ScheduledExecutorService syncExec = legacyCrrSyncExecutor; + if (syncExec != null) { + syncExec.execute(this::syncLegacyCRRIfRoleChanged); + } } break; case CHILD_REMOVED: + // No-op: the legacy /phoenix/ha znode is never deleted by this client. break; case INITIALIZED: latch.countDown(); @@ -983,32 +1017,65 @@ private void closePeerConnection() { /** * Shuts down the periodic sync executor gracefully. */ + /** + * Remove this instance from the static {@link #instances} map. Idempotent. Uses value-based + * remove so that, if a concurrent {@link #getInstanceForZkUrl} has already swapped in a fresh + * replacement, the replacement is preserved. + */ + private void deregisterFromInstances() { + final String key = (this.zkUrl != null) ? this.zkUrl : getLocalZkUrl(this.conf); + if (key == null) { + return; + } + final ConcurrentHashMap bucket = instances.get(key); + if (bucket == null) { + return; + } + bucket.remove(this.haGroupName, this); + instances.computeIfPresent(key, (k, v) -> v.isEmpty() ? null : v); + } + private void shutdownSyncExecutor() { if (syncExecutor != null) { - syncExecutor.shutdown(); - try { - if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - syncExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - syncExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } + MoreExecutors.shutdownAndAwaitTermination(syncExecutor, 5, TimeUnit.SECONDS); syncExecutor = null; } } + private void shutdownLegacyCrrSyncExecutor() { + if (legacyCrrSyncExecutor != null) { + MoreExecutors.shutdownAndAwaitTermination(legacyCrrSyncExecutor, 5, TimeUnit.SECONDS); + legacyCrrSyncExecutor = null; + } + } + @Override public void close() { try { LOGGER.info("Closing HAGroupStoreClient"); - // Shutdown sync executor + // Mark unhealthy and deregister from the static cache first so any concurrent + // getInstanceForZkUrl() does not hand out a half-closed instance. + isHealthy = false; + deregisterFromInstances(); + // Executors -> caches -> admins. Null-before-close on legacy resources so a racing + // listener sees either a live or null reference, never half-closed. shutdownSyncExecutor(); + shutdownLegacyCrrSyncExecutor(); if (pathChildrenCache != null) { pathChildrenCache.close(); pathChildrenCache = null; } closePeerConnection(); + NodeCache nodeCache = this.legacyCrrNodeCache; + this.legacyCrrNodeCache = null; + if (nodeCache != null) { + nodeCache.close(); + } + PhoenixHAAdmin admin = this.legacyHaAdmin; + this.legacyHaAdmin = null; + if (admin != null) { + admin.close(); + } LOGGER.info("Closed HAGroupStoreClient"); } catch (IOException e) { LOGGER.error("Exception closing HAGroupStoreClient", e); @@ -1047,6 +1114,175 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu return Math.max(0, remainingTime); } + // ========== Legacy /phoenix/ha CRR Sync ========== + + /** + * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}. + * CAS losses are logged and skipped; the next consistentHA cache event or periodic cycle + * reconverges. + */ + private void syncLegacyCRRIfRoleChanged() { + if (!legacyCrrSyncEnabled || !isHealthy) { + return; + } + // Snapshot mutable resources up front so a concurrent close() can't null them mid-method + // and trigger NPEs / writes through a torn-down Curator client. + final PhoenixHAAdmin admin = this.legacyHaAdmin; + final NodeCache cache = this.legacyCrrNodeCache; + if (admin == null || cache == null) { + return; + } + try { + HAGroupStoreRecord local = getHAGroupStoreRecord(); + if (local == null) { + LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local consistentHA record", + haGroupName); + return; + } + // Wait for peer URL before building the desired CRR (ctor NPEs on null url2). + if (StringUtils.isBlank(local.getPeerZKUrl())) { + LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is blank", + haGroupName); + return; + } + HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer(); + // NodeCache is eventually consistent; on apparent absence, fall back to an authoritative + // ZK read so the equality check and CAS both see consistent state. + Pair snapshot = readLegacyCrrSnapshot(cache); + if (snapshot.getRight() == null) { + snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + } + ClusterRoleRecord existing = snapshot.getLeft(); + Stat existingStat = snapshot.getRight(); + if (!shouldWriteLegacyCrr(existing)) { + return; + } + ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing); + if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) { + LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName, + existing.getVersion()); + return; + } + try { + if (existingStat == null) { + admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0); + } else { + admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, existingStat.getVersion()); + } + LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName, + existing != null ? existing.getVersion() : -1L, desired.getVersion()); + } catch (StaleClusterRoleRecordVersionException stale) { + // CAS lost; next event/periodic cycle reconverges. + LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", + haGroupName, existingStat != null ? existingStat.getVersion() : -1); + } + } catch (Exception e) { + LOGGER.warn( + "Legacy CRR sync failed for HA group {}; will be retried by next event/periodic cycle", + haGroupName, e); + } + } + + /** + * Policy gate before issuing a CAS write to the legacy CRR. Returns {@code false} when the + * existing record must not be overwritten by this client. + */ + private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) { + // Refuse to overwrite a non-ZK (admin-managed RPC) legacy CRR; live readers use its + // registryType to build connection strings, so swapping form would break them. + if (existing != null && existing.getRegistryType() != RegistryType.ZK) { + LOGGER.warn("Skipping legacy CRR sync for HA group {}: existing registryType={} " + + "(requires admin migration to ZK form)", haGroupName, existing.getRegistryType()); + return false; + } + return true; + } + + /** + * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. When the local + * client's peer view is unavailable, preserves the {@code existing} record's {@code role2} + * rather than downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer visibility + * would otherwise keep flipping it back, and the equality check naturally short-circuits + * when no other field changed. + */ + private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer, + ClusterRoleRecord existing) { + final ClusterRole role2; + if (peer != null) { + role2 = peer.getClusterRole(); + } else if (existing != null) { + role2 = existing.getRole2(); + } else { + role2 = ClusterRole.UNKNOWN; + } + long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L; + long baseVersion = Math.max(existing != null ? existing.getVersion() : 0L, + Math.max(local.getAdminCRRVersion(), peerAdminVersion)); + return new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.valueOf(local.getPolicy()), + RegistryType.ZK, this.zkUrl, local.getClusterRole(), local.getPeerZKUrl(), role2, + baseVersion + 1); + } + + /** + * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; callers must confirm + * absence with an authoritative ZK read since the cache is eventually consistent. Caller passes + * the cache from a local snapshot, never the mutable field (which {@link #close()} may null at + * any time). + */ + private Pair readLegacyCrrSnapshot(NodeCache cache) { + ChildData current = cache.getCurrentData(); + if (current == null) { + return Pair.of(null, null); + } + ClusterRoleRecord record = ClusterRoleRecord.fromJson(current.getData()).orElse(null); + return Pair.of(record, current.getStat()); + } + + /** + * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, single-thread executor, + * an off-lock initial sync, and the periodic reconciler. Called only when the feature is + * enabled and the client is healthy. + */ + private void setupLegacyCrrSync() throws Exception { + this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); + this.legacyCrrNodeCache = + new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); + this.legacyCrrNodeCache.start(true); + this.legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" + haGroupName); + t.setDaemon(true); + return t; + }); + // Run the initial sync off-thread so we don't block the static HAGroupStoreClient.class + // monitor held by getInstanceForZkUrl on JDBC/ZK I/O. + this.legacyCrrSyncExecutor.execute(this::syncLegacyCRRIfRoleChanged); + startLegacyCrrReconciliation(); + } + + /** Schedules the periodic reconciler; no-op when {@code intervalSec <= 0}. */ + private void startLegacyCrrReconciliation() { + long intervalSec = conf.getLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, + DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); + if (intervalSec <= 0) { + LOGGER.info("Legacy CRR periodic reconciliation disabled (interval={}s) for HA group {}", + intervalSec, haGroupName); + return; + } + long jitterSec = + ThreadLocalRandom.current().nextLong(0, LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS); + LOGGER.info("Starting legacy CRR reconciliation for HA group {} with initial delay {}s, " + + "then every {}s", haGroupName, jitterSec, intervalSec); + legacyCrrSyncExecutor.scheduleAtFixedRate(() -> { + try { + syncLegacyCRRIfRoleChanged(); + } catch (Throwable t) { + LOGGER.warn("Periodic legacy CRR reconciliation failed for HA group {}", haGroupName, t); + } + }, jitterSec, intervalSec, TimeUnit.SECONDS); + } + // ========== HA Group State Change Subscription Methods ========== /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java index f0fa7189520..896b017128e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -39,7 +39,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -83,6 +85,25 @@ public CuratorFramework getCurator(String zkUrl, Properties properties, String n private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdmin.class); + /** ZK's wildcard for {@code setData()/delete()}: bypasses version check. */ + static final int ZK_MATCH_ANY_VERSION = -1; + + /** + * Write mode for {@link #createOrUpdateClusterRoleRecordWithCAS}. The accompanying + * {@code expectedStatVersion} argument is interpreted only for {@link #CAS_WITH_VERSION}. + *

+ * Phoenix-internal. Not part of any public API contract — values may be added or renamed + * without notice. Do not switch exhaustively from outside this module. + */ + public enum LegacyCrrWriteMode { + /** Create the znode; no prior version expected. */ + CREATE_NEW, + /** Unconditional overwrite (no CAS). For operator/migration tooling only. */ + FORCE_OVERWRITE, + /** CAS update; {@code expectedStatVersion} must be {@code >= 0}. */ + CAS_WITH_VERSION + } + /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */ private final String zkUrl; /** Configuration of this command line tool. */ @@ -524,17 +545,18 @@ public void updateHAGroupStoreRecordInZooKeeper(String haGroupName, } /** - * Gets the HAGroupStoreRecord and Stat from ZooKeeper. + * Gets the HAGroupStoreRecord and Stat from ZooKeeper. Reads (record, stat) atomically via + * {@code storingStatIn} so the returned stat version always corresponds to the returned bytes. * @param haGroupName the HA group name - * @return a pair of HAGroupStoreRecord and Stat + * @return a pair of HAGroupStoreRecord and Stat; both {@code null} if the znode does not exist * @throws IOException if any error occurs during the retrieval */ public Pair getHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOException { try { - byte[] data = getCurator().getData().forPath(toPath(haGroupName)); + Stat stat = new Stat(); + byte[] data = getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName)); HAGroupStoreRecord record = HAGroupStoreRecord.fromJson(data).orElse(null); - Stat stat = getCurator().checkExists().forPath(toPath(haGroupName)); return Pair.of(record, stat); } catch (KeeperException.NoNodeException nne) { LOG.warn("No HAGroupStoreRecord for HA group {} in ZK", haGroupName, nne); @@ -559,6 +581,71 @@ public void deleteHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOExc } } + // ----- Legacy /phoenix/ha ClusterRoleRecord sync helpers ----- + + /** + * Atomic read of (record, stat) on the legacy CRR znode. Returns {@code (null, null)} if the + * znode does not exist. + */ + public Pair getClusterRoleRecordAndStatInZooKeeper(String haGroupName) + throws IOException { + try { + Stat stat = new Stat(); + byte[] data = getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName)); + ClusterRoleRecord record = ClusterRoleRecord.fromJson(data).orElse(null); + return Pair.of(record, stat); + } catch (KeeperException.NoNodeException nne) { + return Pair.of(null, null); + } catch (Exception e) { + LOG.error("Failed to get ClusterRoleRecord for HA group {}", haGroupName, e); + throw new IOException("Failed to get ClusterRoleRecord for HA group " + haGroupName, e); + } + } + + /** + * Writes {@code newRecord} per {@code mode}. {@code expectedStatVersion} is used only for + * {@link LegacyCrrWriteMode#CAS_WITH_VERSION} (must be {@code >= 0}). Both BadVersion and + * NodeExists surface as {@link StaleClusterRoleRecordVersionException}. + */ + public void createOrUpdateClusterRoleRecordWithCAS(String haGroupName, + ClusterRoleRecord newRecord, LegacyCrrWriteMode mode, int expectedStatVersion) + throws IOException, StaleClusterRoleRecordVersionException { + Preconditions.checkNotNull(mode, "mode"); + if (mode == LegacyCrrWriteMode.CAS_WITH_VERSION) { + Preconditions.checkArgument(expectedStatVersion >= 0, + "CAS_WITH_VERSION requires expectedStatVersion >= 0; got " + expectedStatVersion); + } + try { + byte[] data = ClusterRoleRecord.toJson(newRecord); + switch (mode) { + case CREATE_NEW: + getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(toPath(haGroupName), data); + break; + case FORCE_OVERWRITE: + getCurator().setData().withVersion(ZK_MATCH_ANY_VERSION).forPath(toPath(haGroupName), + data); + break; + case CAS_WITH_VERSION: + getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName), + data); + break; + default: + throw new IllegalStateException("Unhandled LegacyCrrWriteMode: " + mode); + } + } catch (KeeperException.BadVersionException e) { + throw new StaleClusterRoleRecordVersionException( + "CAS failed for HA group " + haGroupName + " at expectedStatVersion " + expectedStatVersion, + e); + } catch (KeeperException.NodeExistsException e) { + throw new StaleClusterRoleRecordVersionException( + "Create failed for HA group " + haGroupName + ": node already exists", e); + } catch (Exception e) { + LOG.error("Failed to write ClusterRoleRecord for HA group {}", haGroupName, e); + throw new IOException("Failed to write ClusterRoleRecord for HA group " + haGroupName, e); + } + } + public String getZkUrl() { return zkUrl; } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 72075853aa8..42514d73292 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -647,6 +647,16 @@ public interface QueryServices extends SQLCloseable { // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; + // "CRR" = Cluster Role Record. Master switch for syncing the legacy /phoenix/ha cluster + // role record from /phoenix/consistentHA. When false, no legacy znode is read, written, or + // deleted by HAGroupStoreClient. + String PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = "phoenix.ha.legacy.crr.sync.enabled"; + + // Periodic reconciliation interval for the legacy /phoenix/ha cluster role record sync, in + // seconds. 0 disables the periodic loop only; event-driven sync still runs. + String PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = + "phoenix.ha.legacy.crr.reconciliation.interval.seconds"; + String REPLICATION_LOG_ROTATION_TIME_MS_KEY = "phoenix.replication.log.rotation.time.ms"; /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 339b6763a45..3e99b9fff68 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -92,6 +92,8 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; @@ -517,6 +519,13 @@ public class QueryServicesOptions { // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; + // Legacy /phoenix/ha CRR sync is opt-in (default off). + public static final boolean DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = false; + + // Periodic reconciliation interval for legacy /phoenix/ha CRR sync, in seconds. + // 0 disables the periodic loop only; event-driven sync still runs. + public static final long DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = 60L; + public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L; private final Configuration config; @@ -643,7 +652,10 @@ public static QueryServicesOptions withDefaults() { .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS, DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS) .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, - DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED); + DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED) + .setIfUnset(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED) + .setIfUnset(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, + DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index edb38da970c..1f57187a2ca 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -4738,9 +4738,9 @@ public MutationState addColumn(PTable table, List origColumnDefs, /** * To check if TTL is defined at any of the child below we are checking it at * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)} - * level where in function - * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)} - * we are already traversing through allDescendantViews. + * level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# + * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], + * byte[], List, int)} we are already traversing through allDescendantViews. */ } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index ba667876740..3e31656ab1e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -18,9 +18,12 @@ package org.apache.phoenix.jdbc; import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; @@ -47,13 +50,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.util.HAGroupStoreTestUtil; +import org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; @@ -72,6 +78,8 @@ public class HAGroupStoreClientIT extends HABaseIT { private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L; private PhoenixHAAdmin haAdmin; private PhoenixHAAdmin peerHaAdmin; + // Admin on the legacy /phoenix/ha namespace; used to inspect/seed/corrupt the legacy znode. + private PhoenixHAAdmin legacyHaAdmin; private String zkUrl; private String peerZKUrl; private String masterUrl; @@ -95,8 +103,11 @@ public void before() throws Exception { ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); + legacyHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), + PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); + legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration()); // Clean existing records in system table List haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl); @@ -117,8 +128,10 @@ public void before() throws Exception { public void after() throws Exception { haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); + legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); haAdmin.close(); peerHaAdmin.close(); + legacyHaAdmin.close(); } @Test @@ -1203,4 +1216,502 @@ public void testPeriodicSyncJobExecutorStartsAndSyncsData() throws Exception { assertTrue("Sync executor should be shutdown after close", syncExecutor.isShutdown()); } } + + // ============================================================================================ + // Legacy /phoenix/ha CRR sync tests + // Verify feature-flag gating, derivation, monotonic version, registry-type preservation, + // deletion mirroring, and short-circuit behavior of HAGroupStoreClient's legacy sync path. + // ============================================================================================ + + @Test + public void testLegacyCrrSyncFeatureOffByDefault_NoLegacyZnodeWritten() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(/* legacyEnabled */ false, /* periodicSec */ 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + Pair legacy = readLegacyCrr(haGroupName); + assertNull("Legacy CRR must not exist when feature is off", legacy.getLeft()); + assertNull("Legacy znode stat must be null when feature is off", legacy.getRight()); + } + + @Test + public void testLegacyCrrSyncFeatureOn_InitialSyncCreatesZkRegistryLegacyZnode() + throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair legacy = awaitLegacyCrrPresent(haGroupName); + ClusterRoleRecord crr = legacy.getLeft(); + assertEquals("Legacy CRR must use ZK registry type for backward compatibility", + ClusterRoleRecord.RegistryType.ZK, crr.getRegistryType()); + assertEquals(haGroupName, crr.getHaGroupName()); + assertEquals(HighAvailabilityPolicy.FAILOVER, crr.getPolicy()); + // Local cluster role is ACTIVE per System Table seed. + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + crr.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue("CRR version must be > 0 after initial sync", crr.getVersion() > 0); + } + + @Test + public void testLegacyCrrSyncRoleChangePropagatesAndIsNewerThanWorks() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft(); + long initialVersion = initial.getVersion(); + + // ACTIVE_IN_SYNC -> ACTIVE_IN_SYNC_TO_STANDBY (role change ACTIVE -> ACTIVE_TO_STANDBY). + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + Pair updated = awaitLegacyCrrRole(haGroupName, ClusterType.LOCAL, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY); + ClusterRoleRecord updatedCRR = updated.getLeft(); + assertTrue("Version must monotonically increase after role change", + updatedCRR.getVersion() > initialVersion); + assertTrue("isNewerThan must return true for the updated record", + updatedCRR.isNewerThan(initial)); + assertEquals(ClusterRoleRecord.RegistryType.ZK, updatedCRR.getRegistryType()); + } + + @Test + public void testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + int initialZkVersion = initial.getRight().getVersion(); + long initialCrrVersion = initial.getLeft().getVersion(); + + // ACTIVE_IN_SYNC -> ACTIVE_NOT_IN_SYNC: ClusterRole stays ACTIVE. + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull(after.getLeft()); + assertEquals("Legacy CRR ZK stat version must not change on state-only transitions", + initialZkVersion, after.getRight().getVersion()); + assertEquals("Legacy CRR logical version must not change on state-only transitions", + initialCrrVersion, after.getLeft().getVersion()); + } + + /** LOCAL CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + @Test + public void testLegacyCrrSyncLocalChildRemovedDoesNotDeleteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = awaitLegacyCrrPresent(haGroupName); + long initialVersion = initial.getLeft().getVersion(); + + haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + + // Wait long enough for any potential event-driven delete to have fired. + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull("Legacy znode must NOT be deleted on LOCAL CHILD_REMOVED", after.getLeft()); + assertTrue("Legacy CRR version must not regress after LOCAL CHILD_REMOVED", + after.getLeft().getVersion() >= initialVersion); + } + + @Test + public void testLegacyCrrSyncPeriodicDisabledStillSyncsViaEvents() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 0); // periodic disabled + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft(); + assertEquals("Initial local role should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, initial.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertEquals("Initial registry type must be ZK", ClusterRoleRecord.RegistryType.ZK, + initial.getRegistryType()); + + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + ClusterRoleRecord updated = awaitLegacyCrrRole(haGroupName, ClusterType.LOCAL, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).getLeft(); + assertTrue("Updated version must monotonically advance past the initial version", + updated.getVersion() > initial.getVersion()); + assertTrue("isNewerThan must return true for the post-event record", + updated.isNewerThan(initial)); + assertEquals("Registry type must remain ZK after an event-driven sync", + ClusterRoleRecord.RegistryType.ZK, updated.getRegistryType()); + } + + /** PEER CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + @Test + public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + // Seed a peer record so that PEER cache initializes and PEER CHILD_REMOVED can fire later. + HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled to isolate event behavior + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + long initialCrrVersion = initial.getLeft().getVersion(); + + peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull("Legacy znode must NOT be deleted on PEER CHILD_REMOVED", after.getLeft()); + assertTrue("Legacy CRR version must not regress after PEER CHILD_REMOVED", + after.getLeft().getVersion() >= initialCrrVersion); + } + + /** + * Each {@link PhoenixHAAdmin.LegacyCrrWriteMode}: error mapping (BadVersion + NodeExists -> + * {@link StaleClusterRoleRecordVersionException}), unconditional FORCE_OVERWRITE, and + * CAS_WITH_VERSION rejecting negative versions. Sequential: ZK serializes versioned writes + * server-side, so the client retry path is identical to a real race. + */ + @Test + public void testLegacyCrrCasErrorMappingAndModeDispatch() throws Exception { + String haGroupName = testName.getMethodName(); + + // Create. + ClusterRoleRecord initial = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 1L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, initial, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); + + Pair existing = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertNotNull(existing.getLeft()); + int sharedVersion = existing.getRight().getVersion(); + + // CAS winner. + ClusterRoleRecord writerA = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, this.peerZKUrl, + ClusterRoleRecord.ClusterRole.STANDBY, 2L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion); + + // CAS loser: same expected version -> BadVersion -> StaleClusterRoleRecordVersionException. + ClusterRoleRecord writerB = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 2L); + assertThrows(StaleClusterRoleRecordVersionException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerB, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion)); + + Pair winner = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + winner.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(winner.getRight().getVersion() > sharedVersion); + + // CREATE_NEW on an existing znode -> NodeExists -> StaleClusterRoleRecordVersionException. + ClusterRoleRecord raceCreate = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 3L); + assertThrows(StaleClusterRoleRecordVersionException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, raceCreate, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0)); + + // FORCE_OVERWRITE bypasses CAS and bumps the stat version. + Stat statBeforeOverwrite = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName).getRight(); + ClusterRoleRecord overwrite = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 4L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, overwrite, + PhoenixHAAdmin.LegacyCrrWriteMode.FORCE_OVERWRITE, 0); + Pair afterOverwrite = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.STANDBY, + afterOverwrite.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(afterOverwrite.getRight().getVersion() > statBeforeOverwrite.getVersion()); + + // CAS_WITH_VERSION rejects negative expectedStatVersion before any ZK call. + ClusterRoleRecord illegalRecord = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.OFFLINE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + assertThrows(IllegalArgumentException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, -1)); + assertThrows(IllegalArgumentException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, Integer.MIN_VALUE)); + } + + /** Peer-side role flip propagates to role2 in the local legacy CRR. */ + @Test + public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws Exception { + String haGroupName = testName.getMethodName(); + // Seed peer with STANDBY before client starts so the initial sync sees role2=STANDBY. + HAGroupStoreRecord peerStandby = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerStandby); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled to isolate event-driven path + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + + // Flip the peer record to a state whose cluster role is ACTIVE_TO_STANDBY. + HAGroupStoreRecord peerFlipped = new HAGroupStoreRecord("v1.0", haGroupName, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerFlipped); + + Pair after = awaitLegacyCrrRole(haGroupName, ClusterType.PEER, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY); + assertEquals("Registry type must remain ZK after a peer-driven role flip", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + } + + /** Absent peer record yields role2=UNKNOWN; converges when the peer record appears. */ + @Test + public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery() throws Exception { + String haGroupName = testName.getMethodName(); + // No peer record seeded: peer cache is empty so getHAGroupStoreRecordFromPeer() returns null + // and role2 falls through to UNKNOWN. + Configuration conf = legacyCrrConf(true, 0); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.UNKNOWN); + long initialVersion = initial.getLeft().getVersion(); + + // Peer "recovers" by writing its consistentHA record. The PEER CHILD_ADDED event triggers + // the legacy sync to update role2. + HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Pair recovered = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + assertTrue("Version must bump when role2 transitions UNKNOWN -> STANDBY", + recovered.getLeft().getVersion() > initialVersion); + } + + /** registryType stays ZK across multiple sync cycles (never reverts to RPC). */ + @Test + public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 0); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + assertEquals(ClusterRoleRecord.RegistryType.ZK, initial.getLeft().getRegistryType()); + assertEquals("Initial local role should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, + initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + long lastVersion = initial.getLeft().getVersion(); + + // Drive a sequence of distinct peer states; each event drives a sync that rewrites the + // legacy znode (or short-circuits if logically equal). Direct ZK writes intentionally + // bypass setHAGroupStatusIfNeeded's transition guard. + HAGroupStoreRecord.HAGroupState[] cycle = + new HAGroupStoreRecord.HAGroupState[] { HAGroupStoreRecord.HAGroupState.STANDBY, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, + HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, HAGroupStoreRecord.HAGroupState.OFFLINE, + HAGroupStoreRecord.HAGroupState.STANDBY }; + for (HAGroupStoreRecord.HAGroupState state : cycle) { + HAGroupStoreRecord peer = new HAGroupStoreRecord("v1.0", haGroupName, state, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peer); + Pair after = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, state.getClusterRole()); + assertEquals( + "Local role must remain ACTIVE across peer-driven cycles (peer state=" + state + ")", + ClusterRoleRecord.ClusterRole.ACTIVE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertEquals("Registry type must remain ZK after a sync cycle (peer state=" + state + ")", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue( + "Logical version must monotonically increase across distinct sync cycles (peer state=" + + state + ")", + after.getLeft().getVersion() > lastVersion); + lastVersion = after.getLeft().getVersion(); + } + } + + /** Periodic loop repairs an external divergence with no consistentHA event. */ + @Test + public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 2); // 2s interval; jitter is 0-30s on first run + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = awaitLegacyCrrPresent(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + + // Externally corrupt the legacy znode; no consistentHA event fires, so only the periodic + // reconciler can recover. + ClusterRoleRecord corrupt = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, initial.getLeft().getVersion() + 10); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, corrupt, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, initial.getRight().getVersion()); + Pair corrupted = readLegacyCrr(haGroupName); + assertEquals(ClusterRoleRecord.ClusterRole.STANDBY, + corrupted.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + + // Worst-case wait: jitter up to 30s + 2s interval; allow 40s. + long deadline = System.currentTimeMillis() + 40_000L; + Pair after = readLegacyCrr(haGroupName); + while ( + (after.getLeft() == null || after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)) + != ClusterRoleRecord.ClusterRole.ACTIVE) + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(500); + after = readLegacyCrr(haGroupName); + } + assertNotNull(after.getLeft()); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(after.getLeft().getVersion() > corrupted.getLeft().getVersion()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + } + + /** + * Peer view absent: client preserves the pre-seeded {@code role2} rather than downgrading it + * to UNKNOWN. Information from a prior write is more authoritative than a transient gap in + * the local peer cache; another RS with peer visibility (or this client once peer recovers) + * will overwrite when there is real news. + */ + @Test + public void testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing() throws Exception { + String haGroupName = testName.getMethodName(); + // Pre-seed role2=OFFLINE; do NOT create a peer consistentHA record. + ClusterRoleRecord preSeed = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, preSeed, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); + Pair seeded = readLegacyCrr(haGroupName); + assertNotNull(seeded.getLeft()); + int seededStatVersion = seeded.getRight().getVersion(); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled; initial sync only + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + // Allow the initial async sync ample time to run. With peer absent and role2=OFFLINE + // already in the znode, the equality check must short-circuit and the znode must remain + // byte-identical. + Thread.sleep(3_000); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull(after.getLeft()); + assertEquals("Pre-seeded role2 must be preserved when peer view is absent", + ClusterRoleRecord.ClusterRole.OFFLINE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER))); + assertEquals("Znode must not be rewritten when desired record is logically equal", + seededStatVersion, after.getRight().getVersion()); + assertEquals(seeded.getLeft().getVersion(), after.getLeft().getVersion()); + } + + /** + * Peer view present: client overwrites a pre-seeded stale {@code role2} with the live peer + * state on the initial sync, bumping the version. + */ + @Test + public void testLegacyCrrSyncOverwritesPreSeededRole2WhenPeerPresent() throws Exception { + String haGroupName = testName.getMethodName(); + // Pre-seed role2=OFFLINE. + ClusterRoleRecord preSeed = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, preSeed, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); + Pair seeded = readLegacyCrr(haGroupName); + assertNotNull(seeded.getLeft()); + + // Create a peer consistentHA record so the sync can see real peer state. + HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled; initial sync only + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair after = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + assertEquals(ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue("Version must bump when peer state replaces stale role2", + after.getLeft().getVersion() > seeded.getLeft().getVersion()); + } + + // ---------- Legacy CRR sync test helpers ---------- + + /** Configuration clone with the legacy CRR flag and reconciliation interval set. */ + private Configuration legacyCrrConf(boolean legacyEnabled, long periodicSec) { + Configuration src = CLUSTERS.getHBaseCluster1().getConfiguration(); + Configuration cloned = new Configuration(src); + cloned.setBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, legacyEnabled); + cloned.setLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, periodicSec); + return cloned; + } + + private Pair readLegacyCrr(String haGroupName) throws IOException { + return legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + } + + /** Polls the legacy CRR until {@code condition} matches or the propagation deadline elapses. */ + private Pair awaitLegacyCrr(String haGroupName, + Predicate condition, String description) throws Exception { + long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS; + Pair legacy = readLegacyCrr(haGroupName); + while ( + (legacy.getLeft() == null || !condition.test(legacy.getLeft())) + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(100); + legacy = readLegacyCrr(haGroupName); + } + assertNotNull("Legacy znode missing while awaiting: " + description, legacy.getLeft()); + assertTrue("Legacy CRR condition not met within timeout: " + description, + condition.test(legacy.getLeft())); + return legacy; + } + + private Pair awaitLegacyCrrPresent(String haGroupName) throws Exception { + return awaitLegacyCrr(haGroupName, crr -> true, "znode present"); + } + + /** Polls until the LOCAL or PEER role in the legacy CRR matches {@code expectedRole}. */ + private Pair awaitLegacyCrrRole(String haGroupName, + ClusterType clusterType, ClusterRoleRecord.ClusterRole expectedRole) throws Exception { + String url = formattedZkUrlFor(clusterType); + return awaitLegacyCrr(haGroupName, crr -> crr.getRole(url) == expectedRole, + clusterType + " role == " + expectedRole); + } + + /** LOCAL or PEER ZK URL in the canonical ZK-registry form used by the legacy sync. */ + private String formattedZkUrlFor(ClusterType clusterType) { + String raw = (clusterType == ClusterType.LOCAL) ? zkUrl : peerZKUrl; + return JDBCUtil.formatUrl(raw, ClusterRoleRecord.RegistryType.ZK); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java index e6e71d86a3f..07b3bb51bba 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java @@ -229,6 +229,110 @@ public void testClusterRoleFromInvalidBytes() { assertEquals(ClusterRole.UNKNOWN, role); } + /** JSON without a {@code registryType} field defaults to RPC on deserialization. */ + @Test + public void testFromJsonWithoutRegistryTypeDefaultsToRpc() throws IOException { + byte[] data = readFile("json/test_role_record_no_registry_type.json"); + Optional opt = ClusterRoleRecord.fromJson(data); + assertTrue(opt.isPresent()); + ClusterRoleRecord record = opt.get(); + assertEquals(ClusterRoleRecord.RegistryType.RPC, record.getRegistryType()); + assertEquals(HighAvailabilityPolicy.FAILOVER, record.getPolicy()); + assertEquals(7L, record.getVersion()); + assertEquals(ClusterRole.ACTIVE, record.getRole1()); + assertEquals(ClusterRole.STANDBY, record.getRole2()); + } + + /** Explicit {@code registryType=RPC} must round-trip as RPC. */ + @Test + public void testFromJsonExplicitRpcRegistryTypeRoundTrips() throws IOException { + Optional opt = + ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_rpc.json")); + assertTrue(opt.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.RPC, opt.get().getRegistryType()); + assertEquals(11L, opt.get().getVersion()); + } + + /** Explicit {@code registryType=ZK} round-trips as ZK. */ + @Test + public void testFromJsonExplicitZkRegistryTypeRoundTrips() throws IOException { + Optional opt = + ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_zk.json")); + assertTrue(opt.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, opt.get().getRegistryType()); + assertEquals(13L, opt.get().getVersion()); + } + + /** Round-trip: ZK-registry CRR -> JSON -> CRR preserves {@code registryType}. */ + @Test + public void testToFromJsonPreservesZkRegistryTypeAcrossRoundTrip() throws IOException { + ClusterRoleRecord written = new ClusterRoleRecord(testName.getMethodName(), + HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", + ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L); + Optional read = + ClusterRoleRecord.fromJson(ClusterRoleRecord.toJson(written)); + assertTrue(read.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, read.get().getRegistryType()); + assertEquals(written, read.get()); + } + + // Tests for isLogicallyEqualIgnoringVersionAndRegistry + + @Test + public void testLogicalEquality_nullOther() { + ClusterRoleRecord r = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + assertFalse(r.isLogicallyEqualIgnoringVersionAndRegistry(null)); + } + + @Test + public void testLogicalEquality_sameFieldsDifferentVersion() { + ClusterRoleRecord a = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord b = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L); + assertTrue(a.isLogicallyEqualIgnoringVersionAndRegistry(b)); + assertTrue(b.isLogicallyEqualIgnoringVersionAndRegistry(a)); + } + + @Test + public void testLogicalEquality_zkVsRpcWithDifferentUrlForms() { + // Same logical roles but different URL forms (different registryType normalization) + // are NOT equal because the normalized url1/url2 differ. + ClusterRoleRecord zkForm = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord rpcForm = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.RPC, "master1\\:16000", ClusterRole.ACTIVE, + "master2\\:16000", ClusterRole.STANDBY, 1L); + assertFalse(zkForm.isLogicallyEqualIgnoringVersionAndRegistry(rpcForm)); + } + + @Test + public void testLogicalEquality_differentRole() { + ClusterRoleRecord a = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord b = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.OFFLINE, 1L); + assertFalse(a.isLogicallyEqualIgnoringVersionAndRegistry(b)); + } + + @Test + public void testLogicalEquality_differentHaGroupName() { + ClusterRoleRecord a = new ClusterRoleRecord("g1", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord b = new ClusterRoleRecord("g2", HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, + "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + assertFalse(a.isLogicallyEqualIgnoringVersionAndRegistry(b)); + } + // Private Helper Methods private ClusterRoleRecord getClusterRoleRecord(String name, HighAvailabilityPolicy policy, diff --git a/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json new file mode 100644 index 00000000000..e2b7269b48b --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json @@ -0,0 +1,10 @@ +{ + "haGroupName" : "testFromJsonExplicitRpcRoundTrips", + "policy" : "FAILOVER", + "registryType" : "RPC", + "url1" : "url1\\:2181", + "role1" : "ACTIVE", + "url2" : "url2\\:2181", + "role2" : "STANDBY", + "version" : 11 +} diff --git a/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json new file mode 100644 index 00000000000..1e9ce174ef7 --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json @@ -0,0 +1,10 @@ +{ + "haGroupName" : "testFromJsonExplicitZkRoundTrips", + "policy" : "FAILOVER", + "registryType" : "ZK", + "url1" : "zk1\\:2181::/hbase", + "role1" : "ACTIVE", + "url2" : "zk2\\:2181::/hbase", + "role2" : "STANDBY", + "version" : 13 +} diff --git a/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json new file mode 100644 index 00000000000..517e061b654 --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json @@ -0,0 +1,9 @@ +{ + "haGroupName" : "testFromJsonWithoutRegistryTypeDefaultsToRpc", + "policy" : "FAILOVER", + "url1" : "url1\\:2181", + "role1" : "ACTIVE", + "url2" : "url2\\:2181", + "role2" : "STANDBY", + "version" : 7 +} From 8fbb90544825b88df5564e504e057294c1bc1263 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Sun, 17 May 2026 14:48:27 -0700 Subject: [PATCH 2/4] PHOENIX-7787 Look up preserved peer role by URL in buildDesiredLegacyCrr ClusterRoleRecord's ctor canonicalizes url1/url2 by lexical order, so existing.getRole2() returns the role for whichever URL sorts larger - not necessarily the peer URL. When the local peer cache is empty and we were trying to preserve existing peer role, we'd half the time inherit the local role instead, then the equality check would see a "logical change" and trigger a redundant CAS write. Manifests as flakes (deployment-dependent on ZK URL alphabetical order) in HAGroupStoreClientIT.testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing and testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy under the full IT suite; clean 40/40 across two consecutive full-verify runs with the fix. Co-authored-by: Cursor --- .../java/org/apache/phoenix/jdbc/HAGroupStoreClient.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index 9f9afbd3dab..211b9c8b2a8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -1202,10 +1202,14 @@ private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) { /** * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. When the local - * client's peer view is unavailable, preserves the {@code existing} record's {@code role2} + * client's peer view is unavailable, preserves the {@code existing} record's peer role * rather than downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer visibility * would otherwise keep flipping it back, and the equality check naturally short-circuits * when no other field changed. + *

+ * Look up the preserved role by the peer URL (not by {@code getRole2()}) since + * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by alphabetical order; + * the peer URL may end up in either slot depending on lexical comparison. */ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer, ClusterRoleRecord existing) { @@ -1213,7 +1217,8 @@ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGrou if (peer != null) { role2 = peer.getClusterRole(); } else if (existing != null) { - role2 = existing.getRole2(); + role2 = existing.getRole( + JDBCUtil.formatUrl(local.getPeerZKUrl(), RegistryType.ZK)); } else { role2 = ClusterRole.UNKNOWN; } From 3e95d017699d464c2cf5605466ed121fe9ea2df4 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Mon, 18 May 2026 08:48:56 -0700 Subject: [PATCH 3/4] PHOENIX-7787 Apply spotless formatting Pure formatting changes from `mvn spotless:apply` across the touched files; no behavior change. Generated-by: Cursor (Claude). Co-authored-by: Cursor --- ...taleClusterRoleRecordVersionException.java | 3 +- .../phoenix/jdbc/HAGroupStoreClient.java | 33 +++++++-------- .../apache/phoenix/jdbc/PhoenixHAAdmin.java | 5 +-- .../phoenix/jdbc/HAGroupStoreClientIT.java | 12 +++--- .../phoenix/jdbc/ClusterRoleRecordTest.java | 40 +++++++++---------- 5 files changed, 45 insertions(+), 48 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java index 0b20a53a7bc..e15a716fa66 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java @@ -19,7 +19,8 @@ /** * CAS write to the legacy {@code /phoenix/ha} CRR znode failed (BadVersion or NodeExists); the - * caller can re-read and retry if needed. Analog of {@link StaleHAGroupStoreRecordVersionException}. + * caller can re-read and retry if needed. Analog of + * {@link StaleHAGroupStoreRecordVersionException}. */ public class StaleClusterRoleRecordVersionException extends Exception { private static final long serialVersionUID = 1L; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index 211b9c8b2a8..df8ea2da5e5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -1141,8 +1141,7 @@ private void syncLegacyCRRIfRoleChanged() { } // Wait for peer URL before building the desired CRR (ctor NPEs on null url2). if (StringUtils.isBlank(local.getPeerZKUrl())) { - LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is blank", - haGroupName); + LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is blank", haGroupName); return; } HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer(); @@ -1175,8 +1174,8 @@ private void syncLegacyCRRIfRoleChanged() { existing != null ? existing.getVersion() : -1L, desired.getVersion()); } catch (StaleClusterRoleRecordVersionException stale) { // CAS lost; next event/periodic cycle reconverges. - LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", - haGroupName, existingStat != null ? existingStat.getVersion() : -1); + LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", haGroupName, + existingStat != null ? existingStat.getVersion() : -1); } } catch (Exception e) { LOGGER.warn( @@ -1201,15 +1200,15 @@ private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) { } /** - * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. When the local - * client's peer view is unavailable, preserves the {@code existing} record's peer role - * rather than downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer visibility - * would otherwise keep flipping it back, and the equality check naturally short-circuits - * when no other field changed. + * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. When the local client's + * peer view is unavailable, preserves the {@code existing} record's peer role rather than + * downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer visibility would otherwise + * keep flipping it back, and the equality check naturally short-circuits when no other field + * changed. *

* Look up the preserved role by the peer URL (not by {@code getRole2()}) since - * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by alphabetical order; - * the peer URL may end up in either slot depending on lexical comparison. + * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by alphabetical order; the + * peer URL may end up in either slot depending on lexical comparison. */ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer, ClusterRoleRecord existing) { @@ -1217,8 +1216,7 @@ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGrou if (peer != null) { role2 = peer.getClusterRole(); } else if (existing != null) { - role2 = existing.getRole( - JDBCUtil.formatUrl(local.getPeerZKUrl(), RegistryType.ZK)); + role2 = existing.getRole(JDBCUtil.formatUrl(local.getPeerZKUrl(), RegistryType.ZK)); } else { role2 = ClusterRole.UNKNOWN; } @@ -1246,14 +1244,13 @@ private Pair readLegacyCrrSnapshot(NodeCache cache) { } /** - * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, single-thread executor, - * an off-lock initial sync, and the periodic reconciler. Called only when the feature is - * enabled and the client is healthy. + * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, single-thread executor, an + * off-lock initial sync, and the periodic reconciler. Called only when the feature is enabled and + * the client is healthy. */ private void setupLegacyCrrSync() throws Exception { this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); - this.legacyCrrNodeCache = - new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); + this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); this.legacyCrrNodeCache.start(true); this.legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" + haGroupName); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java index 896b017128e..0eaa35e163c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; -import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -92,8 +91,8 @@ public CuratorFramework getCurator(String zkUrl, Properties properties, String n * Write mode for {@link #createOrUpdateClusterRoleRecordWithCAS}. The accompanying * {@code expectedStatVersion} argument is interpreted only for {@link #CAS_WITH_VERSION}. *

- * Phoenix-internal. Not part of any public API contract — values may be added or renamed - * without notice. Do not switch exhaustively from outside this module. + * Phoenix-internal. Not part of any public API contract — values may be added or renamed without + * notice. Do not switch exhaustively from outside this module. */ public enum LegacyCrrWriteMode { /** Create the znode; no prior version expected. */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index 3e31656ab1e..c7e0ee21f42 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -1593,10 +1593,10 @@ public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() thr } /** - * Peer view absent: client preserves the pre-seeded {@code role2} rather than downgrading it - * to UNKNOWN. Information from a prior write is more authoritative than a transient gap in - * the local peer cache; another RS with peer visibility (or this client once peer recovers) - * will overwrite when there is real news. + * Peer view absent: client preserves the pre-seeded {@code role2} rather than downgrading it to + * UNKNOWN. Information from a prior write is more authoritative than a transient gap in the local + * peer cache; another RS with peer visibility (or this client once peer recovers) will overwrite + * when there is real news. */ @Test public void testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing() throws Exception { @@ -1631,8 +1631,8 @@ public void testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing() throws Exc } /** - * Peer view present: client overwrites a pre-seeded stale {@code role2} with the live peer - * state on the initial sync, bumping the version. + * Peer view present: client overwrites a pre-seeded stale {@code role2} with the live peer state + * on the initial sync, bumping the version. */ @Test public void testLegacyCrrSyncOverwritesPreSeededRole2WhenPeerPresent() throws Exception { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java index 07b3bb51bba..4a2e43a3b85 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java @@ -280,20 +280,20 @@ public void testToFromJsonPreservesZkRegistryTypeAcrossRoundTrip() throws IOExce @Test public void testLogicalEquality_nullOther() { - ClusterRoleRecord r = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, - "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord r = + new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, + "zk1\\:2181::/hbase", ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); assertFalse(r.isLogicallyEqualIgnoringVersionAndRegistry(null)); } @Test public void testLogicalEquality_sameFieldsDifferentVersion() { - ClusterRoleRecord a = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, - "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); - ClusterRoleRecord b = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, - "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L); + ClusterRoleRecord a = + new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, + "zk1\\:2181::/hbase", ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord b = + new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, + "zk1\\:2181::/hbase", ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L); assertTrue(a.isLogicallyEqualIgnoringVersionAndRegistry(b)); assertTrue(b.isLogicallyEqualIgnoringVersionAndRegistry(a)); } @@ -302,23 +302,23 @@ public void testLogicalEquality_sameFieldsDifferentVersion() { public void testLogicalEquality_zkVsRpcWithDifferentUrlForms() { // Same logical roles but different URL forms (different registryType normalization) // are NOT equal because the normalized url1/url2 differ. - ClusterRoleRecord zkForm = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, - "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord zkForm = + new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, + "zk1\\:2181::/hbase", ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); ClusterRoleRecord rpcForm = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.RPC, "master1\\:16000", ClusterRole.ACTIVE, - "master2\\:16000", ClusterRole.STANDBY, 1L); + ClusterRoleRecord.RegistryType.RPC, "master1\\:16000", ClusterRole.ACTIVE, "master2\\:16000", + ClusterRole.STANDBY, 1L); assertFalse(zkForm.isLogicallyEqualIgnoringVersionAndRegistry(rpcForm)); } @Test public void testLogicalEquality_differentRole() { - ClusterRoleRecord a = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, - "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); - ClusterRoleRecord b = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, - ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE, - "zk2\\:2181::/hbase", ClusterRole.OFFLINE, 1L); + ClusterRoleRecord a = + new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, + "zk1\\:2181::/hbase", ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L); + ClusterRoleRecord b = + new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, + "zk1\\:2181::/hbase", ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.OFFLINE, 1L); assertFalse(a.isLogicallyEqualIgnoringVersionAndRegistry(b)); } From 7344771030e4aa84a9d264dc1c55aaef085d6327 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Fri, 22 May 2026 13:22:50 -0700 Subject: [PATCH 4/4] PHOENIX-7787 Address PR review comments Co-authored-by: Cursor --- .../phoenix/jdbc/ClusterRoleRecord.java | 6 ++- .../phoenix/jdbc/HAGroupStoreClient.java | 48 ++++++++++++++----- .../phoenix/jdbc/HAGroupStoreClientIT.java | 47 +++++++++++++++++- 3 files changed, 86 insertions(+), 15 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index 327db3eaa5e..7e5b597fb92 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -243,8 +243,10 @@ public boolean hasSameInfo(ClusterRoleRecord other) { /** * Equality on the six identity/role fields ({@code haGroupName, policy, url1, url2, role1, - * role2}); ignores {@code version} (always bumps) and {@code registryType} (avoids RPC->ZK - * thrash). Returns {@code false} if {@code other} is {@code null}. + * role2}); ignores {@code version} (always bumps) and the {@code registryType} field itself. + * Same-registry callers only: {@code url1}/{@code url2} are normalized at construction per + * {@code registryType}, so cross-registry records will not compare equal even for the same + * underlying host:port. Returns {@code false} if {@code other} is {@code null}. */ public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord other) { if (other == null) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index df8ea2da5e5..9d2c268eae8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -61,6 +61,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -183,8 +184,15 @@ public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String result.close(); result = null; } else { - instances.putIfAbsent(localZkUrl, new ConcurrentHashMap<>()); - instances.get(localZkUrl).put(haGroupName, result); + // Atomic register; pairs with deregisterFromInstances()'s computeIfPresent. + final HAGroupStoreClient created = result; + instances.compute(localZkUrl, (k, v) -> { + if (v == null) { + v = new ConcurrentHashMap<>(); + } + v.put(haGroupName, created); + return v; + }); } } } @@ -917,7 +925,13 @@ private PathChildrenCacheListener createCacheListener(CountDownLatch latch, // Curator's per-namespace event dispatcher. ScheduledExecutorService syncExec = legacyCrrSyncExecutor; if (syncExec != null) { - syncExec.execute(this::syncLegacyCRRIfRoleChanged); + try { + syncExec.execute(this::syncLegacyCRRIfRoleChanged); + } catch (RejectedExecutionException ree) { + // Executor already shutting down (close() race); drop silently. + LOGGER.debug("Legacy CRR sync skipped for HA group {}: executor shut down", + haGroupName); + } } } break; @@ -1014,13 +1028,12 @@ private void closePeerConnection() { } } - /** - * Shuts down the periodic sync executor gracefully. - */ /** * Remove this instance from the static {@link #instances} map. Idempotent. Uses value-based * remove so that, if a concurrent {@link #getInstanceForZkUrl} has already swapped in a fresh - * replacement, the replacement is preserved. + * replacement, the replacement is preserved. The outer-CHM {@code computeIfPresent} below pairs + * with the {@code compute} in {@link #getInstanceForZkUrl} to serialize bucket creation/removal + * on the same key. */ private void deregisterFromInstances() { final String key = (this.zkUrl != null) ? this.zkUrl : getLocalZkUrl(this.conf); @@ -1173,8 +1186,9 @@ private void syncLegacyCRRIfRoleChanged() { LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName, existing != null ? existing.getVersion() : -1L, desired.getVersion()); } catch (StaleClusterRoleRecordVersionException stale) { - // CAS lost; next event/periodic cycle reconverges. - LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", haGroupName, + // CAS lost (another RS won); next event/periodic cycle reconverges. DEBUG: N-1 RSes + // log this per transition. + LOGGER.debug("Legacy CRR CAS lost for HA group {} at expected stat version {}", haGroupName, existingStat != null ? existingStat.getVersion() : -1); } } catch (Exception e) { @@ -1251,14 +1265,24 @@ private Pair readLegacyCrrSnapshot(NodeCache cache) { private void setupLegacyCrrSync() throws Exception { this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); - this.legacyCrrNodeCache.start(true); + // Async start; rebuild() below warms the cache off-lock. + this.legacyCrrNodeCache.start(false); this.legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" + haGroupName); t.setDaemon(true); return t; }); - // Run the initial sync off-thread so we don't block the static HAGroupStoreClient.class - // monitor held by getInstanceForZkUrl on JDBC/ZK I/O. + // Warm NodeCache and run initial sync off-thread so neither blocks the static + // HAGroupStoreClient.class monitor held by getInstanceForZkUrl on ZK/JDBC I/O. The sync's + // empty-snapshot fallback handles the race where it runs before rebuild() lands. + final NodeCache cacheRef = this.legacyCrrNodeCache; + this.legacyCrrSyncExecutor.execute(() -> { + try { + cacheRef.rebuild(); + } catch (Exception e) { + LOGGER.debug("Legacy CRR cache rebuild failed for HA group {}", haGroupName, e); + } + }); this.legacyCrrSyncExecutor.execute(this::syncLegacyCRRIfRoleChanged); startLegacyCrrReconciliation(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index c7e0ee21f42..8997fc2bc4a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; @@ -765,7 +766,6 @@ public void testSetHAGroupStatusIfNeededWithInvalidTransition() throws Exception @Test public void testSetHAGroupStatusIfNeededWithUnhealthyClient() throws Exception { String haGroupName = testName.getMethodName(); - HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), haGroupName, zkUrl); @@ -1664,6 +1664,51 @@ public void testLegacyCrrSyncOverwritesPreSeededRole2WhenPeerPresent() throws Ex after.getLeft().getVersion() > seeded.getLeft().getVersion()); } + /** + * Regression for the apurtell PR-2479 lockout concern. Seeds the legacy znode with the + * pre-PHOENIX-7495 JSON shape ({@code zk1}/{@code zk2} keys, no {@code registryType}). Strict + * Jackson on this build rejects the unknown {@code zk1}/{@code zk2} fields, so {@code fromJson} + * returns empty and the sync path takes the {@code existing=null} + {@code stat!=null} branch, + * CAS-overwriting the bytes with a fresh ZK-registry record. + */ + @Test + public void testLegacyCrrSyncMigratesOlderZk1Zk2Record() throws Exception { + String haGroupName = testName.getMethodName(); + + // \\\\ in the Java literal -> \\ in JSON bytes -> \ in the parsed url string. + String legacyJson = String.format("{\"haGroupName\":\"%s\"," + "\"policy\":\"FAILOVER\"," + + "\"zk1\":\"legacy-host-1\\\\:2181::/hbase\"," + "\"role1\":\"ACTIVE\"," + + "\"zk2\":\"legacy-host-2\\\\:2181::/hbase\"," + "\"role2\":\"STANDBY\"," + "\"version\":7}", + haGroupName); + + // Sanity: this build's strict Jackson cannot decode the pre-PHOENIX-7495 shape. + assertFalse("Pre-PHOENIX-7495 JSON must fail to parse on this build", + ClusterRoleRecord.fromJson(legacyJson.getBytes(StandardCharsets.UTF_8)).isPresent()); + + // Seed the legacy znode with the older bytes, then start the client with sync on. + legacyHaAdmin.getCurator().create().creatingParentsIfNeeded().forPath(toPath(haGroupName), + legacyJson.getBytes(StandardCharsets.UTF_8)); + + Configuration conf = legacyCrrConf(/* legacyEnabled */ true, /* periodicSec */ 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + // Migration: existing=null (unparseable) but stat populated -> CAS overwrite path. + Pair migrated = awaitLegacyCrrPresent(haGroupName); + ClusterRoleRecord crr = migrated.getLeft(); + + assertEquals("Migrated record must carry registryType=ZK", ClusterRoleRecord.RegistryType.ZK, + crr.getRegistryType()); + assertEquals("Local role must reflect live consistentHA state (seeded ACTIVE)", + ClusterRoleRecord.ClusterRole.ACTIVE, crr.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + // The seeded record's version=7 is lost because the bytes don't parse; buildDesiredLegacyCrr + // sees existing=null and bases the new version on local.getAdminCRRVersion(). Just assert + // monotonic-from-zero. + assertTrue("Migrated record must have positive version", crr.getVersion() >= 1L); + // ZK stat reflects an overwrite (CAS at stat.version=0 produced stat.version=1). + assertEquals("ZK stat version must reflect overwrite", 1, migrated.getRight().getVersion()); + } + // ---------- Legacy CRR sync test helpers ---------- /** Configuration clone with the legacy CRR flag and reconciliation interval set. */