Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private boolean isWildcard(final NamedExpression ex) {
return false;
}
final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
return StarColumnHelper.isStarColumn(expr.getPath());
}

private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Iterator<Prel> iterator() {

@Override
public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
return visitor.visitPrel(this, value);
return visitor.visitUnnest(this, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;

public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements PrelVisitor<RETURN, EXTRA, EXCEP> {
Expand Down Expand Up @@ -58,6 +59,11 @@ public RETURN visitWriter(WriterPrel prel, EXTRA value) throws EXCEP {
return visitPrel(prel, value);
}

// Default handling for an UnnestPrel: delegate to the generic visitPrel,
// mirroring the other visit* defaults in this base visitor.
@Override
public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP {
return visitPrel(prel, value);
}

@Override
public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP {
throw new UnsupportedOperationException(String.format("No visit method defined for prel %s in visitor %s.", prel, this.getClass().getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;


Expand All @@ -35,6 +36,7 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/

public class PrelVisualizerVisitor
implements PrelVisitor<Void, PrelVisualizerVisitor.VisualizationState, Exception> {
extends BasePrelVisitor<Void, PrelVisualizerVisitor.VisualizationState, Exception> {

public static class VisualizationState {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@

import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectAllowDupPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ProjectAllowDupPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
Expand Down Expand Up @@ -189,38 +190,48 @@ public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
return (Prel) prel.copy(prel.getTraitSet(), children);
}

@Override
public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException {
if (StarColumnHelper.containsStarColumn(scanPrel.getRowType()) && prefixedForStar) {
private Prel convertStarInLeaf(Prel prel, Void value) throws RuntimeException {
if (StarColumnHelper.containsStarColumn(prel.getRowType()) && prefixedForStar) {

List<RexNode> exprs = Lists.newArrayList();

for (RelDataTypeField field : scanPrel.getRowType().getFieldList()) {
RexNode expr = scanPrel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
for (RelDataTypeField field : prel.getRowType().getFieldList()) {
RexNode expr = prel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
exprs.add(expr);
}

List<String> fieldNames = Lists.newArrayList();

long tableId = tableNumber.getAndIncrement();

for (String name : scanPrel.getRowType().getFieldNames()) {
for (String name : prel.getRowType().getFieldNames()) {
if (StarColumnHelper.isNonPrefixedStarColumn(name)) {
fieldNames.add("T" + tableId + StarColumnHelper.PREFIX_DELIMITER + name); // Add prefix to * column.
} else {
fieldNames.add(name); // Keep regular column as it is.
}
}
RelDataType rowType = RexUtil.createStructType(scanPrel.getCluster().getTypeFactory(),
exprs, fieldNames, null);
RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(),
exprs, fieldNames, null);

// insert a PAS.
ProjectPrel proj = new ProjectPrel(scanPrel.getCluster(), scanPrel.getTraitSet(), scanPrel, exprs, rowType);
ProjectPrel proj = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, exprs, rowType);

return proj;
} else {
return visitPrel(scanPrel, value);
return visitPrel(prel, value);
}

}

// Scan is a leaf operator: apply the shared star-column conversion
// (inserts a prefixing project above the leaf when a star column is present).
@Override
public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException {
return convertStarInLeaf(scanPrel, value);
}

// UNNEST produces columns like a leaf (scan-like) operator, so reuse the same
// star-column conversion applied to scans.
@Override
public Prel visitUnnest(UnnestPrel unnestPrel, Void value) throws RuntimeException {
return convertStarInLeaf(unnestPrel, value);
}

private List<String> makeUniqueNames(List<String> names) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.store.AbstractSchema;
Expand Down Expand Up @@ -156,7 +157,7 @@ public static RelNode qualifyPartitionCol(RelNode input, List<String> partitionC
.message("Partition column %s is not in the SELECT list of CTAS!", col)
.build(logger);
} else {
if (SchemaPath.DYNAMIC_STAR.equals(field.getName())) {
if (StarColumnHelper.isStarColumn(field.getName())) {
colRefStarNames.add(col);

final List<RexNode> operands = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.drill.common.expression.SchemaPath;

import com.google.common.collect.Maps;
import org.apache.drill.exec.planner.StarColumnHelper;


/**
Expand Down Expand Up @@ -147,7 +148,7 @@ public FieldSelection getChild(String name){

private static boolean containsStar(List<SchemaPath> columns) {
for (SchemaPath expr : columns) {
if (SchemaPath.DYNAMIC_STAR.equals(expr.getRootSegment().getPath())) {
if (StarColumnHelper.isStarColumn(expr.getRootSegment().getPath())) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import static org.junit.Assert.assertEquals;

import org.apache.drill.PlanTestBase;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.exec.work.prepare.PreparedStatementTestBase;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

public class TestLateralPlans extends BaseTestQuery {
public class TestLateralPlans extends PreparedStatementTestBase {

@BeforeClass
public static void enableUnnestLateral() throws Exception {
Expand Down Expand Up @@ -90,8 +90,50 @@ public void testLateralSqlPlainCol() throws Exception {

@Test
public void testLateralSqlStar() throws Exception {
String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2 limit 1";
test(Sql);
String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2 limit 0";

testBuilder()
.unOrdered()
.sqlQuery(Sql)
.baselineColumns("c_name", "c_id", "c_phone", "orders", "c_address", "o_id", "o_shop", "o_amount", "items")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this set of columns is not right. The UNNEST should produce only the top-level column, since that is what the planner knows about. De-constructing the underlying map in the data means that we will not be able to specify a column alias for the unnest output, e.g. UNNEST(t.orders) AS t2(o).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The set of columns returned from UNNEST is a separate issue and can be changed later if really needed. This fix is mainly about the star case.

.expectsEmptyResultSet()
.go();
}

@Test
public void testLateralSqlStar2() throws Exception {
// Star restricted to the outer table (c.*): only the customer columns are expected,
// none of the UNNEST output columns.
final String sql = "select c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) o limit 0";

testBuilder()
.unOrdered()
.sqlQuery(sql)
.baselineColumns("c_name", "c_id", "c_phone", "orders", "c_address")
.expectsEmptyResultSet()
.go();
}

@Test
public void testLateralSqlStar3() throws Exception {
// Two stars, unnest side first (o.*, c.*): the UNNEST columns must precede
// the customer columns in the output schema.
final String sql = "select o.*, c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) o limit 0";

testBuilder()
.unOrdered()
.sqlQuery(sql)
.baselineColumns("o_id", "o_shop", "o_amount", "items", "c_name", "c_id", "c_phone", "orders", "c_address")
.expectsEmptyResultSet()
.go();
}

@Test
public void testLateralSqlStar4() throws Exception {
// Star restricted to the unnest side (o.*): only the unnested order columns
// are expected.
final String sql = "select o.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) o limit 0";

testBuilder()
.unOrdered()
.sqlQuery(sql)
.baselineColumns("o_id", "o_shop", "o_amount", "items")
.expectsEmptyResultSet()
.go();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,15 @@ protected void compareUnorderedResults() throws Exception {
addTypeInfoIfMissing(actual.get(0), testBuilder);
addToMaterializedResults(actualRecords, actual, loader);

// If the actual result contains zero records, no baseline records were
// provided, and baselineColumns were provided, then compare the actual
// column count/names against the expected columns.
if (actualRecords.size() == 0
&& (baselineRecords == null || baselineRecords.size()==0)
&& baselineColumns != null) {
checkColumnDef(loader.getSchema());
}

// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
// the cases where the baseline is stored in a file.
if (baselineRecords == null) {
Expand Down Expand Up @@ -716,6 +725,18 @@ public static void addToMaterializedResults(List<Map<String, Object>> materializ
}
}

/**
 * Verifies that the actual batch schema matches the expected {@code baselineColumns}
 * by column count, position, and name. Names are compared as parsed
 * {@link SchemaPath}s so quoting differences do not cause false mismatches.
 *
 * @param batchSchema schema of the actual result batch
 * @throws Exception if the schema is null, the column count differs, or a column
 *         name at some position does not match the expected baseline column
 */
public void checkColumnDef(BatchSchema batchSchema) throws Exception {
  // Use explicit exceptions rather than `assert`: assertions are disabled unless
  // the JVM runs with -ea, which would silently skip this check.
  if (batchSchema == null || batchSchema.getFieldCount() != baselineColumns.length) {
    throw new Exception(String.format(
        "Expected %d columns but got %s.",
        baselineColumns.length,
        batchSchema == null ? "no schema" : batchSchema.getFieldCount() + " columns"));
  }
  for (int i = 0; i < baselineColumns.length; ++i) {
    final String actualName = batchSchema.getColumn(i).getName();
    if (!SchemaPath.parseFromString(baselineColumns[i])
        .equals(SchemaPath.parseFromString(actualName))) {
      throw new Exception(String.format(
          "Column %d name mismatch: expected '%s' but found '%s'.",
          i, baselineColumns[i], actualName));
    }
  }
}

public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception {

if (compareValues(expected, actual, counter, column)) {
Expand Down