Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlIdentifier connection = null;
SqlParserPos pos = startPos;
boolean isColumnsIdentifiersOnly = false;
}
Expand Down Expand Up @@ -1629,6 +1630,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
<PARTITIONED> <BY>
partitionColumns = ParenthesizedSimpleIdentifierList()
]
[
<USING> <CONNECTION>
connection = CompoundIdentifier()
]
[
<WITH>
propertyList = Properties()
Expand All @@ -1652,6 +1657,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
partitionColumns,
watermark,
comment,
connection,
tableLike,
isTemporary,
ifNotExists);
Expand All @@ -1660,6 +1666,11 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
<AS>
asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
if (connection != null) {
throw SqlUtil.newContextException(
pos,
ParserResource.RESOURCE.usingConnectionWithAsUnsupported());
}
if (replace) {
return new SqlReplaceTableAs(startPos.plus(getPos()),
tableName,
Expand Down Expand Up @@ -1706,6 +1717,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
partitionColumns,
watermark,
comment,
connection,
isTemporary,
ifNotExists);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public SqlReplaceTableAs(
partitionKeyList,
watermark,
comment,
null,
isTemporary,
ifNotExists,
true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class SqlCreateTable extends SqlCreateObject implements ExtendedSqlNode {

private final SqlWatermark watermark;

private final SqlIdentifier connection;

public SqlCreateTable(
SqlParserPos pos,
SqlIdentifier tableName,
Expand All @@ -76,6 +78,7 @@ public SqlCreateTable(
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
@Nullable SqlIdentifier connection,
boolean isTemporary,
boolean ifNotExists) {
this(
Expand All @@ -89,6 +92,7 @@ public SqlCreateTable(
partitionKeyList,
watermark,
comment,
connection,
isTemporary,
ifNotExists,
false);
Expand All @@ -105,6 +109,7 @@ protected SqlCreateTable(
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
@Nullable SqlIdentifier connection,
boolean isTemporary,
boolean ifNotExists,
boolean replace) {
Expand All @@ -117,6 +122,7 @@ protected SqlCreateTable(
this.partitionKeyList =
requireNonNull(partitionKeyList, "partitionKeyList should not be null");
this.watermark = watermark;
this.connection = connection;
}

@Override
Expand All @@ -128,7 +134,8 @@ protected SqlCreateTable(
properties,
partitionKeyList,
watermark,
comment);
comment,
connection);
}

public SqlNodeList getColumnList() {
Expand All @@ -151,6 +158,10 @@ public Optional<SqlWatermark> getWatermark() {
return Optional.ofNullable(watermark);
}

public Optional<SqlIdentifier> getConnection() {
return Optional.ofNullable(connection);
}

@Override
protected String getScope() {
return "TABLE";
Expand Down Expand Up @@ -214,6 +225,11 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, rightPrec);
SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec, rightPrec);
SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, leftPrec, rightPrec);
if (connection != null) {
writer.newlineAndIndent();
writer.keyword("USING CONNECTION");
Comment thread
lihaosky marked this conversation as resolved.
Outdated
connection.unparse(writer, leftPrec, rightPrec);
}
SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public SqlCreateTableAs(
partitionKeyList,
watermark,
comment,
null,
isTemporary,
ifNotExists,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public SqlCreateTableLike(
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
@Nullable SqlIdentifier connection,
SqlTableLike tableLike,
boolean isTemporary,
boolean ifNotExists) {
Expand All @@ -100,6 +101,7 @@ public SqlCreateTableLike(
partitionKeyList,
watermark,
comment,
connection,
isTemporary,
ifNotExists,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ public interface ParserResource {

@Resources.BaseMessage("DROP TEMPORARY MATERIALIZED TABLE is not supported.")
Resources.ExInst<ParseException> dropTemporaryMaterializedTableUnsupported();

@Resources.BaseMessage(
"USING CONNECTION clause is not supported with CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements.")
Resources.ExInst<ParseException> usingConnectionWithAsUnsupported();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,93 @@ void testCreateTable() {
sql(sql).ok(expected);
}

@Test
void testCreateTableUsingConnection() {
final String sql =
"CREATE TABLE orders (\n"
+ " order_id INT,\n"
+ " customer_id INT,\n"
+ " amount DECIMAL(10, 2)\n"
+ ") USING CONNECTION mycat.mydb.mysql_prod\n"
+ "WITH (\n"
+ " 'connector' = 'jdbc',\n"
+ " 'tables' = 'orders'\n"
+ ")";
final String expected =
"CREATE TABLE `ORDERS` (\n"
+ " `ORDER_ID` INTEGER,\n"
+ " `CUSTOMER_ID` INTEGER,\n"
+ " `AMOUNT` DECIMAL(10, 2)\n"
+ ")\n"
+ "USING CONNECTION `MYCAT`.`MYDB`.`MYSQL_PROD`\n"
+ "WITH (\n"
+ " 'connector' = 'jdbc',\n"
+ " 'tables' = 'orders'\n"
+ ")";
sql(sql).ok(expected);
}

@Test
void testCreateTableUsingConnectionWithPartitionAndDistribution() {
final String sql =
"CREATE TABLE tbl1 (\n"
+ " a bigint,\n"
+ " h varchar,\n"
+ " b varchar\n"
+ ")\n"
+ "DISTRIBUTED BY HASH(a) INTO 3 BUCKETS\n"
+ "PARTITIONED BY (a, h)\n"
+ "USING CONNECTION cat1.db1.conn1\n"
+ "WITH (\n"
+ " 'connector' = 'jdbc'\n"
+ ")";
final String expected =
"CREATE TABLE `TBL1` (\n"
+ " `A` BIGINT,\n"
+ " `H` VARCHAR,\n"
+ " `B` VARCHAR\n"
+ ")\n"
+ "DISTRIBUTED BY HASH(`A`) INTO 3 BUCKETS\n"
+ "PARTITIONED BY (`A`, `H`)\n"
+ "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n"
+ "WITH (\n"
+ " 'connector' = 'jdbc'\n"
+ ")";
sql(sql).ok(expected);
}

@Test
void testCreateTableAsWithUsingConnectionFails() {
final String sql =
Comment thread
lihaosky marked this conversation as resolved.
"^CREATE^ TABLE t1\n"
+ "USING CONNECTION cat1.db1.conn1\n"
+ "WITH ('connector' = 'jdbc')\n"
+ "AS SELECT 1 AS a";
sql(sql).fails(
"(?s).*USING CONNECTION clause is not supported with CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements.*");
}

@Test
void testCreateTableLikeUsingConnection() {
final String sql =
"CREATE TABLE t1 (\n"
+ " a INT\n"
+ ")\n"
+ "USING CONNECTION cat1.db1.conn1\n"
+ "WITH ('connector' = 'jdbc')\n"
+ "LIKE base_table";
final String expected =
"CREATE TABLE `T1` (\n"
+ " `A` INTEGER\n"
+ ")\n"
+ "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n"
+ "WITH (\n"
+ " 'connector' = 'jdbc'\n"
+ ")\n"
+ "LIKE `BASE_TABLE`";
sql(sql).ok(expected);
}

String buildDistributionInput(final String distributionClause) {
return "CREATE TABLE tbl1 (\n"
+ " a bigint,\n"
Expand Down