diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 113b3adb7d9..cca34e3ca67 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -33,6 +33,7 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -205,7 +206,7 @@ public int next() { if (reader == null) { break; // no more documents for this scanner } else if (isSkipQuery()) { - vectorWriter.rootAsMap().bit("count").writeBit(1); + vectorWriter.rootAsMap().bit("count", TypeProtos.DataMode.OPTIONAL).writeBit(1); } else { MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap()); if (idOnly) { @@ -337,7 +338,8 @@ private void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DBDocu if (allTextMode) { writeString(writer, fieldName, reader.getTimestamp().toUTCString()); } else { - ((writer.map != null) ? writer.map.timeStamp(fieldName) : writer.list.timeStamp()).writeTimeStamp(reader.getTimestampLong()); + ((writer.map != null) ? writer.map.timeStamp(fieldName, TypeProtos.DataMode.OPTIONAL) : + writer.list.timeStamp()).writeTimeStamp(reader.getTimestampLong()); } } @@ -345,7 +347,8 @@ private void writeTime(MapOrListWriterImpl writer, String fieldName, DBDocumentR if (allTextMode) { writeString(writer, fieldName, reader.getTime().toTimeStr()); } else { - ((writer.map != null) ? 
writer.map.time(fieldName) : writer.list.time()).writeTime(reader.getTimeInt()); + ((writer.map != null) ? writer.map.time(fieldName, TypeProtos.DataMode.OPTIONAL) : + writer.list.time()).writeTime(reader.getTimeInt()); } } @@ -354,7 +357,8 @@ private void writeDate(MapOrListWriterImpl writer, String fieldName, DBDocumentR writeString(writer, fieldName, reader.getDate().toDateStr()); } else { long milliSecondsSinceEpoch = reader.getDateInt() * MILLISECONDS_IN_A_DAY; - ((writer.map != null) ? writer.map.date(fieldName) : writer.list.date()).writeDate(milliSecondsSinceEpoch); + ((writer.map != null) ? writer.map.date(fieldName, TypeProtos.DataMode.OPTIONAL) : + writer.list.date()).writeDate(milliSecondsSinceEpoch); } } @@ -362,7 +366,7 @@ private void writeDouble(MapOrListWriterImpl writer, String fieldName, DBDocumen if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getDouble())); } else { - writer.float8(fieldName).writeFloat8(reader.getDouble()); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(reader.getDouble()); } } @@ -370,9 +374,9 @@ private void writeFloat(MapOrListWriterImpl writer, String fieldName, DBDocument if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getFloat())); } else if (readNumbersAsDouble) { - writer.float8(fieldName).writeFloat8(reader.getFloat()); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(reader.getFloat()); } else { - writer.float4(fieldName).writeFloat4(reader.getFloat()); + writer.float4(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat4(reader.getFloat()); } } @@ -380,9 +384,9 @@ private void writeLong(MapOrListWriterImpl writer, String fieldName, DBDocumentR if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getLong())); } else if (readNumbersAsDouble) { - writer.float8(fieldName).writeFloat8(reader.getLong()); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(reader.getLong()); } else { - 
writer.bigInt(fieldName).writeBigInt(reader.getLong()); + writer.bigInt(fieldName, TypeProtos.DataMode.OPTIONAL).writeBigInt(reader.getLong()); } } @@ -390,9 +394,9 @@ private void writeInt(MapOrListWriterImpl writer, String fieldName, DBDocumentRe if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getInt())); } else if (readNumbersAsDouble) { - writer.float8(fieldName).writeFloat8(reader.getInt()); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(reader.getInt()); } else { - writer.integer(fieldName).writeInt(reader.getInt()); + writer.integer(fieldName, TypeProtos.DataMode.OPTIONAL).writeInt(reader.getInt()); } } @@ -400,9 +404,9 @@ private void writeShort(MapOrListWriterImpl writer, String fieldName, DBDocument if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getShort())); } else if (readNumbersAsDouble) { - writer.float8(fieldName).writeFloat8(reader.getShort()); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(reader.getShort()); } else { - ((writer.map != null) ? writer.map.smallInt(fieldName) : writer.list.smallInt()).writeSmallInt(reader.getShort()); + ((writer.map != null) ? writer.map.smallInt(fieldName, TypeProtos.DataMode.OPTIONAL) : writer.list.smallInt()).writeSmallInt(reader.getShort()); } } @@ -410,9 +414,9 @@ private void writeByte(MapOrListWriterImpl writer, String fieldName, DBDocumentR if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getByte())); } else if (readNumbersAsDouble) { - writer.float8(fieldName).writeFloat8(reader.getByte()); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(reader.getByte()); } else { - ((writer.map != null) ? writer.map.tinyInt(fieldName) : writer.list.tinyInt()).writeTinyInt(reader.getByte()); + ((writer.map != null) ? 
writer.map.tinyInt(fieldName, TypeProtos.DataMode.OPTIONAL) : writer.list.tinyInt()).writeTinyInt(reader.getByte()); } } @@ -420,7 +424,7 @@ private void writeBoolean(MapOrListWriterImpl writer, String fieldName, DBDocume if (allTextMode) { writeString(writer, fieldName, String.valueOf(reader.getBoolean())); } else { - writer.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0); + writer.bit(fieldName, TypeProtos.DataMode.OPTIONAL).writeBit(reader.getBoolean() ? 1 : 0); } } @@ -430,7 +434,7 @@ private void writeBinary(MapOrListWriterImpl writer, String fieldName, ByteBuffe } else { buffer = buffer.reallocIfNeeded(buf.remaining()); buffer.setBytes(0, buf, buf.position(), buf.remaining()); - writer.binary(fieldName).writeVarBinary(0, buf.remaining(), buffer); + writer.binary(fieldName, TypeProtos.DataMode.OPTIONAL).writeVarBinary(0, buf.remaining(), buffer); } } @@ -438,7 +442,7 @@ private void writeString(MapOrListWriterImpl writer, String fieldName, String va final byte[] strBytes = Bytes.toBytes(value); buffer = buffer.reallocIfNeeded(strBytes.length); buffer.setBytes(0, strBytes); - writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer); + writer.varChar(fieldName, TypeProtos.DataMode.OPTIONAL).writeVarChar(0, strBytes.length, buffer); } private UserException unsupportedError(String format, Object... args) { diff --git a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java deleted file mode 100644 index 7584b826440..00000000000 --- a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.drill.common.types.MinorType; -import org.apache.parquet.format.ConvertedType; -import org.apache.parquet.schema.DecimalMetadata; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; - -<@pp.dropOutputFile /> -<@pp.changeOutputFile name="org/apache/drill/exec/store/parquet/ParquetTypeHelper.java" /> -<#include "/@includes/license.ftl" /> - -package org.apache.drill.exec.store.parquet; - -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.DecimalMetadata; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -import org.apache.parquet.schema.Type.Repetition; - -import java.util.HashMap; -import java.util.Map; - -/* - * This class is generated using freemarker and the ${.template_name} template. 
- */ - -public class ParquetTypeHelper { - private static Map typeMap; - private static Map modeMap; - private static Map originalTypeMap; - - static { - typeMap = new HashMap(); - - <#list vv.types as type> - <#list type.minor as minor> - <#if minor.class == "TinyInt" || - minor.class == "UInt1" || - minor.class == "UInt2" || - minor.class == "SmallInt" || - minor.class == "Int" || - minor.class == "Time" || - minor.class == "Decimal9" || - minor.class == "Date" || - minor.class == "UInt4"> - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT32); - <#elseif - minor.class == "Float4"> - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.FLOAT); - <#elseif - minor.class == "BigInt" || - minor.class == "Decimal18" || - minor.class == "TimeStamp" || - minor.class == "UInt8"> - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT64); - <#elseif - minor.class == "Float8"> - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.DOUBLE); - <#elseif - minor.class == "Bit"> - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.BOOLEAN); - <#elseif - minor.class == "TimeTZ" || - minor.class == "IntervalDay" || - minor.class == "IntervalYear" || - minor.class == "Interval" || - minor.class == "Decimal28Dense" || - minor.class == "Decimal38Dense" || - minor.class == "Decimal28Sparse" || - minor.class == "Decimal38Sparse"> - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); - <#elseif - minor.class == "VarChar" || - minor.class == "Var16Char" || - minor.class == "VarBinary" > - typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.BINARY); - - - - - modeMap = new HashMap(); - - modeMap.put(DataMode.REQUIRED, Repetition.REQUIRED); - modeMap.put(DataMode.OPTIONAL, Repetition.OPTIONAL); - modeMap.put(DataMode.REPEATED, Repetition.REPEATED); - - originalTypeMap = new HashMap(); - - <#list vv.types as type> - <#list type.minor as minor> - <#if 
minor.class.startsWith("Decimal")> - originalTypeMap.put(MinorType.${minor.class?upper_case},OriginalType.DECIMAL); - - - - originalTypeMap.put(MinorType.VARCHAR, OriginalType.UTF8); - originalTypeMap.put(MinorType.DATE, OriginalType.DATE); - originalTypeMap.put(MinorType.TIME, OriginalType.TIME_MILLIS); - originalTypeMap.put(MinorType.TIMESTAMP, OriginalType.TIMESTAMP_MILLIS); - originalTypeMap.put(MinorType.INTERVALDAY, OriginalType.INTERVAL); - originalTypeMap.put(MinorType.INTERVALYEAR, OriginalType.INTERVAL); - originalTypeMap.put(MinorType.INTERVAL, OriginalType.INTERVAL); -// originalTypeMap.put(MinorType.TIMESTAMPTZ, OriginalType.TIMESTAMPTZ); - } - - public static PrimitiveTypeName getPrimitiveTypeNameForMinorType(MinorType minorType) { - return typeMap.get(minorType); - } - - public static Repetition getRepetitionForDataMode(DataMode dataMode) { - return modeMap.get(dataMode); - } - - public static OriginalType getOriginalTypeForMinorType(MinorType minorType) { - return originalTypeMap.get(minorType); - } - - public static DecimalMetadata getDecimalMetadataForField(MaterializedField field) { - switch(field.getType().getMinorType()) { - case DECIMAL9: - case DECIMAL18: - case DECIMAL28SPARSE: - case DECIMAL28DENSE: - case DECIMAL38SPARSE: - case DECIMAL38DENSE: - return new DecimalMetadata(field.getPrecision(), field.getScale()); - default: - return null; - } - } - - public static int getLengthForMinorType(MinorType minorType) { - switch(minorType) { - case INTERVALDAY: - case INTERVALYEAR: - case INTERVAL: - return 12; - case DECIMAL28SPARSE: - return 12; - case DECIMAL38SPARSE: - return 16; - default: - return 0; - } - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java index b7877df1693..0e3ac1f0f79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,13 +20,9 @@ import com.google.common.base.Charsets; import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.types.TypeProtos.DataMode; -//import org.apache.drill.common.types.DataMode; -import org.apache.drill.common.types.MinorType; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.holders.VarCharHolder; import org.apache.drill.exec.vector.complex.MapUtility; -import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl; import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter; @@ -42,7 +38,7 @@ public class MappifyUtility { public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writer, DrillBuf buffer) { // Currently we expect single map as input - if (DataMode.REPEATED == reader.getType().getMode() || !(reader.getType().getMinorType() == TypeProtos.MinorType.MAP)) { + if (TypeProtos.DataMode.REPEATED == reader.getType().getMode() || !(reader.getType().getMinorType() == TypeProtos.MinorType.MAP)) { throw new DrillRuntimeException("kvgen function only supports Simple maps as input"); } BaseWriter.ListWriter listWriter = writer.rootAsList(); @@ -72,7 +68,7 @@ public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writ vh.start = 0; vh.end = b.length; vh.buffer = buffer; - mapWriter.varChar(fieldKey).write(vh); + mapWriter.varChar(fieldKey, TypeProtos.DataMode.OPTIONAL).write(vh); // Write the value to the map MapUtility.writeToMapFromReader(fieldReader, mapWriter); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseQueryFunction.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseQueryFunction.java index 7dce1fc765d..3ebed4d6ac7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseQueryFunction.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseQueryFunction.java @@ -1,8 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.drill.exec.expr.fn.impl; -//* - import io.netty.buffer.DrillBuf; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.Output; @@ -12,21 +28,6 @@ import javax.inject.Inject; -/* Copyright 2001-2004 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
-* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - @FunctionTemplate( name="parse_query", scope= FunctionTemplate.FunctionScope.SIMPLE, @@ -80,7 +81,7 @@ public void eval() { rowHolder.end = rowStringBytes.length; rowHolder.buffer = outBuffer; - queryMapWriter.varChar(queryParts[0]).write(rowHolder); + queryMapWriter.varChar(queryParts[0], TypeProtos.DataMode.OPTIONAL).write(rowHolder); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseUrlFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseUrlFunction.java index fa339d45ee4..4a4c30cfb48 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseUrlFunction.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ParseUrlFunction.java @@ -1,13 +1,13 @@ -package org.apache.drill.exec.expr.fn.impl; - /* - * Copyright 2001-2004 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.drill.exec.expr.fn.impl; + import io.netty.buffer.DrillBuf; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.Output; @@ -70,7 +73,7 @@ public void eval() { rowHolder.end = rowStringBytes.length; rowHolder.buffer = outBuffer; - urlMapWriter.varChar("protocol").write(rowHolder); + urlMapWriter.varChar("protocol", TypeProtos.DataMode.OPTIONAL).write(rowHolder); byte[] authRowStringBytes = authority.getBytes(); @@ -82,7 +85,7 @@ public void eval() { rowHolder.end = authRowStringBytes.length; rowHolder.buffer = outBuffer; - urlMapWriter.varChar("authority").write(rowHolder); + urlMapWriter.varChar("authority", TypeProtos.DataMode.OPTIONAL).write(rowHolder); byte[] hostRowStringBytes = host.getBytes(); @@ -94,7 +97,7 @@ public void eval() { rowHolder.end = hostRowStringBytes.length; rowHolder.buffer = outBuffer; - urlMapWriter.varChar("host").write(rowHolder); + urlMapWriter.varChar("host", TypeProtos.DataMode.OPTIONAL).write(rowHolder); byte[] pathRowStringBytes = path.getBytes(); @@ -106,7 +109,7 @@ public void eval() { rowHolder.end = pathRowStringBytes.length; rowHolder.buffer = outBuffer; - urlMapWriter.varChar("path").write(rowHolder); + urlMapWriter.varChar("path", TypeProtos.DataMode.OPTIONAL).write(rowHolder); byte[] queryRowStringBytes = query.getBytes(); @@ -118,7 +121,7 @@ public void eval() { rowHolder.end = queryRowStringBytes.length; rowHolder.buffer = outBuffer; - 
urlMapWriter.varChar("query").write(rowHolder); + urlMapWriter.varChar("query", TypeProtos.DataMode.OPTIONAL).write(rowHolder); byte[] filenameRowStringBytes = filename.getBytes(); @@ -130,7 +133,7 @@ public void eval() { rowHolder.end = filenameRowStringBytes.length; rowHolder.buffer = outBuffer; - urlMapWriter.varChar("filename").write(rowHolder); + urlMapWriter.varChar("filename", TypeProtos.DataMode.OPTIONAL).write(rowHolder); byte[] refRowStringBytes = ref.getBytes(); @@ -142,11 +145,11 @@ public void eval() { rowHolder.end = refRowStringBytes.length; rowHolder.buffer = outBuffer; - urlMapWriter.varChar("ref").write(rowHolder); + urlMapWriter.varChar("ref", TypeProtos.DataMode.OPTIONAL).write(rowHolder); org.apache.drill.exec.expr.holders.IntHolder intHolder = new org.apache.drill.exec.expr.holders.IntHolder(); intHolder.value = port; - urlMapWriter.integer("port").write(intHolder); + urlMapWriter.integer("port", TypeProtos.DataMode.OPTIONAL).write(intHolder); } catch (Exception e ) {} } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java index bbc9b04a93b..04e52733cc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -37,8 +37,8 @@ import org.apache.avro.util.Utf8; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -270,29 +270,29 @@ private void processPrimitive(final Object value, final Schema.Type type, final } ensure(length); buffer.setBytes(0, binary); - writer.varChar(fieldName).writeVarChar(0, length, buffer); + writer.varChar(fieldName, TypeProtos.DataMode.OPTIONAL).writeVarChar(0, length, buffer); break; case INT: - writer.integer(fieldName).writeInt((Integer) value); + writer.integer(fieldName, TypeProtos.DataMode.OPTIONAL).writeInt((Integer) value); break; case LONG: - writer.bigInt(fieldName).writeBigInt((Long) value); + writer.bigInt(fieldName, TypeProtos.DataMode.OPTIONAL).writeBigInt((Long) value); break; case FLOAT: - writer.float4(fieldName).writeFloat4((Float) value); + writer.float4(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat4((Float) value); break; case DOUBLE: - writer.float8(fieldName).writeFloat8((Double) value); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8((Double) value); break; case BOOLEAN: - writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0); + writer.bit(fieldName, TypeProtos.DataMode.OPTIONAL).writeBit((Boolean) value ? 
1 : 0); break; case BYTES: final ByteBuffer buf = (ByteBuffer) value; length = buf.remaining(); ensure(length); buffer.setBytes(0, buf); - writer.binary(fieldName).writeVarBinary(0, length, buffer); + writer.binary(fieldName, TypeProtos.DataMode.OPTIONAL).writeVarBinary(0, length, buffer); break; case NULL: // Nothing to do for null type @@ -307,7 +307,7 @@ private void processPrimitive(final Object value, final Schema.Type type, final } ensure(b.length); buffer.setBytes(0, b); - writer.varChar(fieldName).writeVarChar(0, b.length, buffer); + writer.varChar(fieldName, TypeProtos.DataMode.OPTIONAL).writeVarChar(0, b.length, buffer); break; default: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java index c0b67263153..8cebbd36730 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,6 +28,7 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.Float8Holder; @@ -250,7 +251,7 @@ private void writeTimeStamp(int timestamp, final MapOrListWriterImpl writer, Str DateTime dateTime = new DateTime(timestamp); TimeWriter t; if (isList == false) { - t = writer.map.time(fieldName); + t = writer.map.time(fieldName, TypeProtos.DataMode.OPTIONAL); } else { t = writer.list.time(); } @@ -273,7 +274,7 @@ private void writeString(String readString, final MapOrListWriterImpl writer, St vh.start = 0; vh.end = length; if (isList == false) { - writer.varChar(fieldName).write(vh); + writer.varChar(fieldName, TypeProtos.DataMode.OPTIONAL).write(vh); } else { writer.list.varChar().write(vh); } @@ -289,7 +290,7 @@ private void writeDouble(double readDouble, final MapOrListWriterImpl writer, St final Float8Holder f8h = new Float8Holder(); f8h.value = readDouble; if (isList == false) { - writer.float8(fieldName).write(f8h); + writer.float8(fieldName, TypeProtos.DataMode.OPTIONAL).write(f8h); } else { writer.list.float8().write(f8h); } @@ -299,7 +300,7 @@ private void writeDateTime(long readDateTime, final MapOrListWriterImpl writer, DateTime date = new DateTime(readDateTime); DateWriter dt; if (isList == false) { - dt = writer.map.date(fieldName); + dt = writer.map.date(fieldName, TypeProtos.DataMode.OPTIONAL); } else { dt = writer.list.date(); } @@ -310,7 +311,7 @@ private void writeBoolean(boolean readBoolean, final MapOrListWriterImpl writer, final BitHolder bit = new BitHolder(); bit.value = readBoolean ? 
1 : 0; if (isList == false) { - writer.bit(fieldName).write(bit); + writer.bit(fieldName, TypeProtos.DataMode.OPTIONAL).write(bit); } else { writer.list.bit().write(bit); } @@ -324,7 +325,7 @@ private void writeBinary(final MapOrListWriterImpl writer, String fieldName, boo vb.start = 0; vb.end = bytes.length; if (isList == false) { - writer.binary(fieldName).write(vb); + writer.binary(fieldName, TypeProtos.DataMode.OPTIONAL).write(vb); } else { writer.list.varBinary().write(vb); } @@ -334,7 +335,7 @@ private void writeInt64(long readInt64, final MapOrListWriterImpl writer, String final BigIntHolder bh = new BigIntHolder(); bh.value = readInt64; if (isList == false) { - writer.bigInt(fieldName).write(bh); + writer.bigInt(fieldName, TypeProtos.DataMode.OPTIONAL).write(bh); } else { writer.list.bigInt().write(bh); } @@ -344,7 +345,7 @@ private void writeInt32(int readInt32, final MapOrListWriterImpl writer, String final IntHolder ih = new IntHolder(); ih.value = readInt32; if (isList == false) { - writer.integer(fieldName).write(ih); + writer.integer(fieldName, TypeProtos.DataMode.OPTIONAL).write(ih); } else { writer.list.integer().write(ih); } @@ -361,7 +362,7 @@ public void ensureAtLeastOneField(ComplexWriter writer) { fieldWriter = fieldWriter.map(root.getNameSegment().getPath()); root = root.getChild(); } - fieldWriter.integer(root.getNameSegment().getPath()); + fieldWriter.integer(root.getNameSegment().getPath(), TypeProtos.DataMode.OPTIONAL); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java index 5f7a7a4ea91..f9596cc3e0d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software 
Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,8 +23,7 @@ import io.netty.buffer.DrillBuf; -import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState; -import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.vector.complex.writer.BaseWriter; public class CountingJsonReader extends BaseJsonProcessor { @@ -54,7 +53,7 @@ public ReadState write(BaseWriter.ComplexWriter writer) throws IOException { // IllegalStateException(String.format("Cannot read from the middle of a record. Current token was %s", // token)); } - writer.rootAsMap().bit("count").writeBit(1); + writer.rootAsMap().bit("count", TypeProtos.DataMode.OPTIONAL).writeBit(1); parser.skipChildren(); } catch (com.fasterxml.jackson.core.JsonParseException ex) { if (ignoreJSONParseError()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogRecord.java index 27752853c71..36dfd5f1482 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogRecord.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogRecord.java @@ -1,11 +1,13 @@ /* - * Copyright 2015 The Apache Software Foundation. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,6 +24,7 @@ import java.util.Map; import nl.basjes.parse.core.Casts; import nl.basjes.parse.core.Parser; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; import org.apache.drill.exec.vector.complex.writer.BigIntWriter; import org.apache.drill.exec.vector.complex.writer.Float8Writer; @@ -148,7 +151,7 @@ public void setWildcard(final String field, final String value) { if (value != null) { final MapWriter mapWriter = getWildcardWriter(field); LOG.trace("Parsed wildcard field: {}, as string: {}", field, value); - final VarCharWriter w = mapWriter.varChar(cleanExtensions.get(field)); + final VarCharWriter w = mapWriter.varChar(cleanExtensions.get(field), TypeProtos.DataMode.OPTIONAL); writeString(w, value); } } @@ -166,7 +169,7 @@ public void setWildcard(final String field, final Long value) { if (value != null) { final MapWriter mapWriter = getWildcardWriter(field); LOG.trace("Parsed wildcard field: {}, as long: {}", field, value); - final BigIntWriter w = mapWriter.bigInt(cleanExtensions.get(field)); + final BigIntWriter w = mapWriter.bigInt(cleanExtensions.get(field), TypeProtos.DataMode.OPTIONAL); w.writeBigInt(value); } } @@ -184,7 +187,7 @@ public void setWildcard(final String field, final Double value) { if (value != null) { final MapWriter mapWriter = getWildcardWriter(field); LOG.trace("Parsed wildcard field: {}, as double: {}", field, value); - final Float8Writer w = mapWriter.float8(cleanExtensions.get(field)); + final 
Float8Writer w = mapWriter.float8(cleanExtensions.get(field), TypeProtos.DataMode.OPTIONAL); w.writeFloat8(value); } } @@ -283,17 +286,17 @@ public void addField(final Parser parser, final MapWriter mapWri else if (type.contains(Casts.DOUBLE)) { LOG.debug("Adding DOUBLE parse target: {}, with field name: {}", parserFieldName, drillFieldName); parser.addParseTarget(this.getClass().getMethod("set", String.class, Double.class), parserFieldName); - doubles.put(parserFieldName, mapWriter.float8(drillFieldName)); + doubles.put(parserFieldName, mapWriter.float8(drillFieldName, TypeProtos.DataMode.OPTIONAL)); } else if (type.contains(Casts.LONG)) { LOG.debug("Adding LONG parse target: {}, with field name: {}", parserFieldName, drillFieldName); parser.addParseTarget(this.getClass().getMethod("set", String.class, Long.class), parserFieldName); - longs.put(parserFieldName, mapWriter.bigInt(drillFieldName)); + longs.put(parserFieldName, mapWriter.bigInt(drillFieldName, TypeProtos.DataMode.OPTIONAL)); } else { LOG.debug("Adding STRING parse target: {}, with field name: {}", parserFieldName, drillFieldName); parser.addParseTarget(this.getClass().getMethod("set", String.class, String.class), parserFieldName); - strings.put(parserFieldName, mapWriter.varChar(drillFieldName)); + strings.put(parserFieldName, mapWriter.varChar(drillFieldName, TypeProtos.DataMode.OPTIONAL)); } } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index bb0b65fc545..a4c67794ff1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -98,6 +98,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS List> implicitColumns = 
Lists.newArrayList(); Map mapWithMaxColumns = Maps.newLinkedHashMap(); for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){ + String readEntryPath = e.getPath(); /* Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine) @@ -107,34 +108,34 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS */ try { Stopwatch timer = Stopwatch.createUnstarted(); - if (!footers.containsKey(e.getPath())){ + if (!footers.containsKey(readEntryPath)){ timer.start(); - ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(e.getPath())); + ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(readEntryPath)); long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); - logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead); - footers.put(e.getPath(), footer ); + logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", readEntryPath, "", 0, 0, 0, timeToRead); + footers.put(readEntryPath, footer ); } boolean autoCorrectCorruptDates = rowGroupScan.getFormatConfig().areCorruptDatesAutoCorrected(); - ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(), - autoCorrectCorruptDates); + ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility + .detectCorruptDates(footers.get(readEntryPath), rowGroupScan.getColumns(), autoCorrectCorruptDates); if (logger.isDebugEnabled()) { logger.debug(containsCorruptDates.toString()); } - if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footers.get(e.getPath()))) { + if (context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) || isComplex(footers.get(e.getPath()))) { 
+ ParquetMetadata footer = footers.get(readEntryPath); + readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs, containsCorruptDates)); + } else { readers.add( new ParquetRecordReader( - context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs, + context, readEntryPath, e.getRowGroupIndex(), e.getNumRecordsToRead(), fs, CodecFactory.createDirectCodecFactory( - fs.getConf(), - new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), - footers.get(e.getPath()), + fs.getConf(), + new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), + footers.get(readEntryPath), rowGroupScan.getColumns(), containsCorruptDates ) ); - } else { - ParquetMetadata footer = footers.get(e.getPath()); - readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs, containsCorruptDates)); } Map implicitValues = columnExplorer.populateImplicitColumns(e, rowGroupScan.getSelectionRoot()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTypeHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTypeHelper.java new file mode 100644 index 00000000000..ff296ea66fb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTypeHelper.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet; + +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.Maps; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; + +import java.util.Map; + + +/** + * Util class for converting Parquet to Drill data types and vice versa. + */ +public class ParquetTypeHelper { + + private static Map typeMap; + private static ImmutableBiMap modeMap; + private static Map originalTypeMap; + + static { + typeMap = Maps.newHashMap(); + + typeMap.put(TypeProtos.MinorType.TINYINT, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.UINT1, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.UINT2, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.SMALLINT, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.INT, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.UINT4, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.FLOAT4, PrimitiveTypeName.FLOAT); + typeMap.put(TypeProtos.MinorType.TIME, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.INTERVALYEAR, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.DECIMAL9, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.BIGINT, PrimitiveTypeName.INT64); + 
typeMap.put(TypeProtos.MinorType.UINT8, PrimitiveTypeName.INT64); + typeMap.put(TypeProtos.MinorType.FLOAT8, PrimitiveTypeName.DOUBLE); + typeMap.put(TypeProtos.MinorType.DATE, PrimitiveTypeName.INT32); + typeMap.put(TypeProtos.MinorType.TIMESTAMP, PrimitiveTypeName.INT64); + typeMap.put(TypeProtos.MinorType.DECIMAL18, PrimitiveTypeName.INT64); + typeMap.put(TypeProtos.MinorType.INTERVALDAY, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.INTERVAL, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.DECIMAL28DENSE, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.DECIMAL38DENSE, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.DECIMAL38SPARSE, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.DECIMAL28SPARSE, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + typeMap.put(TypeProtos.MinorType.VARBINARY, PrimitiveTypeName.BINARY); + typeMap.put(TypeProtos.MinorType.VARCHAR, PrimitiveTypeName.BINARY); + typeMap.put(TypeProtos.MinorType.VAR16CHAR, PrimitiveTypeName.BINARY); + typeMap.put(TypeProtos.MinorType.BIT, PrimitiveTypeName.BOOLEAN); + + modeMap = new ImmutableBiMap.Builder() + .put(TypeProtos.DataMode.REQUIRED, Repetition.REQUIRED) + .put(TypeProtos.DataMode.OPTIONAL, Repetition.OPTIONAL) + .put(TypeProtos.DataMode.REPEATED, Repetition.REPEATED) + .build(); + + originalTypeMap = Maps.newHashMap(); + + originalTypeMap.put(TypeProtos.MinorType.DECIMAL9, OriginalType.DECIMAL); + originalTypeMap.put(TypeProtos.MinorType.DECIMAL18, OriginalType.DECIMAL); + originalTypeMap.put(TypeProtos.MinorType.DECIMAL28DENSE, OriginalType.DECIMAL); + originalTypeMap.put(TypeProtos.MinorType.DECIMAL38DENSE, OriginalType.DECIMAL); + originalTypeMap.put(TypeProtos.MinorType.DECIMAL38SPARSE, OriginalType.DECIMAL); + originalTypeMap.put(TypeProtos.MinorType.DECIMAL28SPARSE, OriginalType.DECIMAL); + originalTypeMap.put(TypeProtos.MinorType.VARCHAR, 
OriginalType.UTF8); + originalTypeMap.put(TypeProtos.MinorType.DATE, OriginalType.DATE); + originalTypeMap.put(TypeProtos.MinorType.TIME, OriginalType.TIME_MILLIS); + originalTypeMap.put(TypeProtos.MinorType.TIMESTAMP, OriginalType.TIMESTAMP_MILLIS); + originalTypeMap.put(TypeProtos.MinorType.INTERVALDAY, OriginalType.INTERVAL); + originalTypeMap.put(TypeProtos.MinorType.INTERVALYEAR, OriginalType.INTERVAL); + originalTypeMap.put(TypeProtos.MinorType.INTERVAL, OriginalType.INTERVAL); + } + + /** + * @param minorType Drill minor type + * @return Parquet primitive type in correspondence to Drill minor type + */ + public static PrimitiveTypeName getPrimitiveTypeNameForMinorType(TypeProtos.MinorType minorType) { + return typeMap.get(minorType); + } + + /** + * @param dataMode Drill data mode + * @return Parquet repetition type in correspondence to Drill data mode type + */ + public static Repetition getRepetitionForDataMode(TypeProtos.DataMode dataMode) { + return modeMap.get(dataMode); + } + + /** + * @param repetition Parquet repetition type + * @return Drill data mode type in correspondence to Parquet repetition type + */ + public static TypeProtos.DataMode getDataModeForRepetition(Repetition repetition) { + return modeMap.inverse().get(repetition); + } + + /** + * @param minorType Drill minor type + * @return Parquet original type in correspondence to Drill minor type + */ + public static OriginalType getOriginalTypeForMinorType(TypeProtos.MinorType minorType) { + return originalTypeMap.get(minorType); + } + + /** + * @param field Drill materialized field + * @return Parquet decimal metadata for Drill materialized field + */ + public static DecimalMetadata getDecimalMetadataForField(MaterializedField field) { + switch(field.getType().getMinorType()) { + case DECIMAL9: + case DECIMAL18: + case DECIMAL28SPARSE: + case DECIMAL28DENSE: + case DECIMAL38SPARSE: + case DECIMAL38DENSE: + return new DecimalMetadata(field.getPrecision(), field.getScale()); + default: + 
return null; + } + } + + /** + * @param minorType Drill minor type + * @return length for minor types in bytes + */ + public static int getLengthForMinorType(TypeProtos.MinorType minorType) { + switch(minorType) { + case INTERVALDAY: + case INTERVALYEAR: + case INTERVAL: + return 12; + case DECIMAL28SPARSE: + return 12; + case DECIMAL38SPARSE: + return 16; + default: + return 0; + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 495f70bc524..3a2a994908b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -14,9 +14,10 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- ******************************************************************************/ + **/ package org.apache.drill.exec.store.parquet.columnreaders; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; @@ -206,6 +207,12 @@ static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, int SchemaElement schemaElement ) throws ExecutionSetupException { ConvertedType convertedType = schemaElement.getConverted_type(); + if (descriptor.getMaxRepetitionLevel() > 0) { + // TODO: Implement reading complex data types by default ParquetRecordReader. + // Shouldn't be reached until reading complex data types by regular parquet reader will be implemented. + throw new DrillRuntimeException("ParquetRecordReader doesn't support complex data types. Please set " + + "`store.parquet.use_new_reader` = true to use DrillParquetReader"); + } switch (descriptor.getMaxDefinitionLevel()) { case 0: if (convertedType == null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index 5c7c8e112fe..d977f4dfe35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,6 +28,7 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; @@ -47,6 +48,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; +import org.apache.drill.exec.store.parquet.ParquetTypeHelper; import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; @@ -70,7 +72,6 @@ import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; -import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; @@ -91,23 +92,26 @@ public class DrillParquetGroupConverter extends GroupConverter { private final OptionManager options; // See DRILL-4203 private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates; + private boolean isParentNestedTypeOptional; public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection columns, OptionManager options, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { - this(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates); + this(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates, false); } // This function assumes that 
the fields in the schema parameter are in the same order as the fields in the columns parameter. The // columns parameter may have fields that are not present in the schema, though. public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection columns, OptionManager options, - ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { + ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates, + boolean isParentNestedTypeOptional) { this.mapWriter = mapWriter; this.mutator = mutator; this.containsCorruptedDates = containsCorruptedDates; converters = Lists.newArrayList(); this.options = options; + this.isParentNestedTypeOptional = isParentNestedTypeOptional; Iterator colIterator=columns.iterator(); @@ -139,6 +143,9 @@ public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, Gr } if (!isPrimitive) { + if (rep == Repetition.OPTIONAL) { + isParentNestedTypeOptional = true; + } Collection c = new ArrayList(); while(colNextChild!=null) { @@ -153,12 +160,12 @@ public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, Gr c.add(s); } if (rep != Repetition.REPEATED) { - DrillParquetGroupConverter converter = new DrillParquetGroupConverter( - mutator, mapWriter.map(name), type.asGroupType(), c, options, containsCorruptedDates); + DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), + type.asGroupType(), c, options, containsCorruptedDates, isParentNestedTypeOptional); converters.add(converter); } else { - DrillParquetGroupConverter converter = new DrillParquetGroupConverter( - mutator, mapWriter.list(name).map(), type.asGroupType(), c, options, containsCorruptedDates); + DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), + type.asGroupType(), c, options, containsCorruptedDates, isParentNestedTypeOptional); converters.add(converter); } } else { @@ -171,22 +178,33 @@ public 
DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, Gr @SuppressWarnings("resource") private PrimitiveConverter getConverterForType(String name, PrimitiveType type) { + TypeProtos.DataMode dataMode = ParquetTypeHelper.getDataModeForRepetition(type.getRepetition()); + // replace the non-nullable vectors to nullable for the case of nullable parent nested type field + if (dataMode == TypeProtos.DataMode.REQUIRED && isParentNestedTypeOptional) { + dataMode = TypeProtos.DataMode.OPTIONAL; + } switch(type.getPrimitiveTypeName()) { case INT32: { if (type.getOriginalType() == null) { - IntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).integer() : mapWriter.integer(name); + IntWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).integer() + : mapWriter.integer(name, dataMode); return new DrillIntConverter(writer); } switch(type.getOriginalType()) { case DECIMAL: { ParquetReaderUtility.checkDecimalTypeEnabled(options); - Decimal9Writer writer = type.getRepetition() == Repetition.REPEATED + int scale = type.getDecimalMetadata().getScale(); + int precision = type.getDecimalMetadata().getPrecision(); + Decimal9Writer writer = dataMode == TypeProtos.DataMode.REPEATED ? mapWriter.list(name).decimal9() - : mapWriter.decimal9(name, type.getDecimalMetadata().getScale(), type.getDecimalMetadata().getPrecision()); - return new DrillDecimal9Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale()); + : mapWriter.decimal9(name, dataMode, scale, precision); + return new DrillDecimal9Converter(writer, precision, scale); } case DATE: { - DateWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).date() : mapWriter.date(name); + DateWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? 
mapWriter.list(name).date() + : mapWriter.date(name, dataMode); switch(containsCorruptedDates) { case META_SHOWS_CORRUPTION: return new DrillCorruptedDateConverter(writer); @@ -202,7 +220,9 @@ private PrimitiveConverter getConverterForType(String name, PrimitiveType type) } } case TIME_MILLIS: { - TimeWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).time() : mapWriter.time(name); + TimeWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).time() + : mapWriter.time(name, dataMode); return new DrillTimeConverter(writer); } default: { @@ -212,19 +232,25 @@ private PrimitiveConverter getConverterForType(String name, PrimitiveType type) } case INT64: { if (type.getOriginalType() == null) { - BigIntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bigInt() : mapWriter.bigInt(name); + BigIntWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).bigInt() + : mapWriter.bigInt(name, dataMode); return new DrillBigIntConverter(writer); } switch(type.getOriginalType()) { case DECIMAL: { ParquetReaderUtility.checkDecimalTypeEnabled(options); - Decimal18Writer writer = type.getRepetition() == Repetition.REPEATED + int scale = type.getDecimalMetadata().getScale(); + int precision = type.getDecimalMetadata().getPrecision(); + Decimal18Writer writer = dataMode == TypeProtos.DataMode.REPEATED ? mapWriter.list(name).decimal18() - : mapWriter.decimal18(name, type.getDecimalMetadata().getScale(), type.getDecimalMetadata().getPrecision()); - return new DrillDecimal18Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale()); + : mapWriter.decimal18(name, dataMode, scale, precision); + return new DrillDecimal18Converter(writer, precision, scale); } case TIMESTAMP_MILLIS: { - TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? 
mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name); + TimeStampWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).timeStamp() + : mapWriter.timeStamp(name, dataMode); return new DrillTimeStampConverter(writer); } default: { @@ -233,54 +259,70 @@ private PrimitiveConverter getConverterForType(String name, PrimitiveType type) } } case INT96: { - // TODO: replace null with TIMESTAMP_NANOS once parquet support such type annotation. + // TODO: replace null with TIMESTAMP_NANOS once parquet will support such type annotation. if (type.getOriginalType() == null) { if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { - TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name); + TimeStampWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).timeStamp() + : mapWriter.timeStamp(name, dataMode); return new DrillFixedBinaryToTimeStampConverter(writer); } else { - VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); - return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetColumnMetadata.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer()); + VarBinaryWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).varBinary() + : mapWriter.varBinary(name, dataMode); + return new DrillFixedBinaryToVarbinaryConverter(writer, + ParquetColumnMetadata.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer()); } } } case FLOAT: { - Float4Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float4() : mapWriter.float4(name); + Float4Writer writer = dataMode == TypeProtos.DataMode.REPEATED + ? 
mapWriter.list(name).float4() + : mapWriter.float4(name, dataMode); return new DrillFloat4Converter(writer); } case DOUBLE: { - Float8Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float8() : mapWriter.float8(name); + Float8Writer writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).float8() + : mapWriter.float8(name, dataMode); return new DrillFloat8Converter(writer); } case BOOLEAN: { - BitWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bit() : mapWriter.bit(name); + BitWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).bit() + : mapWriter.bit(name, dataMode); return new DrillBoolConverter(writer); } case BINARY: { if (type.getOriginalType() == null) { - VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); + VarBinaryWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).varBinary() + : mapWriter.varBinary(name, dataMode); return new DrillVarBinaryConverter(writer, mutator.getManagedBuffer()); } switch(type.getOriginalType()) { case UTF8: { - VarCharWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varChar() : mapWriter.varChar(name); + VarCharWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? 
mapWriter.list(name).varChar() + : mapWriter.varChar(name, dataMode); return new DrillVarCharConverter(writer, mutator.getManagedBuffer()); } //TODO not sure if BINARY/DECIMAL is actually supported case DECIMAL: { ParquetReaderUtility.checkDecimalTypeEnabled(options); - DecimalMetadata metadata = type.getDecimalMetadata(); - if (metadata.getPrecision() <= 28) { - Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED + int scale = type.getDecimalMetadata().getScale(); + int precision = type.getDecimalMetadata().getPrecision(); + if (precision <= 28) { + Decimal28SparseWriter writer = dataMode == TypeProtos.DataMode.REPEATED ? mapWriter.list(name).decimal28Sparse() - : mapWriter.decimal28Sparse(name, metadata.getScale(), metadata.getPrecision()); - return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); + : mapWriter.decimal28Sparse(name, dataMode, scale, precision); + return new DrillBinaryToDecimal28Converter(writer, precision, scale, mutator.getManagedBuffer()); } else { - Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED + Decimal38SparseWriter writer = dataMode == TypeProtos.DataMode.REPEATED ? 
mapWriter.list(name).decimal38Sparse() - : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision()); - return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); + : mapWriter.decimal38Sparse(name, dataMode, scale, precision); + return new DrillBinaryToDecimal38Converter(writer, precision, scale, mutator.getManagedBuffer()); } } default: { @@ -291,25 +333,29 @@ private PrimitiveConverter getConverterForType(String name, PrimitiveType type) case FIXED_LEN_BYTE_ARRAY: if (type.getOriginalType() == OriginalType.DECIMAL) { ParquetReaderUtility.checkDecimalTypeEnabled(options); - DecimalMetadata metadata = type.getDecimalMetadata(); - if (metadata.getPrecision() <= 28) { - Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED + int scale = type.getDecimalMetadata().getScale(); + int precision = type.getDecimalMetadata().getPrecision(); + if (precision <= 28) { + Decimal28SparseWriter writer = dataMode == TypeProtos.DataMode.REPEATED ? mapWriter.list(name).decimal28Sparse() - : mapWriter.decimal28Sparse(name, metadata.getScale(), metadata.getPrecision()); - return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); + : mapWriter.decimal28Sparse(name, dataMode, scale, precision); + return new DrillBinaryToDecimal28Converter(writer, precision, scale, mutator.getManagedBuffer()); } else { - Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED + Decimal38SparseWriter writer = dataMode == TypeProtos.DataMode.REPEATED ? 
mapWriter.list(name).decimal38Sparse() - : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision()); - return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); + : mapWriter.decimal38Sparse(name, dataMode, scale, precision); + return new DrillBinaryToDecimal38Converter(writer, precision, scale, mutator.getManagedBuffer()); } } else if (type.getOriginalType() == OriginalType.INTERVAL) { - IntervalWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).interval() - : mapWriter.interval(name); + IntervalWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).interval() + : mapWriter.interval(name, dataMode); return new DrillFixedLengthByteArrayToInterval(writer); } else { - VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); + VarBinaryWriter writer = dataMode == TypeProtos.DataMode.REPEATED + ? mapWriter.list(name).varBinary() + : mapWriter.varBinary(name, dataMode); return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer()); } default: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java index f6ffdbd46bf..ad0037c4298 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -37,7 +37,8 @@ public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter compl Collection columns, OptionManager options, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { this.complexWriter = complexWriter; - root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates); + root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options, + containsCorruptedDates, false); } public void setPosition(int position) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java index 72c094a83e3..a463053eb40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java @@ -50,161 +50,161 @@ public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapW if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).tinyInt()); } else { - fieldReader.copyAsValue(mapWriter.tinyInt(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.tinyInt(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case SMALLINT: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).smallInt()); } else { - fieldReader.copyAsValue(mapWriter.smallInt(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.smallInt(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case BIGINT: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).bigInt()); } else { - fieldReader.copyAsValue(mapWriter.bigInt(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.bigInt(MappifyUtility.fieldValue, 
fieldReader.getType().getMode())); } break; case INT: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).integer()); } else { - fieldReader.copyAsValue(mapWriter.integer(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.integer(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case UINT1: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt1()); } else { - fieldReader.copyAsValue(mapWriter.uInt1(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.uInt1(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case UINT2: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt2()); } else { - fieldReader.copyAsValue(mapWriter.uInt2(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.uInt2(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case UINT4: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt4()); } else { - fieldReader.copyAsValue(mapWriter.uInt4(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.uInt4(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case UINT8: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt8()); } else { - fieldReader.copyAsValue(mapWriter.uInt8(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.uInt8(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case DECIMAL9: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal9()); } else { - fieldReader.copyAsValue(mapWriter.decimal9(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.decimal9(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case DECIMAL18: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal18()); } else { - 
fieldReader.copyAsValue(mapWriter.decimal18(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.decimal18(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case DECIMAL28SPARSE: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal28Sparse()); } else { - fieldReader.copyAsValue(mapWriter.decimal28Sparse(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.decimal28Sparse(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case DECIMAL38SPARSE: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal38Sparse()); } else { - fieldReader.copyAsValue(mapWriter.decimal38Sparse(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.decimal38Sparse(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case DATE: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).date()); } else { - fieldReader.copyAsValue(mapWriter.date(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.date(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case TIME: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).time()); } else { - fieldReader.copyAsValue(mapWriter.time(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.time(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case TIMESTAMP: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).timeStamp()); } else { - fieldReader.copyAsValue(mapWriter.timeStamp(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.timeStamp(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case INTERVAL: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).interval()); } else { - fieldReader.copyAsValue(mapWriter.interval(MappifyUtility.fieldValue)); + 
fieldReader.copyAsValue(mapWriter.interval(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case INTERVALDAY: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).intervalDay()); } else { - fieldReader.copyAsValue(mapWriter.intervalDay(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.intervalDay(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case INTERVALYEAR: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).intervalYear()); } else { - fieldReader.copyAsValue(mapWriter.intervalYear(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.intervalYear(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case FLOAT4: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).float4()); } else { - fieldReader.copyAsValue(mapWriter.float4(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.float4(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case FLOAT8: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).float8()); } else { - fieldReader.copyAsValue(mapWriter.float8(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.float8(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case BIT: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).bit()); } else { - fieldReader.copyAsValue(mapWriter.bit(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.bit(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case VARCHAR: if (repeated) { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).varChar()); } else { - fieldReader.copyAsValue(mapWriter.varChar(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.varChar(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case VARBINARY: if (repeated) { 
fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).varBinary()); } else { - fieldReader.copyAsValue(mapWriter.varBinary(MappifyUtility.fieldValue)); + fieldReader.copyAsValue(mapWriter.varBinary(MappifyUtility.fieldValue, fieldReader.getType().getMode())); } break; case MAP: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 4ffbb26e01e..75b343903b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,17 +21,15 @@ import java.io.IOException; import java.io.InputStream; -import java.util.BitSet; import java.util.List; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor; import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput; import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput; -import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; @@ -333,24 +331,24 @@ private void writeData(MapWriter map, FieldSelection selection, break outside; case VALUE_FALSE: { - map.bit(fieldName).writeBit(0); + map.bit(fieldName, 
TypeProtos.DataMode.OPTIONAL).writeBit(0); break; } case VALUE_TRUE: { - map.bit(fieldName).writeBit(1); + map.bit(fieldName, TypeProtos.DataMode.OPTIONAL).writeBit(1); break; } case VALUE_NULL: // do nothing as we don't have a type. break; case VALUE_NUMBER_FLOAT: - map.float8(fieldName).writeFloat8(parser.getDoubleValue()); + map.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(parser.getDoubleValue()); break; case VALUE_NUMBER_INT: if (this.readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(parser.getDoubleValue()); + map.float8(fieldName, TypeProtos.DataMode.OPTIONAL).writeFloat8(parser.getDoubleValue()); } else { - map.bigInt(fieldName).writeBigInt(parser.getLongValue()); + map.bigInt(fieldName, TypeProtos.DataMode.OPTIONAL).writeBigInt(parser.getLongValue()); } break; case VALUE_STRING: @@ -471,7 +469,7 @@ private boolean writeListDataIfTyped(ListWriter writer) throws IOException { private void handleString(JsonParser parser, MapWriter writer, String fieldName) throws IOException { - writer.varChar(fieldName).writeVarChar(0, + writer.varChar(fieldName, TypeProtos.DataMode.OPTIONAL).writeVarChar(0, workingBuffer.prepareVarCharHolder(parser.getText()), workingBuffer.getBuf()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java index 775be023783..20e8efe42b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import java.util.BitSet; @@ -73,9 +74,9 @@ public static void ensureAtLeastOneField(BaseWriter.ComplexWriter writer, PathSegment fieldPath = fieldPathList.get(j); if (emptyStatus.get(j)) { if (allTextMode) { - fieldWriter.varChar(fieldPath.getNameSegment().getPath()); + fieldWriter.varChar(fieldPath.getNameSegment().getPath(), TypeProtos.DataMode.OPTIONAL); } else { - fieldWriter.integer(fieldPath.getNameSegment().getPath()); + fieldWriter.integer(fieldPath.getNameSegment().getPath(), TypeProtos.DataMode.OPTIONAL); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java index bf1448e27f4..407b376cb84 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers; import org.apache.drill.exec.expr.holders.BigIntHolder; @@ -297,7 +298,7 @@ public boolean run(MapWriter writer, String fieldName) throws IOException{ @Override public void writeBinary(boolean isNull) throws IOException { - VarBinaryWriter bin = writer.varBinary(fieldName); + VarBinaryWriter bin = writer.varBinary(fieldName, TypeProtos.DataMode.OPTIONAL); if(!isNull){ byte[] binaryData = parser.getBinaryValue(); if (hasType()) { @@ -316,7 +317,7 @@ public void writeBinary(boolean isNull) throws IOException { @Override public void writeDate(boolean isNull) throws IOException { - DateWriter dt = writer.date(fieldName); + DateWriter dt = writer.date(fieldName, TypeProtos.DataMode.OPTIONAL); if(!isNull){ DateTimeFormatter f = ISODateTimeFormat.date(); DateTime date = f.parseDateTime(parser.getValueAsString()); @@ -326,7 +327,7 @@ public void writeDate(boolean isNull) throws IOException { @Override public void writeTime(boolean isNull) throws IOException { - TimeWriter t = writer.time(fieldName); + TimeWriter t = writer.time(fieldName, TypeProtos.DataMode.OPTIONAL); if(!isNull){ DateTimeFormatter f = ISODateTimeFormat.time(); t.writeTime((int) ((f.parseDateTime(parser.getValueAsString())).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis())); @@ -335,7 +336,7 @@ public void writeTime(boolean isNull) throws IOException { @Override public void writeTimestamp(boolean isNull) throws IOException { - TimeStampWriter ts = writer.timeStamp(fieldName); + TimeStampWriter ts = writer.timeStamp(fieldName, TypeProtos.DataMode.OPTIONAL); if(!isNull){ switch (parser.getCurrentToken()) { case VALUE_NUMBER_INT: @@ -356,7 +357,7 @@ 
public void writeTimestamp(boolean isNull) throws IOException { @Override public void writeInterval(boolean isNull) throws IOException { - IntervalWriter intervalWriter = writer.interval(fieldName); + IntervalWriter intervalWriter = writer.interval(fieldName, TypeProtos.DataMode.OPTIONAL); if(!isNull){ final Period p = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString()); int months = DateUtility.monthsFromPeriod(p); @@ -368,7 +369,7 @@ public void writeInterval(boolean isNull) throws IOException { @Override public void writeInteger(boolean isNull) throws IOException { - BigIntWriter intWriter = writer.bigInt(fieldName); + BigIntWriter intWriter = writer.bigInt(fieldName, TypeProtos.DataMode.OPTIONAL); if(!isNull){ intWriter.writeBigInt(Long.parseLong(parser.getValueAsString())); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java index 7c148cb2a66..a45134959b2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java @@ -17,13 +17,24 @@ */ package org.apache.drill.exec.store.parquet; +import static org.apache.drill.test.TestBuilder.mapOf; + import org.apache.drill.test.BaseTestQuery; +import org.junit.BeforeClass; import org.junit.Test; +import java.nio.file.Paths; + public class TestParquetComplex extends BaseTestQuery { private static final String DATAFILE = "cp.`store/parquet/complex/complex.parquet`"; + @BeforeClass + public static void setupTestFiles() { + dirTestWatcher.copyResourceToRoot(Paths.get("store", "parquet", "complex", "files_with_complex_and_primitive_types")); + dirTestWatcher.copyResourceToRoot(Paths.get("store", "parquet", "complex", "nested_types.parquet")); + } + @Test public void sort() throws Exception { String query = String.format("select * from %s 
order by amount", DATAFILE); @@ -193,4 +204,36 @@ public void notxistsField() throws Exception { .run(); } + @Test //DRILL-5970 + public void testParquetComplexTypesAggregation() throws Exception { + final String query = "select bucket, count(*) as number from " + + "dfs.`store/parquet/complex/files_with_complex_and_primitive_types` group by bucket"; + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("bucket", "number") + .baselineValues("Bucket1", 2L) + .build() + .run(); + } + + @Test //DRILL-5970 + public void testNestedOptionalRequiredTypes() throws Exception { + final String query = "select t.`map_field`.`map`[0] as optional_map from dfs.`store/parquet/complex/nested_types.parquet` t"; + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("optional_map") + .baselineValues(mapOf()) + .baselineValues(mapOf("key", "Google", "value", 1L)) + .baselineValues(mapOf()) + .baselineValues(mapOf("key", "WhatsApp", "value", 2L)) + .baselineValues(mapOf()) + .baselineValues(mapOf("key", "Facebook", "value", 3L)) + .build() + .run(); + } + } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java index 223f4edb085..38250ce7b3b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,7 @@ package org.apache.drill.exec.vector.complex.writer; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.drill.exec.store.TestOutputMutator; @@ -39,11 +40,11 @@ public void list() throws Exception { rootWriter.setPosition(0); { - writer.map("map").bigInt("a").writeBigInt(1); + writer.map("map").bigInt("a", TypeProtos.DataMode.OPTIONAL).writeBigInt(1); } rootWriter.setPosition(1); { - writer.map("map").float4("a").writeFloat4(2.0f); + writer.map("map").float4("a", TypeProtos.DataMode.OPTIONAL).writeFloat4(2.0f); } rootWriter.setPosition(2); { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java index bd4731a61a4..488fe20170c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,6 +21,7 @@ import org.apache.drill.common.DrillAutoCloseables; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.memory.BufferAllocator; @@ -166,22 +167,22 @@ public void listOfList() throws Exception { innerList.endList(); list.endList(); - final IntWriter numCol = map.integer("nums"); + final IntWriter numCol = map.integer("nums", TypeProtos.DataMode.OPTIONAL); holder.value = 14; numCol.write(holder); final MapWriter repeatedMap = map.list("b").map(); repeatedMap.start(); holder.value = 1; - repeatedMap.integer("c").write(holder); + repeatedMap.integer("c", TypeProtos.DataMode.OPTIONAL).write(holder); repeatedMap.end(); repeatedMap.start(); holder.value = 2; - repeatedMap.integer("c").write(holder); + repeatedMap.integer("c", TypeProtos.DataMode.OPTIONAL).write(holder); final BigIntHolder h = new BigIntHolder(); h.value = 15; - repeatedMap.bigInt("x").write(h); + repeatedMap.bigInt("x", TypeProtos.DataMode.OPTIONAL).write(h); repeatedMap.end(); map.end(); @@ -219,22 +220,22 @@ public void listOfList() throws Exception { innerList.endList(); list.endList(); - final IntWriter numCol = map.integer("nums"); + final IntWriter numCol = map.integer("nums", TypeProtos.DataMode.OPTIONAL); holder.value = -28; numCol.write(holder); final MapWriter repeatedMap = map.list("b").map(); repeatedMap.start(); holder.value = -1; - repeatedMap.integer("c").write(holder); + repeatedMap.integer("c", TypeProtos.DataMode.OPTIONAL).write(holder); repeatedMap.end(); repeatedMap.start(); holder.value = -2; - repeatedMap.integer("c").write(holder); + repeatedMap.integer("c", TypeProtos.DataMode.OPTIONAL).write(holder); final BigIntHolder h = new BigIntHolder(); h.value = -30; - repeatedMap.bigInt("x").write(h); + 
repeatedMap.bigInt("x", TypeProtos.DataMode.OPTIONAL).write(h); repeatedMap.end(); map.end(); diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/files_with_complex_and_primitive_types/0_0_0.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/files_with_complex_and_primitive_types/0_0_0.parquet new file mode 100644 index 00000000000..4bbed993de7 Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/files_with_complex_and_primitive_types/0_0_0.parquet differ diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/files_with_complex_and_primitive_types/0_0_1.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/files_with_complex_and_primitive_types/0_0_1.parquet new file mode 100644 index 00000000000..4376e521fce Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/files_with_complex_and_primitive_types/0_0_1.parquet differ diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/nested_types.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/nested_types.parquet new file mode 100644 index 00000000000..fd2458c6085 Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/nested_types.parquet differ diff --git a/exec/vector/src/main/codegen/includes/vv_imports.ftl b/exec/vector/src/main/codegen/includes/vv_imports.ftl index efca346577a..c388fa1a7ac 100644 --- a/exec/vector/src/main/codegen/includes/vv_imports.ftl +++ b/exec/vector/src/main/codegen/includes/vv_imports.ftl @@ -33,6 +33,7 @@ import org.apache.drill.exec.vector.*; import org.apache.drill.common.exceptions.*; import org.apache.drill.exec.exception.*; import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.*; import org.apache.drill.common.types.Types; import org.apache.drill.common.util.DrillStringUtils; diff --git 
a/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java b/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java index 7ab5dcef987..1b16a641787 100644 --- a/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java +++ b/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -113,14 +113,14 @@ public ListWriter list(String name) { <#assign upperName = minor.class?upper_case /> <#assign capName = minor.class?cap_first /> <#if minor.class?starts_with("Decimal") > - public ${capName}Writer ${lowerName}(String name, int scale, int precision) { + public ${capName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode, int scale, int precision) { fail("${capName}"); return null; } @Override - public ${capName}Writer ${lowerName}(String name) { + public ${capName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode) { fail("${capName}"); return null; } diff --git a/exec/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java b/exec/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java index f2f81934ce5..f67b4874bb2 100644 --- a/exec/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java +++ b/exec/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -120,8 +120,8 @@ public ListWriter list(String name) { <#if !minor.class?starts_with("Decimal") > @Override - public ${capName}Writer ${lowerName}(String name) { - return getWriter(MinorType.MAP).${lowerName}(name); + public ${capName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode) { + return getWriter(MinorType.MAP).${lowerName}(name, dataMode); } @Override diff --git a/exec/vector/src/main/codegen/templates/BaseWriter.java b/exec/vector/src/main/codegen/templates/BaseWriter.java index f2c6e2227b1..df01419d2cd 100644 --- a/exec/vector/src/main/codegen/templates/BaseWriter.java +++ b/exec/vector/src/main/codegen/templates/BaseWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -52,12 +52,11 @@ public interface MapWriter extends BaseWriter { <#list vv.types as type><#list type.minor as minor> <#assign lowerName = minor.class?uncap_first /> <#if lowerName == "int" ><#assign lowerName = "integer" /> - <#assign upperName = minor.class?upper_case /> <#assign capName = minor.class?cap_first /> <#if minor.class?starts_with("Decimal") > - ${capName}Writer ${lowerName}(String name, int scale, int precision); + ${capName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode, int scale, int precision); - ${capName}Writer ${lowerName}(String name); + ${capName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode); void copyReaderToField(String name, FieldReader reader); @@ -77,14 +76,23 @@ public interface ListWriter extends BaseWriter { <#list vv.types as type><#list type.minor as minor> <#assign lowerName = minor.class?uncap_first /> <#if lowerName == "int" ><#assign lowerName = "integer" /> - <#assign upperName = minor.class?upper_case /> <#assign capName = minor.class?cap_first /> ${capName}Writer 
${lowerName}(); } public interface ScalarWriter extends - <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> ${name}Writer, BaseWriter {} + <#list vv.types as type><#list type.minor as minor><#list ["Required", "Optional"] as scalarMode> + <#if scalarMode == "Optional"> + <#assign name = "Nullable${minor.class?cap_first}" /> ${name}Writer, + <#else> + <#assign name = minor.class?cap_first /> ${name}Writer, + + + + + + BaseWriter {} public interface ComplexWriter { void allocate(); @@ -106,37 +114,37 @@ public interface MapOrListWriter { MapOrListWriter list(String name); boolean isMapWriter(); boolean isListWriter(); - UInt1Writer uInt1(String name); - UInt2Writer uInt2(String name); - UInt4Writer uInt4(String name); - UInt8Writer uInt8(String name); - VarCharWriter varChar(String name); - Var16CharWriter var16Char(String name); - TinyIntWriter tinyInt(String name); - SmallIntWriter smallInt(String name); - IntWriter integer(String name); - BigIntWriter bigInt(String name); - Float4Writer float4(String name); - Float8Writer float8(String name); - BitWriter bit(String name); - VarBinaryWriter varBinary(String name); + UInt1Writer uInt1(String name, TypeProtos.DataMode dataMode); + UInt2Writer uInt2(String name, TypeProtos.DataMode dataMode); + UInt4Writer uInt4(String name, TypeProtos.DataMode dataMode); + UInt8Writer uInt8(String name, TypeProtos.DataMode dataMode); + VarCharWriter varChar(String name, TypeProtos.DataMode dataMode); + Var16CharWriter var16Char(String name, TypeProtos.DataMode dataMode); + TinyIntWriter tinyInt(String name, TypeProtos.DataMode dataMode); + SmallIntWriter smallInt(String name, TypeProtos.DataMode dataMode); + IntWriter integer(String name, TypeProtos.DataMode dataMode); + BigIntWriter bigInt(String name, TypeProtos.DataMode dataMode); + Float4Writer float4(String name, TypeProtos.DataMode dataMode); + Float8Writer float8(String name, TypeProtos.DataMode dataMode); + BitWriter bit(String name, 
TypeProtos.DataMode dataMode); + VarBinaryWriter varBinary(String name, TypeProtos.DataMode dataMode); /** * @deprecated Use {@link #varBinary(String)} instead. */ @Deprecated - VarBinaryWriter binary(String name); - DateWriter date(String name); - TimeWriter time(String name); - TimeStampWriter timeStamp(String name); - IntervalYearWriter intervalYear(String name); - IntervalDayWriter intervalDay(String name); - IntervalWriter interval(String name); - Decimal9Writer decimal9(String name); - Decimal18Writer decimal18(String name); - Decimal28DenseWriter decimal28Dense(String name); - Decimal38DenseWriter decimal38Dense(String name); - Decimal38SparseWriter decimal38Sparse(String name); - Decimal28SparseWriter decimal28Sparse(String name); + VarBinaryWriter binary(String name, TypeProtos.DataMode dataMode); + DateWriter date(String name, TypeProtos.DataMode dataMode); + TimeWriter time(String name, TypeProtos.DataMode dataMode); + TimeStampWriter timeStamp(String name, TypeProtos.DataMode dataMode); + IntervalYearWriter intervalYear(String name, TypeProtos.DataMode dataMode); + IntervalDayWriter intervalDay(String name, TypeProtos.DataMode dataMode); + IntervalWriter interval(String name, TypeProtos.DataMode dataMode); + Decimal9Writer decimal9(String name, TypeProtos.DataMode dataMode); + Decimal18Writer decimal18(String name, TypeProtos.DataMode dataMode); + Decimal28DenseWriter decimal28Dense(String name, TypeProtos.DataMode dataMode); + Decimal38DenseWriter decimal38Dense(String name, TypeProtos.DataMode dataMode); + Decimal38SparseWriter decimal38Sparse(String name, TypeProtos.DataMode dataMode); + Decimal28SparseWriter decimal28Sparse(String name, TypeProtos.DataMode dataMode); } } diff --git a/exec/vector/src/main/codegen/templates/ComplexCopier.java b/exec/vector/src/main/codegen/templates/ComplexCopier.java index 8255489d56a..35d13fb2912 100644 --- a/exec/vector/src/main/codegen/templates/ComplexCopier.java +++ 
b/exec/vector/src/main/codegen/templates/ComplexCopier.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -42,7 +42,7 @@ public static void copy(FieldReader input, FieldWriter output) { } private static void writeValue(FieldReader reader, FieldWriter writer) { - final DataMode m = reader.getType().getMode(); + final TypeProtos.DataMode m = reader.getType().getMode(); final MinorType mt = reader.getType().getMinorType(); switch(m){ @@ -100,7 +100,7 @@ private static FieldWriter getMapWriterForReader(FieldReader reader, MapWriter w <#assign uncappedName = name?uncap_first/> <#if !minor.class?starts_with("Decimal")> case ${name?upper_case}: - return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}(name); + return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}(name, reader.getType().getMode()); case MAP: diff --git a/exec/vector/src/main/codegen/templates/ComplexReaders.java b/exec/vector/src/main/codegen/templates/ComplexReaders.java index d662a6fabdd..70a14cd5713 100644 --- a/exec/vector/src/main/codegen/templates/ComplexReaders.java +++ b/exec/vector/src/main/codegen/templates/ComplexReaders.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -116,9 +116,9 @@ public void copyAsValue(${minor.class?cap_first}Writer writer){ ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer; impl.vector.copyFromSafe(idx(), impl.idx(), vector); } - + public void copyAsField(String name, MapWriter writer){ - ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer.${lowerName}(name); + ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer.${lowerName}(name, TypeProtos.DataMode.OPTIONAL); impl.vector.copyFromSafe(idx(), impl.idx(), vector); } diff --git a/exec/vector/src/main/codegen/templates/MapWriters.java b/exec/vector/src/main/codegen/templates/MapWriters.java index 93f2edbe8aa..f5be5eb9e53 100644 --- a/exec/vector/src/main/codegen/templates/MapWriters.java +++ b/exec/vector/src/main/codegen/templates/MapWriters.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -194,41 +194,40 @@ public void end() { <#assign lowerName = minor.class?uncap_first /> <#if lowerName == "int" ><#assign lowerName = "integer" /> <#assign upperName = minor.class?upper_case /> - <#assign capName = minor.class?cap_first /> - <#assign vectName = capName /> - <#assign vectName = "Nullable${capName}" /> + <#assign vectName = minor.class?cap_first /> + <#assign nullableVectName = "Nullable${vectName}" /> <#if minor.class?starts_with("Decimal") > - public ${minor.class}Writer ${lowerName}(String name) { + @Override + public ${vectName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode) { // returns existing writer final FieldWriter writer = fields.get(name.toLowerCase()); assert writer != null; return writer; } - public ${minor.class}Writer ${lowerName}(String name, int scale, int precision) { - final MajorType ${upperName}_TYPE = Types.withScaleAndPrecision(MinorType.${upperName}, DataMode.OPTIONAL, scale, precision); + public ${vectName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode, int scale, int precision) { + final MajorType ${upperName}_TYPE = Types.withScaleAndPrecision(MinorType.${upperName}, dataMode, scale, precision); <#else> - private static final MajorType ${upperName}_TYPE = Types.optional(MinorType.${upperName}); - @Override - public ${minor.class}Writer ${lowerName}(String name) { + + public ${vectName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode) { + final MajorType ${upperName}_TYPE = Types.withMode(MinorType.${upperName}, dataMode); FieldWriter writer = fields.get(name.toLowerCase()); if(writer == null) { ValueVector vector; ValueVector currentVector = container.getChild(name); - if (unionEnabled){ - ${vectName}Vector v = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class); - writer = new PromotableWriter(v, container); - vector = v; + vector = dataMode == TypeProtos.DataMode.OPTIONAL ? 
container.addOrGet(name, ${upperName}_TYPE, + ${nullableVectName}Vector.class) : container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class); + if (unionEnabled) { + writer = new PromotableWriter(vector, container); } else { - ${vectName}Vector v = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class); - writer = new ${vectName}WriterImpl(v, this); - vector = v; + writer = dataMode == TypeProtos.DataMode.OPTIONAL ? new ${nullableVectName}WriterImpl((${nullableVectName}Vector) vector, this) : + new ${vectName}WriterImpl((${vectName}Vector) vector, this); } if (currentVector == null || currentVector != vector) { vector.allocateNewSafe(); - } + } writer.setPosition(${index}); fields.put(name.toLowerCase(), writer); } diff --git a/exec/vector/src/main/codegen/templates/UnionListWriter.java b/exec/vector/src/main/codegen/templates/UnionListWriter.java index c676769f363..3e9e1aa40ad 100644 --- a/exec/vector/src/main/codegen/templates/UnionListWriter.java +++ b/exec/vector/src/main/codegen/templates/UnionListWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -90,13 +90,13 @@ public void close() throws Exception { } @Override - public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}(String name) { + public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}(String name, TypeProtos.DataMode dataMode) { assert inMap; mapName = name; final int nextOffset = offsets.getAccessor().get(idx() + 1); vector.getMutator().setNotNull(idx()); writer.setPosition(nextOffset); - ${name}Writer ${uncappedName}Writer = writer.<#if uncappedName == "int">integer<#else>${uncappedName}(name); + ${name}Writer ${uncappedName}Writer = writer.<#if uncappedName == "int">integer<#else>${uncappedName}(name, dataMode); return ${uncappedName}Writer; } diff --git a/exec/vector/src/main/codegen/templates/UnionWriter.java b/exec/vector/src/main/codegen/templates/UnionWriter.java index 7a123b4e799..18545b2ba79 100644 --- a/exec/vector/src/main/codegen/templates/UnionWriter.java +++ b/exec/vector/src/main/codegen/templates/UnionWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -186,10 +186,10 @@ public MapWriter map(String name) { <#assign capName = minor.class?cap_first /> <#if !minor.class?starts_with("Decimal")> @Override - public ${capName}Writer ${lowerName}(String name) { + public ${capName}Writer ${lowerName}(String name, TypeProtos.DataMode dataMode) { data.getMutator().setType(idx(), MinorType.MAP); getMapWriter().setPosition(idx()); - return getMapWriter().${lowerName}(name); + return getMapWriter().${lowerName}(name, dataMode); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java index 302d99e1e03..9e7b8e6923e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.vector.complex.impl; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapOrListWriter; import org.apache.drill.exec.vector.complex.writer.BigIntWriter; @@ -100,136 +101,136 @@ public boolean isListWriter() { return list != null; } - public VarCharWriter varChar(final String name) { - return (map != null) ? map.varChar(name) : list.varChar(); + public VarCharWriter varChar(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.varChar(name, dataMode) : list.varChar(); } - public IntWriter integer(final String name) { - return (map != null) ? 
map.integer(name) : list.integer(); + public IntWriter integer(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.integer(name, dataMode) : list.integer(); } - public BigIntWriter bigInt(final String name) { - return (map != null) ? map.bigInt(name) : list.bigInt(); + public BigIntWriter bigInt(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.bigInt(name, dataMode) : list.bigInt(); } - public Float4Writer float4(final String name) { - return (map != null) ? map.float4(name) : list.float4(); + public Float4Writer float4(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.float4(name, dataMode) : list.float4(); } - public Float8Writer float8(final String name) { - return (map != null) ? map.float8(name) : list.float8(); + public Float8Writer float8(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.float8(name, dataMode) : list.float8(); } - public BitWriter bit(final String name) { - return (map != null) ? map.bit(name) : list.bit(); + public BitWriter bit(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.bit(name, dataMode) : list.bit(); } /** * {@inheritDoc} */ @Deprecated - public VarBinaryWriter binary(final String name) { - return (map != null) ? map.varBinary(name) : list.varBinary(); + public VarBinaryWriter binary(final String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.varBinary(name, dataMode) : list.varBinary(); } @Override - public TinyIntWriter tinyInt(String name) { - return (map != null) ? map.tinyInt(name) : list.tinyInt(); + public TinyIntWriter tinyInt(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.tinyInt(name, dataMode) : list.tinyInt(); } @Override - public SmallIntWriter smallInt(String name) { - return (map != null) ? 
map.smallInt(name) : list.smallInt(); + public SmallIntWriter smallInt(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.smallInt(name, dataMode) : list.smallInt(); } @Override - public DateWriter date(String name) { - return (map != null) ? map.date(name) : list.date(); + public DateWriter date(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.date(name, dataMode) : list.date(); } @Override - public TimeWriter time(String name) { - return (map != null) ? map.time(name) : list.time(); + public TimeWriter time(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.time(name, dataMode) : list.time(); } @Override - public TimeStampWriter timeStamp(String name) { - return (map != null) ? map.timeStamp(name) : list.timeStamp(); + public TimeStampWriter timeStamp(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.timeStamp(name, dataMode) : list.timeStamp(); } @Override - public VarBinaryWriter varBinary(String name) { - return (map != null) ? map.varBinary(name) : list.varBinary(); + public VarBinaryWriter varBinary(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.varBinary(name, dataMode) : list.varBinary(); } @Override - public Var16CharWriter var16Char(String name) { - return (map != null) ? map.var16Char(name) : list.var16Char(); + public Var16CharWriter var16Char(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.var16Char(name, dataMode) : list.var16Char(); } @Override - public UInt1Writer uInt1(String name) { - return (map != null) ? map.uInt1(name) : list.uInt1(); + public UInt1Writer uInt1(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.uInt1(name, dataMode) : list.uInt1(); } @Override - public UInt2Writer uInt2(String name) { - return (map != null) ? map.uInt2(name) : list.uInt2(); + public UInt2Writer uInt2(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? 
map.uInt2(name, dataMode) : list.uInt2(); } @Override - public UInt4Writer uInt4(String name) { - return (map != null) ? map.uInt4(name) : list.uInt4(); + public UInt4Writer uInt4(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.uInt4(name, dataMode) : list.uInt4(); } @Override - public UInt8Writer uInt8(String name) { - return (map != null) ? map.uInt8(name) : list.uInt8(); + public UInt8Writer uInt8(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.uInt8(name, dataMode) : list.uInt8(); } @Override - public IntervalYearWriter intervalYear(String name) { - return (map != null) ? map.intervalYear(name) : list.intervalYear(); + public IntervalYearWriter intervalYear(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.intervalYear(name, dataMode) : list.intervalYear(); } @Override - public IntervalDayWriter intervalDay(String name) { - return (map != null) ? map.intervalDay(name) : list.intervalDay(); + public IntervalDayWriter intervalDay(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.intervalDay(name, dataMode) : list.intervalDay(); } @Override - public IntervalWriter interval(String name) { - return (map != null) ? map.interval(name) : list.interval(); + public IntervalWriter interval(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.interval(name, dataMode) : list.interval(); } @Override - public Decimal9Writer decimal9(String name) { - return (map != null) ? map.decimal9(name) : list.decimal9(); + public Decimal9Writer decimal9(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.decimal9(name, dataMode) : list.decimal9(); } @Override - public Decimal18Writer decimal18(String name) { - return (map != null) ? map.decimal18(name) : list.decimal18(); + public Decimal18Writer decimal18(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? 
map.decimal18(name, dataMode) : list.decimal18(); } @Override - public Decimal28DenseWriter decimal28Dense(String name) { - return (map != null) ? map.decimal28Dense(name) : list.decimal28Dense(); + public Decimal28DenseWriter decimal28Dense(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.decimal28Dense(name, dataMode) : list.decimal28Dense(); } @Override - public Decimal38DenseWriter decimal38Dense(String name) { - return (map != null) ? map.decimal38Dense(name) : list.decimal38Dense(); + public Decimal38DenseWriter decimal38Dense(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.decimal38Dense(name, dataMode) : list.decimal38Dense(); } @Override - public Decimal38SparseWriter decimal38Sparse(String name) { - return (map != null) ? map.decimal38Sparse(name) : list.decimal38Sparse(); + public Decimal38SparseWriter decimal38Sparse(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.decimal38Sparse(name, dataMode) : list.decimal38Sparse(); } @Override - public Decimal28SparseWriter decimal28Sparse(String name) { - return (map != null) ? map.decimal28Sparse(name) : list.decimal28Sparse(); + public Decimal28SparseWriter decimal28Sparse(String name, TypeProtos.DataMode dataMode) { + return (map != null) ? map.decimal28Sparse(name, dataMode) : list.decimal28Sparse(); } }