Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch in
protected boolean setupNewSchema() throws SchemaChangeException {
container.zeroVectors();
transfers.clear();


for(final VectorWrapper<?> v : incoming) {
final TransferPair pair = v.getValueVector().makeTransferPair(
container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}
container.onSchemaChange(incoming, callBack, transfers);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onSchemaChange() may be the wrong name: it describes why this functionality is called in this case, but the actual functionality is closer to setupTransfers() (assuming the removed code was simply moved into the container class...)


final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.drill.exec.physical.impl.project;

import com.carrotsearch.hppc.IntHashSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.drill.common.expression.ConvertExpression;
import org.apache.drill.common.expression.ErrorCollector;
Expand Down Expand Up @@ -67,9 +68,10 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import com.carrotsearch.hppc.IntHashSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
Expand Down Expand Up @@ -303,18 +305,29 @@ private boolean isWildcard(final NamedExpression ex) {
return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
}

private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
/**
* Sets up projection logic based on the metadata and incoming batch
* @param incomingBatch incoming batch
* @param clearContainers indicator on whether to clear the current containers
* @return true if there are dangling columns in the container (could happen due to schema changes)
*/
private boolean setupNewSchemaFromInput(RecordBatch incomingBatch, boolean clearContainers) throws SchemaChangeException {
if (allocationVectors != null) {
for (final ValueVector v : allocationVectors) {
v.clear();
}
}
this.allocationVectors = Lists.newArrayList();
if (complexWriters != null) {
container.clear();
clearContainers = true;
} else {
container.zeroVectors();
}

if (clearContainers) {
container.clear();
}

final List<NamedExpression> exprs = getExpressionList();
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
Expand All @@ -325,7 +338,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
// cg.getCodeGenerator().saveCodeForDebugging(true);

final IntHashSet transferFieldIds = new IntHashSet();

final HashSet<String> usedColumns = !clearContainers ? new HashSet<String>(container.getNumberOfColumns()) : null;
final boolean isAnyWildcard = isAnyWildcard(exprs);

final ClassifierResult result = new ClassifierResult();
Expand Down Expand Up @@ -358,6 +371,10 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha

final FieldReference ref = new FieldReference(name);
final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), vvIn.getField().getType()), callBack);

if (usedColumns != null) {
usedColumns.add(vvOut.getField().getName());
}
final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
}
Expand Down Expand Up @@ -385,6 +402,10 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha

final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
final ValueVector vv = container.addOrGet(outputField, callBack);

if (usedColumns != null) {
usedColumns.add(vv.getField().getName());
}
allocationVectors.add(vv);
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
Expand Down Expand Up @@ -438,6 +459,10 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
final FieldReference ref = getRef(namedExpression);
final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
vectorRead.getMajorType()), callBack);
if (usedColumns != null) {
usedColumns.add(vvOut.getField().getName());
}

final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
Expand All @@ -462,6 +487,11 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
} else {
// need to do evaluation.
final ValueVector vector = container.addOrGet(outputField, callBack);

if (usedColumns != null) {
usedColumns.add(vector.getField().getName());
}

allocationVectors.add(vector);
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
final boolean useSetSafe = !(vector instanceof FixedWidthVector);
Expand Down Expand Up @@ -491,11 +521,20 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}

return usedColumns != null && usedColumns.size() != container.getNumberOfColumns();
}

@Override
protected boolean setupNewSchema() throws SchemaChangeException {
setupNewSchemaFromInput(this.incoming);
if (setupNewSchemaFromInput(this.incoming, false)) {
// There are dangling columns; we need to re-perform this task
// though with the clear vector indicator on.
// NOTE - the reason I am rerunning the setup operation instead of just
// removing columns is that the setup code seems to be aware of
// container columns position (TypedFieldId).
setupNewSchemaFromInput(this.incoming, true);
}
if (container.isSchemaChanged()) {
container.buildSchema(SelectionVectorMode.NONE);
return true;
Expand Down Expand Up @@ -800,7 +839,7 @@ protected IterOutcome handleNullInput() {
RecordBatch emptyIncomingBatch = new SimpleRecordBatch(emptyVC, context);

try {
setupNewSchemaFromInput(emptyIncomingBatch);
setupNewSchemaFromInput(emptyIncomingBatch, false);
} catch (SchemaChangeException e) {
kill(false);
logger.error("Failure during query", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti
// If the field is a map, check if the map schema changed.

} else if (vector.getField().getType().getMinorType() == MinorType.MAP &&
! isSameSchema(vector.getField().getChildren(), field.getChildList())) {
! SchemaUtil.isSameSchema(vector.getField().getChildren(), field.getChildList())) {

// The map schema changed. Discard the old map and create a new one.

Expand Down Expand Up @@ -166,62 +166,6 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti
return schemaChanged;
}

/**
 * Compares the schema of an existing Drill map against the schema of the
 * same map arriving in a new batch. The two schemas are supplied as the
 * children of each map.
 *
 * @param currentChildren children of the map as currently loaded
 * @param newChildren serialized children of the same map from the
 *        incoming batch
 * @return true when every child matches by name, type, and (recursively)
 *         child schema; false when a child is new, missing, or has a
 *         different type or cardinality (AKA "mode")
 */

private boolean isSameSchema(Collection<MaterializedField> currentChildren,
    List<SerializedField> newChildren) {
  if (currentChildren.size() != newChildren.size()) {
    return false;
  }

  // Column order can permute (see DRILL-5828), so match children by
  // (case-insensitive) name rather than by position.

  Map<String, MaterializedField> byName = CaseInsensitiveMap.newHashMap();
  for (MaterializedField existingChild : currentChildren) {
    byName.put(existingChild.getName(), existingChild);
  }
  for (SerializedField incomingChild : newChildren) {
    MaterializedField existingChild = byName.get(incomingChild.getNamePart().getName());

    // Reject a brand-new member, a changed data type, or a changed
    // number of nested children.

    if (existingChild == null
        || !existingChild.getType().equals(incomingChild.getMajorType())
        || existingChild.getChildren().size() != incomingChild.getChildCount()) {
      return false;
    }

    // Recursively diff any nested children.

    if (!existingChild.getChildren().isEmpty()
        && !isSameSchema(existingChild.getChildren(), incomingChild.getChildList())) {
      return false;
    }
  }

  // Everything matches.

  return true;
}

@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
return container.getValueVectorId(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
*/
package org.apache.drill.exec.record;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.UnionVector;

Expand Down Expand Up @@ -179,4 +183,120 @@ public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema t
Preconditions.checkState(vectorMap.size() == 0, "Leftover vector from incoming batch");
return c;
}

/**
 * Check if two schemas are the same. The schemas, given as lists, represent the
 * children of the original and new maps (AKA structures.)
 *
 * @param currentChildren current children of a Drill map
 * @param newChildren new children, in an incoming batch, of the same
 *        Drill map
 * @return true if the schemas are identical, false if a child is missing
 *         or has changed type or cardinality (AKA "mode").
 */
public static boolean isSameSchema(Collection<MaterializedField> currentChildren,
    List<SerializedField> newChildren) {
  if (currentChildren.size() != newChildren.size()) {
    return false;
  }

  // Column order can permute (see DRILL-5828), so match children by
  // (case-insensitive) name rather than by position.

  Map<String, MaterializedField> byName = CaseInsensitiveMap.newHashMap();
  for (MaterializedField existingChild : currentChildren) {
    byName.put(existingChild.getName(), existingChild);
  }

  for (SerializedField incomingChild : newChildren) {
    MaterializedField existingChild = byName.get(incomingChild.getNamePart().getName());

    // Reject a brand-new member or a changed data type.

    if (existingChild == null
        || !existingChild.getType().equals(incomingChild.getMajorType())) {
      return false;
    }

    // Only maps carry a nested schema worth diffing; recurse into it.

    if (MinorType.MAP.equals(existingChild.getType().getMinorType())) {
      if (existingChild.getChildren().size() != incomingChild.getChildCount()) {
        return false;
      }

      if (!existingChild.getChildren().isEmpty()
          && !isSameSchema(existingChild.getChildren(), incomingChild.getChildList())) {
        return false;
      }
    }
  }

  // Everything matches.

  return true;
}

/**
 * Check if two schemas are the same including the order of their children. The schemas,
 * given as collections, represent the children of the original and new maps (AKA structures.)
 *
 * @param currentChildren current children of a Drill map
 * @param newChildren new children, in an incoming batch, of the same
 *        Drill map
 * @return true if the schemas are identical (same names, types, and child
 *         schemas at the same positions), false if a child is missing
 *         or has changed type or cardinality (AKA "mode").
 */
public static boolean isSameSchemaIncludingOrder(Collection<MaterializedField> currentChildren,
    Collection<MaterializedField> newChildren) {

  if (currentChildren.size() != newChildren.size()) {
    return false;
  }

  // Walk both collections in lock-step via parallel iterators. The sizes are
  // equal (checked above), so a single hasNext() guard drives both. This
  // avoids copying each collection into a temporary list just to get ordering.
  Iterator<MaterializedField> currentIterator = currentChildren.iterator();
  Iterator<MaterializedField> newIterator = newChildren.iterator();

  while (currentIterator.hasNext()) {
    MaterializedField currentChild = currentIterator.next();
    MaterializedField newChild = newIterator.next();

    // Same name (Drill column names are case-insensitive)?
    if (!currentChild.getName().equalsIgnoreCase(newChild.getName())) {
      return false;
    }

    // Changed data type?
    if (!currentChild.getType().equals(newChild.getType())) {
      return false;
    }

    // Perform schema diff for child column(s); only maps carry nested children.
    if (MinorType.MAP.equals(currentChild.getType().getMinorType())) {
      if (currentChild.getChildren().size() != newChild.getChildren().size()) {
        return false;
      }

      if (!currentChild.getChildren().isEmpty()
          && !isSameSchemaIncludingOrder(currentChild.getChildren(), newChild.getChildren())) {
        return false;
      }
    }
  }

  // Everything matches.

  return true;
}



}
Loading