diff --git a/.gitignore b/.gitignore index b2b37af6..12b7484d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ target e2e/results/* *.iml .java-version +.vscode +e2e/avro-tools.jar \ No newline at end of file diff --git a/README.md b/README.md index 8d659edd..5e5e472f 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,8 @@ com.spotify.dbeam.options.JdbcExportPipelineOptions: --nullableArrayItems= Default: false Controls whether array items should be nullable, ignored if arrayMode is 'bytes'. + --excludeColumns= + A comma-separated list of columns to be excluded from the export. ``` #### Input Avro schema file diff --git a/dbeam-core/pom.xml b/dbeam-core/pom.xml index 6e2cd66e..ed2aa95c 100644 --- a/dbeam-core/pom.xml +++ b/dbeam-core/pom.xml @@ -124,4 +124,23 @@ + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + + + + com.google.auto.value + auto-value + ${auto-value.version} + + + + + + + diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java index 8209ce2d..5e7c795c 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java @@ -22,9 +22,11 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import java.io.Serializable; import java.sql.Connection; import java.time.Duration; +import java.util.Arrays; import java.util.Optional; import org.apache.avro.Schema; @@ -49,6 +51,17 @@ public abstract class JdbcExportArgs implements Serializable { public abstract Optional inputAvroSchema(); + public abstract Optional> excludedColumns(); + + public static Optional> parseExcludedColumns( + final Optional rawExcludedColumns) { + return rawExcludedColumns.map( + s -> + Arrays.stream(s.split(",")) + .map(String::trim) + .collect(ImmutableSet.toImmutableSet())); + } + 
@AutoValue.Builder abstract static class Builder { @@ -68,6 +81,8 @@ abstract static class Builder { abstract Builder setInputAvroSchema(Optional inputAvroSchema); + abstract Builder setExcludedColumns(Optional> excludedColumns); + abstract JdbcExportArgs build(); } @@ -82,6 +97,7 @@ static JdbcExportArgs create( Optional.empty(), false, Duration.ofDays(7), + Optional.empty(), Optional.empty()); } @@ -93,7 +109,8 @@ public static JdbcExportArgs create( final Optional avroDoc, final Boolean useAvroLogicalTypes, final Duration exportTimeout, - final Optional inputAvroSchema) { + final Optional inputAvroSchema, + final Optional> excludedColumns) { return new AutoValue_JdbcExportArgs.Builder() .setJdbcAvroOptions(jdbcAvroArgs) .setQueryBuilderArgs(queryBuilderArgs) @@ -103,6 +120,7 @@ public static JdbcExportArgs create( .setUseAvroLogicalTypes(useAvroLogicalTypes) .setExportTimeout(exportTimeout) .setInputAvroSchema(inputAvroSchema) + .setExcludedColumns(excludedColumns) .build(); } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilder.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilder.java index b44d2f50..d968086a 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilder.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilder.java @@ -21,7 +21,14 @@ package com.spotify.dbeam.args; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import java.io.Serializable; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -117,18 +124,28 @@ public int hashCode() { private final QueryBase base; private final List whereConditions; private final Optional limitStr; + private final Optional> excludedColumns; + private final Optional splitColumn; private 
QueryBuilder(final QueryBase base) { this.base = base; this.limitStr = Optional.empty(); this.whereConditions = ImmutableList.of(); + this.excludedColumns = Optional.empty(); + this.splitColumn = Optional.empty(); } private QueryBuilder( - final QueryBase base, final List whereConditions, final Optional limitStr) { + final QueryBase base, + final List whereConditions, + final Optional limitStr, + final Optional> excludedColumns, + final Optional splitColumn) { this.base = base; this.whereConditions = whereConditions; this.limitStr = limitStr; + this.excludedColumns = excludedColumns; + this.splitColumn = splitColumn; } public static QueryBuilder fromTablename(final String tableName) { @@ -144,11 +161,35 @@ public QueryBuilder withPartitionCondition( return new QueryBuilder( this.base, Stream.concat( - this.whereConditions.stream(), - Stream.of( - createSqlPartitionCondition(partitionColumn, startPointIncl, endPointExcl))) + this.whereConditions.stream(), + Stream.of( + createSqlPartitionCondition(partitionColumn, startPointIncl, endPointExcl))) .collect(Collectors.toList()), - this.limitStr); + this.limitStr, + this.excludedColumns, + this.splitColumn); + } + + public QueryBuilder withSplitColumn(final Optional splitColumn) { + return new QueryBuilder( + this.base, this.whereConditions, this.limitStr, this.excludedColumns, splitColumn); + } + + public QueryBuilder withExcludedColumns(final Optional> excludedColumns) { + if (excludedColumns.isPresent() && this.base instanceof UserQueryBase) { + UserQueryBase userQueryBase = (UserQueryBase) this.base; + String newSqlQuery = + rebuildSelectClause(userQueryBase.userSqlQuery, excludedColumns.get(), this.splitColumn); + return new QueryBuilder( + new UserQueryBase(newSqlQuery, userQueryBase.selectClause), + this.whereConditions, + this.limitStr, + excludedColumns, + this.splitColumn); + } else { + return new QueryBuilder( + this.base, this.whereConditions, this.limitStr, excludedColumns, this.splitColumn); + } } private 
static String createSqlPartitionCondition( @@ -166,12 +207,14 @@ public QueryBuilder withParallelizationCondition( return new QueryBuilder( this.base, Stream.concat( - this.whereConditions.stream(), - Stream.of( - createSqlSplitCondition( - partitionColumn, startPointIncl, endPoint, isEndPointExcl))) + this.whereConditions.stream(), + Stream.of( + createSqlSplitCondition( + partitionColumn, startPointIncl, endPoint, isEndPointExcl))) .collect(Collectors.toList()), - this.limitStr); + this.limitStr, + this.excludedColumns, + this.splitColumn); } private static String createSqlSplitCondition( @@ -205,9 +248,85 @@ private static String removeTrailingSymbols(String sqlQuery) { return sqlQuery.replaceAll(regex, "$1"); } + private static String rebuildSelectClause( + String sqlQuery, ImmutableSet excludedColumns, Optional splitColumn) { + String lowerCaseQuery = sqlQuery.toLowerCase(); + int selectIdx = lowerCaseQuery.indexOf("select"); + int fromIdx = lowerCaseQuery.indexOf("from"); + + if (selectIdx == -1 || fromIdx == -1 || selectIdx > fromIdx) { + // Cannot parse, return original query + return sqlQuery; + } + + String selectClause = sqlQuery.substring(selectIdx + "select".length(), fromIdx).trim(); + List columns = splitColumns(selectClause); + List newColumns = + columns.stream() + .map(String::trim) + .filter( + column -> { + if (splitColumn.isPresent() && isColumn(column, splitColumn.get())) { + return true; + } + return excludedColumns.stream().noneMatch(excluded -> isColumn(column, excluded)); + }) + .collect(Collectors.toList()); + + if (splitColumn.isPresent()) { + boolean exists = newColumns.stream().anyMatch(c -> isColumn(c, splitColumn.get())); + if (!exists) { + newColumns.add(splitColumn.get()); + } + } + + if (newColumns.isEmpty()) { + return "SELECT * " + sqlQuery.substring(fromIdx); + } else { + return "SELECT " + String.join(", ", newColumns) + " " + sqlQuery.substring(fromIdx); + } + } + + private static List splitColumns(String selectClause) { + 
List columns = new ArrayList<>(); + int parenDepth = 0; + int start = 0; + boolean inQuote = false; + for (int i = 0; i < selectClause.length(); i++) { + char c = selectClause.charAt(i); + if (c == '\'' && (i == 0 || selectClause.charAt(i - 1) != '\\')) { + inQuote = !inQuote; + } else if (!inQuote) { + if (c == '(') { + parenDepth++; + } else if (c == ')') { + parenDepth--; + } else if (c == ',' && parenDepth == 0) { + columns.add(selectClause.substring(start, i)); + start = i + 1; + } + } + } + columns.add(selectClause.substring(start)); + return columns; + } + + private static boolean isColumn(String columnDefinition, String columnName) { + String trimmed = columnDefinition.trim(); + if (trimmed.equalsIgnoreCase(columnName)) { + return true; + } + // Check for alias + return trimmed.matches("(?i).*\\s+(AS\\s+)?\\Q" + columnName + "\\E$"); + } + public QueryBuilder withLimit(long limit) { return new QueryBuilder( - this.base, this.whereConditions, Optional.of(String.format(" LIMIT %d", limit))); + this.base, + this.whereConditions, + Optional.of(String.format(" LIMIT %d", limit)), + this.excludedColumns, + this.splitColumn); } @Override @@ -232,6 +351,51 @@ public int hashCode() { return base.hashCode(); } + public QueryBuilder resolveSelect(final Connection connection) throws SQLException { + if (this.excludedColumns.isPresent()) { + String queryToCheck = this.base.getBaseSql() + " AND 1=0"; + List columns = getColumnsFromQuery(connection, queryToCheck); + List filteredColumns = + columns.stream() + .filter(c -> this.excludedColumns.get().stream() + .noneMatch(excluded -> excluded.equalsIgnoreCase(c))) + .collect(Collectors.toList()); + + if (filteredColumns.isEmpty()) { + throw new SQLException("All columns excluded for query: " + queryToCheck); + } + + String selectClause = "SELECT " + String.join(", ", filteredColumns); + return new QueryBuilder( + this.base.withSelect(selectClause), + this.whereConditions, + this.limitStr, + this.excludedColumns, + 
this.splitColumn); + } + return this; + } + + private List getColumnsFromQuery(Connection connection, String query) + throws SQLException { + List columns = new ArrayList<>(); + try (Statement st = connection.createStatement()) { + try (ResultSet rs = st.executeQuery(query)) { + ResultSetMetaData meta = rs.getMetaData(); + for (int i = 1; i <= meta.getColumnCount(); i++) { + final String columnName; + if (meta.getColumnName(i).isEmpty()) { + columnName = meta.getColumnLabel(i); + } else { + columnName = meta.getColumnName(i); + } + columns.add(columnName); + } + } + } + return columns; + } + /** * Generates a new query to get MIN/MAX values for splitColumn. * @@ -248,6 +412,11 @@ public QueryBuilder generateQueryToGetLimitsOfSplitColumn( "SELECT MIN(%s) as %s, MAX(%s) as %s", splitColumn, minSplitColumnName, splitColumn, maxSplitColumnName); - return new QueryBuilder(base.withSelect(selectMinMax), this.whereConditions, this.limitStr); + return new QueryBuilder( + base.withSelect(selectMinMax), + this.whereConditions, + this.limitStr, + this.excludedColumns, + this.splitColumn); } } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java index 98f3444a..b7540c38 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java @@ -25,6 +25,7 @@ import static com.spotify.dbeam.args.ParallelQueryBuilder.queriesForBounds; import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import java.io.Serializable; import java.sql.Connection; @@ -57,6 +58,8 @@ public abstract class QueryBuilderArgs implements Serializable { public abstract Optional queryParallelism(); + public abstract Optional> excludedColumns(); + public abstract Builder builder(); @AutoValue.Builder @@ -86,6 +89,8 @@ public abstract static class 
Builder { public abstract Builder setQueryParallelism(Optional queryParallelism); + public abstract Builder setExcludedColumns(Optional> excludedColumns); + public abstract QueryBuilderArgs build(); } @@ -122,6 +127,13 @@ public String sqlQueryWithLimitOne() { */ public List buildQueries(final Connection connection) throws SQLException { QueryBuilder queryBuilder = this.baseSqlQuery(); + if (this.splitColumn().isPresent()) { + queryBuilder = queryBuilder.withSplitColumn(this.splitColumn()); + } + if (this.excludedColumns().isPresent()) { + queryBuilder = queryBuilder.withExcludedColumns(this.excludedColumns()); + queryBuilder = queryBuilder.resolveSelect(connection); + } if (this.partitionColumn().isPresent() && this.partition().isPresent()) { queryBuilder = configurePartitionCondition( diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java index 92e69b78..53542d6f 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java @@ -139,6 +139,7 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer { private Connection connection; private JdbcAvroMetering metering; private CountingOutputStream countingOutputStream; + private Schema schema; JdbcAvroWriter( FileBasedSink.WriteOperation writeOperation, @@ -160,7 +161,7 @@ protected void prepareWrite(final WritableByteChannel channel) throws Exception LOGGER.info("jdbcavroio : Preparing write..."); connection = jdbcAvroArgs.jdbcConnectionConfiguration().createConnection(); final Void destination = getDestination(); - final Schema schema = dynamicDestinations.getSchema(destination); + this.schema = dynamicDestinations.getSchema(destination); dataFileWriter = new DataFileWriter<>(new GenericDatumWriter(schema)) .setCodec(jdbcAvroArgs.getCodecFactory()) @@ -206,6 +207,7 @@ public void write(final String query) throws Exception { try 
(ResultSet resultSet = executeQuery(query)) { metering.startWriteMeter(); final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, + this.schema, this.jdbcAvroArgs.arrayMode(), this.jdbcAvroArgs.nullableArrayItems()); while (resultSet.next()) { dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes()); diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java index 576d2b4a..005b5dc1 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java @@ -27,52 +27,60 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.UUID; +import org.apache.avro.Schema; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; public class JdbcAvroRecordConverter { private final JdbcAvroRecord.SqlFunction[] mappings; - private final int columnCount; private final ResultSet resultSet; private final EncoderFactory encoderFactory = EncoderFactory.get(); private final boolean nullableArrayItems; + private final Schema schema; public JdbcAvroRecordConverter( final JdbcAvroRecord.SqlFunction[] mappings, - final int columnCount, final ResultSet resultSet, - final boolean nullableArrayItems) { + final boolean nullableArrayItems, + final Schema schema) { this.mappings = mappings; - this.columnCount = columnCount; this.resultSet = resultSet; this.nullableArrayItems = nullableArrayItems; + this.schema = schema; } public static JdbcAvroRecordConverter create(final ResultSet resultSet, + final Schema schema, final String arrayMode, final boolean nullableArrayItems) throws SQLException { return new JdbcAvroRecordConverter( - computeAllMappings(resultSet, arrayMode), - resultSet.getMetaData().getColumnCount(), + computeAllMappings(resultSet, schema, arrayMode), resultSet, - 
nullableArrayItems); + nullableArrayItems, + schema); } @SuppressWarnings("unchecked") static JdbcAvroRecord.SqlFunction[] computeAllMappings( - final ResultSet resultSet, final String arrayMode) + final ResultSet resultSet, final Schema schema, final String arrayMode) throws SQLException { final ResultSetMetaData meta = resultSet.getMetaData(); - final int columnCount = meta.getColumnCount(); + final int columnCount = schema.getFields().size(); final JdbcAvroRecord.SqlFunction[] mappings = (JdbcAvroRecord.SqlFunction[]) - new JdbcAvroRecord.SqlFunction[columnCount + 1]; + new JdbcAvroRecord.SqlFunction[columnCount]; - for (int i = 1; i <= columnCount; i++) { - mappings[i] = JdbcAvroRecord.computeMapping(meta, i, arrayMode); + for (int i = 0; i < columnCount; i++) { + Schema.Field field = schema.getFields().get(i); + String columnName = field.getProp("columnName"); + if (columnName == null) { + columnName = field.name(); + } + int columnIndex = resultSet.findColumn(columnName); + mappings[i] = JdbcAvroRecord.computeMapping(meta, columnIndex, arrayMode); } return mappings; } @@ -100,16 +108,16 @@ byte[] getBufffer() { * @throws IOException in case binary encoding fails */ public ByteBuffer convertResultSetIntoAvroBytes() throws SQLException, IOException { - final MyByteArrayOutputStream out = new MyByteArrayOutputStream(columnCount * 64); + final MyByteArrayOutputStream out = new MyByteArrayOutputStream(mappings.length * 64); binaryEncoder = encoderFactory.directBinaryEncoder(out, binaryEncoder); - for (int i = 1; i <= columnCount; i++) { + for (int i = 0; i < mappings.length; i++) { final Object value = mappings[i].apply(resultSet); if (value == null || resultSet.wasNull()) { binaryEncoder.writeIndex(0); binaryEncoder.writeNull(); } else { binaryEncoder.writeIndex(1); - writeValue(value, resultSet.getMetaData().getColumnName(i), binaryEncoder); + writeValue(value, schema.getFields().get(i).name(), binaryEncoder); } } binaryEncoder.flush(); diff --git 
a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java index 9d2eefee..0d80b034 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java @@ -50,6 +50,7 @@ import static java.sql.Types.VARBINARY; import static java.sql.Types.VARCHAR; +import com.google.common.collect.ImmutableSet; import com.spotify.dbeam.args.QueryBuilderArgs; import com.spotify.dbeam.options.ArrayHandlingMode; import java.sql.Array; @@ -95,9 +96,14 @@ public static Schema createSchemaByReadingOneRow( avroDoc, useLogicalTypes, arrayMode, - nullableArrayItems); - LOGGER.info("Schema created successfully. useLogicalTypes={}, arrayMode={}, " - + "Generated schema: {}", useLogicalTypes, arrayMode, schema.toString()); + nullableArrayItems, + queryBuilderArgs.excludedColumns()); + LOGGER.info( + "Schema created successfully. useLogicalTypes={}, arrayMode={}, " + + "Generated schema: {}", + useLogicalTypes, + arrayMode, + schema.toString()); return schema; } } @@ -110,7 +116,8 @@ public static Schema createAvroSchema( final String avroDoc, final boolean useLogicalTypes, final String arrayMode, - final boolean nullableArrayItems) + final boolean nullableArrayItems, + final Optional> excludedColumns) throws SQLException { final ResultSetMetaData meta = resultSet.getMetaData(); @@ -124,13 +131,18 @@ public static Schema createAvroSchema( .prop("tableName", tableName) .prop("connectionUrl", connectionUrl) .fields(); - return createAvroFields(resultSet, builder, useLogicalTypes, arrayMode, nullableArrayItems) + return createAvroFields( + resultSet, + builder, + useLogicalTypes, + arrayMode, + nullableArrayItems, + excludedColumns.orElse(ImmutableSet.of())) .endRecord(); } static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException { final String defaultTableName = "no_table_name"; - for (int i = 1; i <= 
meta.getColumnCount(); i++) { String metaTableName = meta.getTableName(i); if (metaTableName != null && !metaTableName.isEmpty()) { @@ -145,13 +157,13 @@ private static SchemaBuilder.FieldAssembler createAvroFields( final SchemaBuilder.FieldAssembler builder, final boolean useLogicalTypes, final String arrayMode, - final boolean nullableArrayItems) + final boolean nullableArrayItems, + final ImmutableSet excludedColumns) throws SQLException { ResultSetMetaData meta = resultSet.getMetaData(); for (int i = 1; i <= meta.getColumnCount(); i++) { - final String columnName; if (meta.getColumnName(i).isEmpty()) { columnName = meta.getColumnLabel(i); @@ -159,6 +171,10 @@ private static SchemaBuilder.FieldAssembler createAvroFields( columnName = meta.getColumnName(i); } + if (excludedColumns.stream().anyMatch(columnName::equalsIgnoreCase)) { + continue; + } + final int columnType = meta.getColumnType(i); final String typeName = JDBCType.valueOf(columnType).getName(); final String columnClassName = meta.getColumnClassName(i); diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java index 7b3c725c..33ea1ad5 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java @@ -81,7 +81,9 @@ public static JdbcExportArgs fromPipelineOptions(final PipelineOptions options) Optional.ofNullable(exportOptions.getAvroDoc()), exportOptions.isUseAvroLogicalTypes(), Duration.parse(exportOptions.getExportTimeout()), - BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath())); + BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath()), + JdbcExportArgs.parseExcludedColumns( + Optional.ofNullable(exportOptions.getExcludeColumns()))); } public static QueryBuilderArgs createQueryArgs(final 
JdbcExportPipelineOptions options) @@ -129,6 +131,9 @@ public static QueryBuilderArgs createQueryArgs(final JdbcExportPipelineOptions o .setPartitionPeriod(partitionPeriod) .setSplitColumn(splitColumn) .setQueryParallelism(queryParallelism) + .setExcludedColumns( + JdbcExportArgs.parseExcludedColumns( + Optional.ofNullable(options.getExcludeColumns()))) .build(); } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java index eb546529..0cf31ce2 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java @@ -149,4 +149,9 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions { Long getMinRows(); void setMinRows(Long value); + + @Description("A comma-separated list of columns to be excluded from the export.") + String getExcludeColumns(); + + void setExcludeColumns(String value); } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/TestHelper.java b/dbeam-core/src/test/java/com/spotify/dbeam/TestHelper.java index 58985902..ea2a23da 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/TestHelper.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/TestHelper.java @@ -74,6 +74,7 @@ public static void mockArrayColumn(ResultSetMetaData meta, ResultSet resultSet, throws SQLException { mockResultSetMeta(meta, columnIdx, Types.ARRAY, columnName, "java.sql.Array", columnTypeName); + when(resultSet.findColumn(columnName)).thenReturn(columnIdx); Array res1; if (array1 == null) { res1 = null; diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/args/JdbcExportOptionsTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/args/JdbcExportOptionsTest.java index d6a0faf4..a9288208 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/args/JdbcExportOptionsTest.java +++ 
b/dbeam-core/src/test/java/com/spotify/dbeam/args/JdbcExportOptionsTest.java @@ -368,4 +368,16 @@ public void shouldFailOnNegativeQueryParallelism() throws IOException, ClassNotF "--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --queryParallelism=-5 --splitColumn=id"); } + + @Test + public void shouldParseExcludedColumns() throws IOException, ClassNotFoundException { + final JdbcExportArgs options = + optionsFromArgs( + "--connectionUrl=jdbc:postgresql://some_db --table=some_table " + + "--password=secret --excludeColumns=col1,col2"); + + Assert.assertEquals( + Optional.of(com.google.common.collect.ImmutableSet.of("col1", "col2")), + options.excludedColumns()); + } } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderArgsTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderArgsTest.java index 0012770a..5c78a4d2 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderArgsTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderArgsTest.java @@ -291,6 +291,41 @@ public void shouldCreateParallelQueriesWithPartitionColumn() throws IOException, actual.buildQueries(connection)); } + @Test + public void shouldConfigureExcludedColumnsWithTable() throws IOException, SQLException { + final QueryBuilderArgs actual = + parseOptions( + "--connectionUrl=jdbc:postgresql://some_db --table=COFFEES " + + "--excludeColumns=COF_NAME"); + + String query = actual.buildQueries(connection).get(0); + Assert.assertTrue(query.startsWith("SELECT ")); + Assert.assertFalse(query.contains("*")); + Assert.assertFalse(query.contains("COF_NAME")); + Assert.assertTrue(query.contains("SUP_ID")); + Assert.assertTrue(query.contains("PRICE")); + } + + @Test + public void shouldCreateQueriesWithExplicitSelectAndExcludeColumnsAndMissingSplitColumn() + throws IOException, SQLException { + Path sqlPath = + 
TestHelper.createTmpDirPath("jdbc-export-args-test").resolve("explicit_select_exclude.sql"); + Files.write( + sqlPath, + "SELECT COF_NAME FROM COFFEES".getBytes(StandardCharsets.UTF_8)); + + final QueryBuilderArgs actual = + parseOptions( + String.format( + "--connectionUrl=jdbc:postgresql://some_db " + + "--sqlFile=%s --splitColumn=TOTAL --queryParallelism=5 " + + "--excludeColumns=COF_NAME", + sqlPath.toString())); + + actual.buildQueries(connection); + } + private QueryBuilderArgs parseOptions(String cmdLineArgs) throws IOException { JdbcExportPipelineOptions opts = commandLineToOptions(cmdLineArgs); return JdbcExportArgsFactory.createQueryArgs(opts); diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderTest.java index 5abf0687..721209bc 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderTest.java @@ -210,6 +210,78 @@ public void testItGeneratesQueryForLimits() { Assert.assertEquals(expected, actual); } + + @Test + public void testTableNameWithExcludedColumnsShouldStillUseSelectStar() { + final com.google.common.collect.ImmutableSet excludedColumns = + com.google.common.collect.ImmutableSet.of("col1"); + final QueryBuilder wrapper = + QueryBuilder.fromTablename("some_table") + .withExcludedColumns(java.util.Optional.of(excludedColumns)); + + final String expected = "SELECT * FROM some_table WHERE 1=1"; + + Assert.assertEquals(expected, wrapper.build()); + } + + @Test + public void testRawSqlWithExcludedColumnShouldRemoveColumns() { + final com.google.common.collect.ImmutableSet excludedColumns = + com.google.common.collect.ImmutableSet.of("col1"); + final QueryBuilder wrapper = + QueryBuilder.fromSqlQuery("SELECT col1, col2 FROM some_table") + .withExcludedColumns(java.util.Optional.of(excludedColumns)); + + final String expected = + "SELECT * FROM (SELECT col2 FROM 
some_table) as user_sql_query WHERE 1=1"; + + Assert.assertEquals(expected, wrapper.build()); + } + + @Test + public void testRawSqlWithFunctionAndAliasExcluded() { + final com.google.common.collect.ImmutableSet excludedColumns = + com.google.common.collect.ImmutableSet.of("PSEUDO_PARTITION_ID"); + + final QueryBuilder wrapper = + QueryBuilder.fromSqlQuery( + "SELECT t.*, MOD(id,2) AS PSEUDO_PARTITION_ID FROM demo_table t") + .withSplitColumn(java.util.Optional.of("PSEUDO_PARTITION_ID")) + .withExcludedColumns(java.util.Optional.of(excludedColumns)); + + final String expected = + "SELECT * FROM (SELECT t.*, MOD(id,2) AS PSEUDO_PARTITION_ID FROM demo_table t) " + + "as user_sql_query WHERE 1=1"; + + Assert.assertEquals(expected, wrapper.build()); + } + + @Test + public void testRawSqlWithCaseInsensitiveExcludedColumn() { + final com.google.common.collect.ImmutableSet excludedColumns = + com.google.common.collect.ImmutableSet.of("COL1"); + final QueryBuilder wrapper = + QueryBuilder.fromSqlQuery("SELECT col1, col2 FROM some_table") + .withExcludedColumns(java.util.Optional.of(excludedColumns)); + + final String expected = + "SELECT * FROM (SELECT col2 FROM some_table) as user_sql_query WHERE 1=1"; + + Assert.assertEquals(expected, wrapper.build()); + } + + @Test + public void testRawSqlWithCaseInsensitiveSplitColumn() { + final QueryBuilder wrapper = + QueryBuilder.fromSqlQuery("SELECT col1, col2 FROM some_table") + .withSplitColumn(java.util.Optional.of("COL1")); + + final String expected = + "SELECT * FROM (SELECT col1, col2 FROM some_table) as user_sql_query WHERE 1=1"; + + Assert.assertEquals(expected, wrapper.build()); + } + private void execAndCompare(String rawInput, String expected) { final String actual = QueryBuilder.fromSqlQuery(rawInput).build(); diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java index f9906186..14b4f73c 100644 --- 
a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java @@ -196,8 +196,9 @@ public void shouldEncodeResultSetToValidAvro() final Schema schema = JdbcAvroSchema.createAvroSchema( rs, "dbeam_generated", "connection", Optional.empty(), "doc", - false, arrayMode, false); - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(rs, arrayMode, false); + false, arrayMode, false, Optional.empty()); + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(rs, schema, + arrayMode, false); final DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema)); final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java index f981c16f..8b0fd3a3 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java @@ -196,7 +196,7 @@ private Schema createAvroSchemaForSingleField( Schema avroSchema = JdbcAvroSchema.createAvroSchema( resultSet, "namespace1", "url1", Optional.empty(), "doc1", useLogicalTypes, - ArrayHandlingMode.TypedMetaFromFirstRow, false); + ArrayHandlingMode.TypedMetaFromFirstRow, false, Optional.empty()); return avroSchema.getField("column1").schema().getTypes().get(COLUMN_NUM); } @@ -221,4 +221,36 @@ private ResultSet buildMockResultSet(final int inputColumnType, when(resultSet.getMetaData()).thenReturn(meta); return resultSet; } + + @Test + public void shouldExcludeColumnsInCreateAvroSchema() throws SQLException { + final ResultSetMetaData meta = Mockito.mock(ResultSetMetaData.class); + when(meta.getColumnCount()).thenReturn(2); + when(meta.getTableName(1)).thenReturn("test_table"); + when(meta.getColumnName(1)).thenReturn("column1"); + 
when(meta.getColumnType(1)).thenReturn(Types.VARCHAR); + when(meta.getColumnClassName(1)).thenReturn("java.lang.String"); + when(meta.getTableName(2)).thenReturn("test_table"); + when(meta.getColumnName(2)).thenReturn("column2"); + when(meta.getColumnType(2)).thenReturn(Types.INTEGER); + when(meta.getColumnClassName(2)).thenReturn("java.lang.Integer"); + + final ResultSet resultSet = Mockito.mock(ResultSet.class); + when(resultSet.getMetaData()).thenReturn(meta); + + Schema avroSchema = + JdbcAvroSchema.createAvroSchema( + resultSet, + "namespace1", + "url1", + Optional.empty(), + "doc1", + false, + ArrayHandlingMode.TypedMetaFromFirstRow, + false, + Optional.of(com.google.common.collect.ImmutableSet.of("column1"))); + + Assert.assertNull(avroSchema.getField("column1")); + Assert.assertNotNull(avroSchema.getField("column2")); + } } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/PostgresJdbcAvroTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/PostgresJdbcAvroTest.java index 5f1d8239..e4f9a8d6 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/avro/PostgresJdbcAvroTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/PostgresJdbcAvroTest.java @@ -103,17 +103,18 @@ public void shouldEncodeUUIDValues() throws SQLException, IOException { final ResultSet resultSet = Mockito.mock(ResultSet.class); when(resultSet.getMetaData()).thenReturn(meta); + when(resultSet.findColumn("uuid_field")).thenReturn(1); final UUID uuidExpected = UUID.randomUUID(); when(resultSet.getObject(1)).thenReturn(uuidExpected); TestHelper.mockArrayColumn(meta, resultSet, 2, "array_field", "_uuid", Types.OTHER, "uuid", new UUID[] {uuidExpected}); when(resultSet.isFirst()).thenReturn(true); - + String arrayMode = ArrayHandlingMode.TypedMetaFromFirstRow; final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, false); - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create( - 
resultSet, arrayMode, false); + Optional.empty(), "doc", true, arrayMode, false, Optional.empty()); + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, false); GenericRecord actualRecord = bytesToGenericRecords(schema, converter.convertResultSetIntoAvroBytes())[0]; @@ -132,6 +133,8 @@ public void shouldEncodeStringAndOtherValues() throws SQLException, IOException final ResultSet resultSet = Mockito.mock(ResultSet.class); when(resultSet.getMetaData()).thenReturn(meta); + when(resultSet.findColumn("text_field")).thenReturn(1); + when(resultSet.findColumn("other_field")).thenReturn(2); when(resultSet.getString(1)).thenReturn("some_text_42"); when(resultSet.getString(2)).thenReturn("some_other_42"); @@ -145,9 +148,9 @@ public void shouldEncodeStringAndOtherValues() throws SQLException, IOException String arrayMode = ArrayHandlingMode.TypedMetaFromFirstRow; final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, false); - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, arrayMode, - false); + Optional.empty(), "doc", true, arrayMode, false, Optional.empty()); + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, false); GenericRecord actualRecord = bytesToGenericRecords(schema, converter.convertResultSetIntoAvroBytes())[0]; @@ -165,12 +168,14 @@ public void shouldThrowOnArrayWithNulls() throws SQLException, IOException { TestHelper.mockResultSetMeta(meta, 1, Types.ARRAY, "array_field", "java.sql.Array", "_uuid"); final ResultSet resultSet = Mockito.mock(ResultSet.class); when(resultSet.getMetaData()).thenReturn(meta); + when(resultSet.findColumn("array_field")).thenReturn(1); when(resultSet.getArray(1)).thenReturn(null); when(resultSet.isFirst()).thenReturn(true); Assert.assertThrows(RuntimeException.class, () -> JdbcAvroSchema.createAvroSchema(resultSet, 
"ns", "conn_url", - Optional.empty(), "doc", true, ArrayHandlingMode.TypedMetaFromFirstRow, false)); + Optional.empty(), "doc", true, ArrayHandlingMode.TypedMetaFromFirstRow, false, + Optional.empty())); } @Test @@ -180,6 +185,7 @@ public void shouldHandleArrayWithNullsUsingArrayAsBytes() throws SQLException, I TestHelper.mockResultSetMeta(meta, 1, Types.ARRAY, "array_field", "java.sql.Array", "_uuid"); final ResultSet resultSet = Mockito.mock(ResultSet.class); when(resultSet.getMetaData()).thenReturn(meta); + when(resultSet.findColumn("array_field")).thenReturn(1); when(resultSet.getArray(1)).thenReturn(null); byte[] expectedValue = new byte[] {1, 2, 3}; when(resultSet.getBytes(1)).thenReturn(expectedValue); @@ -187,9 +193,9 @@ public void shouldHandleArrayWithNullsUsingArrayAsBytes() throws SQLException, I String arrayMode = ArrayHandlingMode.Bytes; final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, false); - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, arrayMode, - false); + Optional.empty(), "doc", true, arrayMode, false, Optional.empty()); + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, false); GenericRecord actualRecord = bytesToGenericRecords(schema, converter.convertResultSetIntoAvroBytes())[0]; @@ -223,9 +229,9 @@ public void shouldHandleArrayWithNullsWithoutReadingFirstRow() throws SQLExcepti String arrayMode = ArrayHandlingMode.TypedMetaPostgres; final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, false); - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, arrayMode, - false); + Optional.empty(), "doc", true, arrayMode, false, Optional.empty()); + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, false); GenericRecord[] 
actualRecords = bytesToGenericRecords(schema, converter.convertResultSetIntoAvroBytes(), converter.convertResultSetIntoAvroBytes()); @@ -264,9 +270,9 @@ public void shouldHandleArrayWithNullsIfEnabled() throws SQLException, IOExcepti boolean nullableArrayItems = true; final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, nullableArrayItems); - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, arrayMode, - nullableArrayItems); + Optional.empty(), "doc", true, arrayMode, nullableArrayItems, Optional.empty()); + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, nullableArrayItems); GenericRecord actualRecord = bytesToGenericRecords(schema, converter.convertResultSetIntoAvroBytes(), converter.convertResultSetIntoAvroBytes())[0]; @@ -296,8 +302,15 @@ public void shouldThrowOnArrayWithNullsIfDisabled() throws SQLException { String arrayMode = ArrayHandlingMode.TypedMetaPostgres; boolean nullableArrayItems = false; - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, arrayMode, - nullableArrayItems); + // JdbcAvroRecordConverter.create() now requires the Avro schema, so this + // test must build a real schema from the mocked ResultSet before + // constructing the converter; the test still exercises the null-array + // failure path in the converter, not schema construction. 
+ final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", + Optional.empty(), "doc", true, arrayMode, nullableArrayItems, Optional.empty()); + + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, nullableArrayItems); RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> converter.convertResultSetIntoAvroBytes()); Assert.assertEquals("Array item is null in column 'array_field_varchar', use " @@ -318,8 +331,11 @@ public void shouldThrowOnUnknownType() throws SQLException { String arrayMode = ArrayHandlingMode.TypedMetaPostgres; boolean nullableArrayItems = false; - final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, arrayMode, - nullableArrayItems); + final Schema schema = JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", + Optional.empty(), "doc", true, arrayMode, nullableArrayItems, Optional.empty()); + + final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet, schema, + arrayMode, nullableArrayItems); RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> converter.convertResultSetIntoAvroBytes()); Assert.assertEquals("Value of type class java.io.File in column 'invalid_array' is not " @@ -342,7 +358,7 @@ public void shouldThrowOnInvalidArrayColumnTypeName() throws SQLException { RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, nullableArrayItems)); + Optional.empty(), "doc", true, arrayMode, nullableArrayItems, Optional.empty())); Assert.assertEquals("columnName=array_field_text columnTypeName=text should start with '_'", thrown.getMessage()); } @@ -363,7 +379,7 @@ public void shouldThrowOnNotSupportedArrayColumnTypeName() throws SQLException { RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> 
JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url", - Optional.empty(), "doc", true, arrayMode, nullableArrayItems)); + Optional.empty(), "doc", true, arrayMode, nullableArrayItems, Optional.empty())); Assert.assertEquals( "columnName=array_field_text Postgres type 'not_supported' is not supported", thrown.getMessage()); diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java index b5491463..6eea781f 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java @@ -45,6 +45,7 @@ private static JdbcExportArgs createArgs(String url, QueryBuilderArgs queryBuild Optional.empty(), false, Duration.ZERO, + Optional.empty(), Optional.empty()); } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java index 1b03dcc9..1627c2a0 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java @@ -20,7 +20,6 @@ package com.spotify.dbeam.options; -import com.spotify.dbeam.args.QueryBuilderArgs; import com.spotify.dbeam.args.QueryBuilderArgsTest; import com.spotify.dbeam.avro.BeamJdbcAvroSchema; import java.io.File; @@ -206,14 +205,4 @@ public void checkEmptyCommandLineArgIsParsedAsOptions() throws IOException, SQLE Assert.assertNull(options.getAvroSchemaFilePath()); } - - private QueryBuilderArgs pareOptions(String cmdLineArgs) throws IOException { - PipelineOptionsFactory.register(JdbcExportPipelineOptions.class); - final JdbcExportPipelineOptions opts = - PipelineOptionsFactory.fromArgs(cmdLineArgs.split(" ")) - .withValidation() - .create() - .as(JdbcExportPipelineOptions.class); - return 
JdbcExportArgsFactory.createQueryArgs(opts); - } } diff --git a/pom.xml b/pom.xml index b38ac1ae..c6924849 100644 --- a/pom.xml +++ b/pom.xml @@ -232,6 +232,16 @@ threetenbp ${threetenbp.version} + + net.bytebuddy + byte-buddy + 1.17.5 + + + net.bytebuddy + byte-buddy-agent + 1.17.5 + io.opencensus