Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
<cbioportal.version>v6.4.1</cbioportal.version>
<maf.version>1.0.0</maf.version>

<mysql_driver.version>8.0.28</mysql_driver.version>
<mysql_commons_lang3.version>3.13.0</mysql_commons_lang3.version>
<commons_lang3.version>3.13.0</commons_lang3.version>
<clickhouse_jdbc.version>0.6.4</clickhouse_jdbc.version>
<lz4.version>1.8.0</lz4.version>

<commons_dbcp2.version>2.10.0</commons_dbcp2.version>
<commons_io.version>2.14.0</commons_io.version>
Expand All @@ -37,7 +38,7 @@
<easymock.version>4.3</easymock.version>
<junitparams.version>1.1.0</junitparams.version>
<byte_buddy.version>1.18.3</byte_buddy.version>
<testcontainers.version>1.19.8</testcontainers.version>
<testcontainers.version>1.21.4</testcontainers.version>
</properties>

<dependencies>
Expand All @@ -47,12 +48,22 @@
<artifactId>maf</artifactId>
<version>${maf.version}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse_jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>${lz4.version}</version>
</dependency>

<!-- Apache Commons -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${mysql_commons_lang3.version}</version>
<version>${commons_lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down Expand Up @@ -170,11 +181,6 @@
<artifactId>byte-buddy</artifactId>
<version>${byte_buddy.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql_driver.version}</version>
</dependency>
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
Expand All @@ -183,7 +189,7 @@
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<artifactId>clickhouse</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
Expand Down
220 changes: 220 additions & 0 deletions src/main/java/org/mskcc/cbio/portal/dao/ClickHouseAutoIncrement.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package org.mskcc.cbio.portal.dao;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Sequence manager that keeps counters in memory and persists the high-water mark
* in ClickHouse on shutdown so ids are never reused even if rows are deleted.
*/
public final class ClickHouseAutoIncrement {

private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAutoIncrement.class);

private ClickHouseAutoIncrement() {
}

private static final class SequenceConfig {
private final String tableName;
private final String columnName;

private SequenceConfig(String tableName, String columnName) {
this.tableName = tableName;
this.columnName = columnName;
}
}

private static final Map<String, SequenceConfig> CONFIG = new ConcurrentHashMap<>();
private static final Map<String, AtomicLong> COUNTERS = new ConcurrentHashMap<>();
private static final String SEQUENCE_TABLE = "cbioportal_sequence_state";
private static final AtomicBoolean SEQUENCE_TABLE_READY = new AtomicBoolean(false);
private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false);

static {
register("seq_reference_genome", "reference_genome", "reference_genome_id");
register("seq_cancer_study", "cancer_study", "cancer_study_id");
register("seq_patient", "patient", "internal_id");
register("seq_sample", "sample", "internal_id");
register("seq_sample_list", "sample_list", "list_id");
register("seq_genetic_entity", "genetic_entity", "id");
register("seq_geneset", "geneset", "id");
register("seq_geneset_hierarchy_node", "geneset_hierarchy_node", "node_id");
register("seq_generic_entity_properties", "generic_entity_properties", "id");
register("seq_genetic_profile", "genetic_profile", "genetic_profile_id");
register("seq_gene_panel", "gene_panel", "internal_id");
register("seq_structural_variant", "structural_variant", "internal_id");
register("seq_mutation_event", "mutation_event", "mutation_event_id");
register("seq_gistic", "gistic", "gistic_roi_id");
register("seq_cna_event", "cna_event", "cna_event_id");
register("seq_copy_number_seg", "copy_number_seg", "seg_id");
register("seq_copy_number_seg_file", "copy_number_seg_file", "seg_file_id");
register("seq_clinical_event", "clinical_event", "clinical_event_id");
registerShutdownHook();
}

private static void register(String sequenceName, String tableName, String columnName) {
CONFIG.put(sequenceName, new SequenceConfig(tableName, columnName.toLowerCase(Locale.ROOT)));
}

private static void registerShutdownHook() {
if (!SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) {
return;
}
Runtime.getRuntime().addShutdownHook(
new Thread(ClickHouseAutoIncrement::persistAllSequences, "clickhouse-sequence-persist"));
}

public static long nextId(String sequenceName) throws DaoException {
SequenceConfig config = CONFIG.get(sequenceName);
if (config == null) {
throw new DaoException("Unknown sequence: " + sequenceName);
}
AtomicLong counter = COUNTERS.get(sequenceName);
if (counter == null) {
synchronized (ClickHouseAutoIncrement.class) {
counter = COUNTERS.get(sequenceName);
if (counter == null) {
long current = initializeCounter(sequenceName, config);
counter = new AtomicLong(current);
COUNTERS.put(sequenceName, counter);
}
}
}
return counter.incrementAndGet();
}

private static long initializeCounter(String sequenceName, SequenceConfig config) throws DaoException {
ensureSequenceTableExists();
long persisted = fetchPersistedMax(sequenceName);
long tableMax = fetchCurrentMax(config);
long current = Math.max(persisted, tableMax);
if (current != persisted) {
persistLastValue(sequenceName, current);
}
return current;
}

private static void ensureSequenceTableExists() throws DaoException {
if (SEQUENCE_TABLE_READY.get()) {
return;
}
synchronized (ClickHouseAutoIncrement.class) {
if (SEQUENCE_TABLE_READY.get()) {
return;
}
Connection con = null;
PreparedStatement stmt = null;
try {
con = JdbcUtil.getDbConnection(ClickHouseAutoIncrement.class);
stmt = con.prepareStatement(
"CREATE TABLE IF NOT EXISTS " + SEQUENCE_TABLE + " ("
+ "sequence_name String, "
+ "last_value UInt64"
+ ") ENGINE = ReplacingMergeTree "
+ "ORDER BY sequence_name");
stmt.execute();
SEQUENCE_TABLE_READY.set(true);
} catch (SQLException ex) {
throw new DaoException(ex);
} finally {
JdbcUtil.closeAll(ClickHouseAutoIncrement.class, con, stmt, null);
}
}
}

private static long fetchPersistedMax(String sequenceName) throws DaoException {
Connection con = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
con = JdbcUtil.getDbConnection(ClickHouseAutoIncrement.class);
stmt = con.prepareStatement(
"SELECT max(last_value) FROM " + SEQUENCE_TABLE + " WHERE sequence_name = ?");
stmt.setString(1, sequenceName);
rs = stmt.executeQuery();
if (rs.next()) {
return rs.getLong(1);
}
return 0L;
} catch (SQLException ex) {
throw new DaoException(ex);
} finally {
JdbcUtil.closeAll(ClickHouseAutoIncrement.class, con, stmt, rs);
}
}

private static long fetchCurrentMax(SequenceConfig config) throws DaoException {
Connection con = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
con = JdbcUtil.getDbConnection(ClickHouseAutoIncrement.class);
stmt = con.prepareStatement("SELECT max(" + config.columnName + ") FROM " + config.tableName);
rs = stmt.executeQuery();
if (rs.next()) {
return rs.getLong(1);
}
return 0L;
} catch (SQLException ex) {
throw new DaoException(ex);
} finally {
JdbcUtil.closeAll(ClickHouseAutoIncrement.class, con, stmt, rs);
}
}

private static void persistLastValue(String sequenceName, long value) throws DaoException {
Connection con = null;
PreparedStatement stmt = null;
try {
con = JdbcUtil.getDbConnection(ClickHouseAutoIncrement.class);
stmt = con.prepareStatement(
"INSERT INTO " + SEQUENCE_TABLE + " (sequence_name, last_value) VALUES (?, ?)");
stmt.setString(1, sequenceName);
stmt.setLong(2, value);
stmt.executeUpdate();
} catch (SQLException ex) {
throw new DaoException(ex);
} finally {
JdbcUtil.closeAll(ClickHouseAutoIncrement.class, con, stmt, null);
}
}

private static void persistAllSequences() {
if (COUNTERS.isEmpty()) {
return;
}
try {
ensureSequenceTableExists();
} catch (DaoException ex) {
LOG.warn("Failed to ensure sequence table during shutdown persistence.", ex);
return;
}
for (Map.Entry<String, AtomicLong> entry : COUNTERS.entrySet()) {
long value = entry.getValue().get();
try {
persistLastValue(entry.getKey(), value);
} catch (DaoException ex) {
LOG.warn(
"Failed to persist sequence {} with value {} during shutdown.",
entry.getKey(),
value,
ex);
}
}
}

public static void resetCounters() {
COUNTERS.clear();
SEQUENCE_TABLE_READY.set(false);
}

}
Loading
Loading