diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 254a297ef0e..925300e332d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -60,13 +60,7 @@ public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch in protected boolean setupNewSchema() throws SchemaChangeException { container.zeroVectors(); transfers.clear(); - - - for(final VectorWrapper v : incoming) { - final TransferPair pair = v.getValueVector().makeTransferPair( - container.addOrGet(v.getField(), callBack)); - transfers.add(pair); - } + container.onSchemaChange(incoming, callBack, transfers); final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index a96dfe14724..a2ccc87239b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -17,10 +17,11 @@ */ package org.apache.drill.exec.physical.impl.project; -import com.carrotsearch.hppc.IntHashSet; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.drill.common.expression.ConvertExpression; import org.apache.drill.common.expression.ErrorCollector; @@ -67,9 +68,10 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; +import com.carrotsearch.hppc.IntHashSet; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class ProjectRecordBatch extends AbstractSingleRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); @@ -303,7 +305,13 @@ private boolean isWildcard(final NamedExpression ex) { return expr.getPath().contains(SchemaPath.DYNAMIC_STAR); } - private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException { + /** + * Sets up projection logic based on the metadata and incoming batch + * @param incomingBatch incoming batch + * @param clearContainers indicator on whether to clear the current containers + * @return true if there are dangling columns in the container (could happen due to schema changes) + */ + private boolean setupNewSchemaFromInput(RecordBatch incomingBatch, boolean clearContainers) throws SchemaChangeException { if (allocationVectors != null) { for (final ValueVector v : allocationVectors) { v.clear(); @@ -311,10 +319,15 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha } this.allocationVectors = Lists.newArrayList(); if (complexWriters != null) { - container.clear(); + clearContainers = true; } else { container.zeroVectors(); } + + 
if (clearContainers) { + container.clear(); + } + final List exprs = getExpressionList(); final ErrorCollector collector = new ErrorCollectorImpl(); final List transfers = Lists.newArrayList(); @@ -325,7 +338,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha // cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); - + final HashSet usedColumns = !clearContainers ? new HashSet(container.getNumberOfColumns()) : null; final boolean isAnyWildcard = isAnyWildcard(exprs); final ClassifierResult result = new ClassifierResult(); @@ -358,6 +371,10 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha final FieldReference ref = new FieldReference(name); final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), vvIn.getField().getType()), callBack); + + if (usedColumns != null) { + usedColumns.add(vvOut.getField().getName()); + } final TransferPair tp = vvIn.makeTransferPair(vvOut); transfers.add(tp); } @@ -385,6 +402,10 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType()); final ValueVector vv = container.addOrGet(outputField, callBack); + + if (usedColumns != null) { + usedColumns.add(vv.getField().getName()); + } allocationVectors.add(vv); final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); @@ -438,6 +459,10 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha final FieldReference ref = getRef(namedExpression); final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(), vectorRead.getMajorType()), callBack); + if (usedColumns != null) { + usedColumns.add(vvOut.getField().getName()); + } + final TransferPair tp = vvIn.makeTransferPair(vvOut); transfers.add(tp); transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); @@ -462,6 +487,11 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha } else { // need to do evaluation. final ValueVector vector = container.addOrGet(outputField, callBack); + + if (usedColumns != null) { + usedColumns.add(vector.getField().getName()); + } + allocationVectors.add(vector); final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); final boolean useSetSafe = !(vector instanceof FixedWidthVector); @@ -491,11 +521,20 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); } + + return usedColumns != null && usedColumns.size() != container.getNumberOfColumns(); } @Override protected boolean setupNewSchema() throws SchemaChangeException { - setupNewSchemaFromInput(this.incoming); + if (setupNewSchemaFromInput(this.incoming, false)) { + // There are dangling columns left over from the previous schema, so the setup + // must be re-run with the clear-container flag enabled. + // NOTE: the setup is re-run rather than simply removing the dangling columns + // because the generated code relies on each column's position within the + // container (its TypedFieldId).
+ setupNewSchemaFromInput(this.incoming, true); + } if (container.isSchemaChanged()) { container.buildSchema(SelectionVectorMode.NONE); return true; @@ -800,7 +839,7 @@ protected IterOutcome handleNullInput() { RecordBatch emptyIncomingBatch = new SimpleRecordBatch(emptyVC, context); try { - setupNewSchemaFromInput(emptyIncomingBatch); + setupNewSchemaFromInput(emptyIncomingBatch, false); } catch (SchemaChangeException e) { kill(false); logger.error("Failure during query", e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index a2c4363df7f..17d101c9c5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -117,7 +117,7 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti // If the field is a map, check if the map schema changed. } else if (vector.getField().getType().getMinorType() == MinorType.MAP && - ! isSameSchema(vector.getField().getChildren(), field.getChildList())) { + ! SchemaUtil.isSameSchema(vector.getField().getChildren(), field.getChildList())) { // The map schema changed. Discard the old map and create a new one. @@ -166,62 +166,6 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti return schemaChanged; } - /** - * Check if two schemas are the same. The schemas, given as lists, represent the - * children of the original and new maps (AKA structures.) - * - * @param currentChildren current children of a Drill map - * @param newChildren new children, in an incoming batch, of the same - * Drill map - * @return true if the schemas are identical, false if a child is missing - * or has changed type or cardinality (AKA "mode"). - */ - - private boolean isSameSchema(Collection currentChildren, - List newChildren) { - if (currentChildren.size() != newChildren.size()) { - return false; - } - - // Column order can permute (see DRILL-5828). So, use a map - // for matching. - - Map childMap = CaseInsensitiveMap.newHashMap(); - for (MaterializedField currentChild : currentChildren) { - childMap.put(currentChild.getName(), currentChild); - } - for (SerializedField newChild : newChildren) { - MaterializedField currentChild = childMap.get(newChild.getNamePart().getName()); - - // New map member? - - if (currentChild == null) { - return false; - } - - // Changed data type? - - if (! currentChild.getType().equals(newChild.getMajorType())) { - return false; - } - - // Perform schema diff for child column(s) - if (currentChild.getChildren().size() != newChild.getChildCount()) { - return false; - } - - if (!currentChild.getChildren().isEmpty()) { - if (!isSameSchema(currentChild.getChildren(), newChild.getChildList())) { - return false; - } - } - } - - // Everything matches. 
- - return true; - } - @Override public TypedFieldId getValueVectorId(SchemaPath path) { return container.getValueVectorId(path); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java index 67b25220b52..3a54f848fdc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java @@ -17,11 +17,14 @@ */ package org.apache.drill.exec.record; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -29,6 +32,7 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.UnionVector; @@ -179,4 +183,120 @@ public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema t Preconditions.checkState(vectorMap.size() == 0, "Leftover vector from incoming batch"); return c; } + + /** + * Check if two schemas are the same. The schemas, given as lists, represent the + * children of the original and new maps (AKA structures.) + * + * @param currentChildren current children of a Drill map + * @param newChildren new children, in an incoming batch, of the same + * Drill map + * @return true if the schemas are identical, false if a child is missing + * or has changed type or cardinality (AKA "mode"). + */ + public static boolean isSameSchema(Collection currentChildren, + List newChildren) { + if (currentChildren.size() != newChildren.size()) { + return false; + } + + // Column order can permute (see DRILL-5828). So, use a map + // for matching. + + Map childMap = CaseInsensitiveMap.newHashMap(); + for (MaterializedField currentChild : currentChildren) { + childMap.put(currentChild.getName(), currentChild); + } + for (SerializedField newChild : newChildren) { + MaterializedField currentChild = childMap.get(newChild.getNamePart().getName()); + + // New map member? + + if (currentChild == null) { + return false; + } + + // Changed data type? + + if (! currentChild.getType().equals(newChild.getMajorType())) { + return false; + } + + // Perform schema diff for child column(s) + if (MinorType.MAP.equals(currentChild.getType().getMinorType())) { + if (currentChild.getChildren().size() != newChild.getChildCount()) { + return false; + } + + if (!currentChild.getChildren().isEmpty()) { + if (!isSameSchema(currentChild.getChildren(), newChild.getChildList())) { + return false; + } + } + } + } + + // Everything matches. + + return true; + } + + /** + * Check if two schemas are the same including the order of their children. The schemas, given as lists, represent the + * children of the original and new maps (AKA structures.) + * + * @param currentChildren current children of a Drill map + * @param newChildren new children, in an incoming batch, of the same + * Drill map + * @return true if the schemas are identical, false if a child is missing + * or has changed type or cardinality (AKA "mode"). 
+ */ + public static boolean isSameSchemaIncludingOrder(Collection currentChildren, + Collection newChildren) { + + if (currentChildren.size() != newChildren.size()) { + return false; + } + + // Insert the two collections in a List data structure to implement ordering logic + // TODO - MaterializedField should expose a LIST data structure since ordering is key for + // batch operators. + List currentChildrenList = new ArrayList(currentChildren); + List newChildrenList = new ArrayList(newChildren); + + for (int idx = 0; idx < newChildrenList.size(); idx++) { + MaterializedField currentChild = currentChildrenList.get(idx); + MaterializedField newChild = newChildrenList.get(idx); + + // Same name ? + if (!currentChild.getName().equalsIgnoreCase(newChild.getName())) { + return false; + } + + // Changed data type? + if (!currentChild.getType().equals(newChild.getType())) { + return false; + } + + // Perform schema diff for child column(s) + if (MinorType.MAP.equals(currentChild.getType().getMinorType())) { + if (currentChild.getChildren().size() != newChild.getChildren().size()) { + return false; + } + + if (!currentChild.getChildren().isEmpty()) { + if (!isSameSchemaIncludingOrder(currentChild.getChildren(), newChild.getChildren())) { + return false; + } + } + } + } + + // Everything matches. + + return true; + } + + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index c46efaff27f..80943ed0430 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -18,12 +18,14 @@ package org.apache.drill.exec.record; import java.lang.reflect.Array; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; @@ -49,7 +51,7 @@ public VectorContainer() { allocator = null; } - public VectorContainer(OperatorContext oContext) { + public VectorContainer( OperatorContext oContext) { this(oContext.getAllocator()); } @@ -136,14 +138,28 @@ public T addOrGet(MaterializedField field) { public T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) { final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName())); final ValueVector vector; - final Class clazz = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getType().getMode()); + if (id != null) { - vector = getValueAccessorById(id.getFieldIds()).getValueVector(); + vector = getValueAccessorById(id.getFieldIds()).getValueVector(); + final Class clazz = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getType().getMode()); + + // Check whether incoming field and the current one are compatible; if not then replace previous one with the new one if (id.getFieldIds().length == 1 && clazz != null && !clazz.isAssignableFrom(vector.getClass())) { final ValueVector newVector = TypeHelper.getNewVector(field, this.getAllocator(), callBack); replace(vector, newVector); return (T) newVector; } + + // At this point, we know incoming and current fields are compatible. 
Maps can have children, + // so we need to ensure they have the same structure. + if (MinorType.MAP.equals(field.getType().getMinorType()) + && vector != null + && !SchemaUtil.isSameSchemaIncludingOrder(vector.getField().getChildren(), field.getChildren())) { + + final ValueVector newVector = TypeHelper.getNewVector(field, this.getAllocator(), callBack); + replace(vector, newVector); + return (T) newVector; + } } else { vector = TypeHelper.getNewVector(field, this.getAllocator(), callBack); add(vector); @@ -156,6 +172,48 @@ public T addOrGet(String name, MajorType type, Class return addOrGet(field); } + /** + * This method handles the following schema changes:
+ * <ul> + *   <li>Replaces any existing field that has changed (including the children of complex types); + *       note that the field's index is kept, since it is key for code generation</li> + *   <li>Removes dangling fields (fields that exist only in this container)</li> + * </ul>
+ * + * NOTE - If dangling fields are detected, then all wrappers + * are cleared and the incoming fields will be added (potentially with + * new indexes). The schemaChanged flag will be set. + * + * @param incoming incoming batch + * @param callBack schema change callback + * @param transfers transfer pairs + */ + public void onSchemaChange(RecordBatch incoming, + SchemaChangeCallBack callBack, + List transfers) { + + // Determine whether there are dangling fields; this is + // critical because a) they cannot be kept, as the copy + // logic would then access uninitialized data, and b) they cannot + // simply be removed, as code generation relies on each field's + // index within the container. + boolean danglingColumns = danglingColumnsFound(incoming); + + if (danglingColumns) { + schemaChanged = true; + zeroVectors(); + wrappers.clear(); + } + + // - Adds a field if it does not exist yet; reuses it if unchanged + // - Replaces fields that have changed + // - If there were dangling columns, then essentially all incoming fields will be added + for(final VectorWrapper v : incoming) { + final TransferPair pair = v.getValueVector().makeTransferPair(addOrGet(v.getField(), callBack)); + transfers.add(pair); + } + } + /** * Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned * container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code. In @@ -374,7 +432,7 @@ public SelectionVector4 getSelectionVector4() { public void zeroVectors() { VectorAccessibleUtilities.clear(this); - } + } public int getNumberOfColumns() { return wrappers.size(); @@ -395,6 +453,33 @@ public boolean allocateNewSafe() { return true; } + /** + * @param incoming incoming record batch + * @return true if there are dangling fields + */ + public boolean danglingColumnsFound(RecordBatch incoming) { + // Could happen for null input (when operators are used for output) + if (wrappers.size() == 0 + || incoming == null || incoming.iterator() == null) { + return false; + } + + // Set used to detect dangling fields + final HashSet oldFields = new HashSet(wrappers.size()); + + // Add the columns currently held by this container + for(VectorWrapper wrapper : wrappers) { + oldFields.add(wrapper.getField().getName()); + } + + // Remove the columns that are present in the incoming batch + for(final VectorWrapper v : incoming) { + oldFields.remove(v.getField().getName()); + } + + return !oldFields.isEmpty(); + } + /** * Merge two batches to create a single, combined, batch. Vectors * appear in the order defined by {@link BatchSchema#merge(BatchSchema)}. @@ -447,4 +532,5 @@ public void exchange(VectorContainer other) { schemaChanged = other.schemaChanged; other.schemaChanged = temp2; } + }
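Reviewer note: a minimal sketch of how the new order-sensitive comparison is expected to behave. The driver class below is hypothetical and is only for illustration; it assumes the standard MaterializedField.create(...) and Types.required(...) factories together with the SchemaUtil method added in this patch.

import java.util.Arrays;
import java.util.List;

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.SchemaUtil;

public class SameSchemaOrderSketch {
  public static void main(String[] args) {
    // Children of a map column as currently held by the container: m(a INT, b VARCHAR)
    MaterializedField a = MaterializedField.create("a", Types.required(MinorType.INT));
    MaterializedField b = MaterializedField.create("b", Types.required(MinorType.VARCHAR));
    List<MaterializedField> current = Arrays.asList(a, b);

    // Incoming batch carries the same children but in permuted order: m(b VARCHAR, a INT)
    List<MaterializedField> incoming = Arrays.asList(b, a);

    // The order-sensitive check reports a difference even though the same children are
    // present; VectorContainer.addOrGet(...) relies on this to rebuild the map vector so
    // that generated code keeps valid field indexes.
    System.out.println(SchemaUtil.isSameSchemaIncludingOrder(current, incoming)); // expected: false
    System.out.println(SchemaUtil.isSameSchemaIncludingOrder(current, current));  // expected: true
  }
}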
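And a standalone, plain-Java illustration of the dangling-field check that onSchemaChange() builds on: names present in the container but absent from the incoming batch are the ones that would otherwise be left holding stale, uninitialized vectors. The helper below mirrors danglingColumnsFound() without the Drill vector types and is illustrative only.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DanglingColumnsSketch {
  // Returns the field names that exist only in the container (i.e. the dangling fields).
  static Set<String> danglingColumns(List<String> containerFields, List<String> incomingFields) {
    Set<String> dangling = new HashSet<>(containerFields);
    dangling.removeAll(incomingFields);
    return dangling;
  }

  public static void main(String[] args) {
    // The container still holds a, b, c from the previous schema; the new batch only has a and c,
    // so b is dangling and the container must be cleared and rebuilt from the incoming batch.
    System.out.println(danglingColumns(Arrays.asList("a", "b", "c"), Arrays.asList("a", "c"))); // [b]
  }
}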