Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -17,12 +17,15 @@
*/
package org.apache.drill.exec.vector.complex.fn;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.netty.buffer.DrillBuf;

import java.io.IOException;
import java.io.InputStream;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.PathSegment;
Expand Down Expand Up @@ -55,13 +58,20 @@ public class JsonReader extends BaseJsonProcessor {
private final ListVectorOutput listOutput;
private final boolean extended = true;
private final boolean readNumbersAsDouble;
private List<String> path = Lists.newArrayList();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are adding a stateful variable 'path', could you add a test with some invalid JSON records injected in the middle, to make sure the reader can still recover and remain in a good state when the option introduced in DRILL-4653 ('store.json.reader.skip_invalid_records'), which skips invalid tokens, is enabled?

Copy link
Copy Markdown
Contributor Author

@Serhii-Harnyk Serhii-Harnyk Dec 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed it and added a test.


/**
* Collection for tracking empty array writers during reading
* and storing them for initializing empty arrays
*/
private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();

/**
* Collection for tracking nullable fields during reading
* and storing them for creating default typed vectors
*/
private Map<List<String>, MapWriter> fieldPathWriter = Maps.newHashMap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, in JSON, all scalar types can be null. Because JSON has no schema, any field can have any type. Drill assumes that a given field has a single type. But, in JSON semantics, that single type can be null. That is, the following is always valid:

{code}
{ a: 10, b: null }
{ a: null, b: "foo" }
{ a: 20 }
{code}

JSON, but not Drill, differentiates between "not present" and null.

Given this, we don't need a special map for nullable fields: all JSON fields are nullable.

Of course, JSON allows a null map, which Drill does not handle, but let's ignore that here...


/**
* Describes whether or not this reader can unwrap a single root array record
* and treat it like a set of distinct records.
Expand Down Expand Up @@ -149,11 +159,15 @@ public void ensureAtLeastOneField(ComplexWriter writer) {
for (int j = 0; j < fieldPathList.size(); j++) {
BaseWriter.MapWriter fieldWriter = writerList.get(j);
PathSegment fieldPath = fieldPathList.get(j);
if (emptyStatus.get(j)) {
if (allTextMode) {
fieldWriter.varChar(fieldPath.getNameSegment().getPath());
} else {
fieldWriter.integer(fieldPath.getNameSegment().getPath());
if (emptyStatus.get(j) && !checkNullFields(fieldPathList)) {
initializeFieldWriter(fieldWriter, fieldPath.getNameSegment().getPath());
}
}

if (checkNullFields(fieldPathList)) {
for (Map.Entry<List<String>, MapWriter> fieldPath : fieldPathWriter.entrySet()) {
if(fieldPath.getValue() != null) {
initializeFieldWriter(fieldPath.getValue(), fieldPath.getKey().get(fieldPath.getKey().size() - 1));
}
}
}
Expand All @@ -170,6 +184,21 @@ public void ensureAtLeastOneField(ComplexWriter writer) {
}
}

/**
* Requests a default-typed writer for the field {@code path} from the given map writer:
* {@code varChar(path)} when {@code allTextMode} is set, {@code integer(path)} otherwise.
* The returned writer is discarded; presumably the call itself is what materializes the
* field's vector (NOTE(review): confirm against the MapWriter implementation).
*
* @param fieldWriter map writer on which the field writer is requested
* @param path name of the field
*/
private void initializeFieldWriter(MapWriter fieldWriter, String path) {
if (allTextMode) {
fieldWriter.varChar(path);
} else {
fieldWriter.integer(path);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the Drill Hangout it was mentioned that this fix handles nulls for non-Varchar fields. Note that it does so using the same mechanism that Drill uses elsewhere: assume the field is of type int. However, we have many, many bugs that result from that assumption. There is simply no guarantee that, in a later batch, when we see the field, that it will, in fact, be an int.

I'm not sure whether it is OK to simply continue to propagate that well-known error here (as we have done) or to take another path that avoids the error. (Since doing so requires a design change that has, so far, always been beyond our ability to accomplish.)

}
}

/**
 * Tells whether the query selects only '*' and at least one field path
 * has been recorded in {@code fieldPathWriter}.
 *
 * @param fieldPathList field paths requested by the query
 * @return {@code true} when the list holds exactly one path, that path is "*",
 *         and {@code fieldPathWriter} is not empty
 */
private boolean checkNullFields(List<PathSegment> fieldPathList) {
  // Anything other than a single selected path cannot be a bare star query.
  if (fieldPathList.size() != 1) {
    return false;
  }

  final String onlyPath = fieldPathList.get(0).getNameSegment().getPath();
  if (!onlyPath.equals("*")) {
    return false;
  }

  // A star query only matters here when some field path was recorded.
  return !fieldPathWriter.isEmpty();
}

/**
* Sets the reader's source from a DrillBuf by delegating to another {@code setSource}
* overload with the stream returned by
* {@code DrillBufInputStream.getStream(start, end, buf)}.
*
* @param start passed through to getStream; presumably the start offset within buf (TODO confirm)
* @param end passed through to getStream; presumably the end offset within buf (TODO confirm)
* @param buf buffer handed to getStream
* @throws IOException declared by this method; presumably propagated from the delegated call
*/
public void setSource(int start, int end, DrillBuf buf) throws IOException {
setSource(DrillBufInputStream.getStream(start, end, buf));
}
Expand Down Expand Up @@ -229,6 +258,7 @@ public ReadState write(ComplexWriter writer) throws IOException {
}
} catch (com.fasterxml.jackson.core.JsonParseException ex) {
if (ignoreJSONParseError()) {
path.clear();
if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
} else {
Expand Down Expand Up @@ -367,12 +397,19 @@ private void writeData(MapWriter map, FieldSelection selection,
t = parser.getCurrentToken();
moveForward = true;
}
if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {

if (t == JsonToken.NOT_AVAILABLE) {
return;
}

if (t == JsonToken.END_OBJECT) {
if (path.size() > 0) {
path.remove(path.size() - 1);
}
return;
}

assert t == JsonToken.FIELD_NAME : String.format(
"Expected FIELD_NAME but got %s.", t.name());
assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());

final String fieldName = parser.getText();
this.currentFieldName = fieldName;
Expand All @@ -388,6 +425,7 @@ private void writeData(MapWriter map, FieldSelection selection,
break;
case START_OBJECT:
if (!writeMapDataIfTyped(map, fieldName)) {
path.add(fieldName);
writeData(map.map(fieldName), childSelection, false);
}
break;
Expand All @@ -403,7 +441,7 @@ private void writeData(MapWriter map, FieldSelection selection,
break;
}
case VALUE_NULL:
// do nothing as we don't have a type.
putFieldPath(fieldName, map);
break;
case VALUE_NUMBER_FLOAT:
map.float8(fieldName).writeFloat8(parser.getDoubleValue());
Expand Down Expand Up @@ -446,12 +484,19 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
t = parser.getCurrentToken();
moveForward = true;
}
if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {

if (t == JsonToken.NOT_AVAILABLE) {
return;
}

assert t == JsonToken.FIELD_NAME : String.format(
"Expected FIELD_NAME but got %s.", t.name());
if (t == JsonToken.END_OBJECT) {
if (path.size() > 0) {
path.remove(path.size() - 1);
}
return;
}

assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());

final String fieldName = parser.getText();
this.currentFieldName = fieldName;
Expand All @@ -467,6 +512,7 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
break;
case START_OBJECT:
if (!writeMapDataIfTyped(map, fieldName)) {
path.add(fieldName);
writeDataAllText(map.map(fieldName), childSelection, false);
}
break;
Expand All @@ -482,7 +528,7 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
handleString(parser, map, fieldName);
break;
case VALUE_NULL:
// do nothing as we don't have a type.
putFieldPath(fieldName, map);
break;

default:
Expand All @@ -492,7 +538,19 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
}
}
map.end();
}

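/**
* Records the path of a field in the fieldPathWriter map: the field name is appended to the
* current path and, if that path is not already a key, an immutable copy of it is stored
* together with the given writer.
* @param fieldName name of the field, used as the last element of the stored path
* @param map writer stored as the value for the field's path
*/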
private void putFieldPath(String fieldName, MapWriter map) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have the performance data that you collected earlier, with different percentages of NULLs, for the code after this change? I still feel some further optimization could be needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attached the results of testing the latest fix to the Jira.
https://drive.google.com/open?id=1l3Dg0DCV3p-OhwA0v6qdl2wGcezXDl5qaUqv56EEBn8
It should be mentioned that a lot of time goes to creating the fields and their writers. So the execution times are also compared with the time of querying files in which the nulls are replaced by varchar values.

path.add(fieldName);
if (!fieldPathWriter.containsKey(path)) {
List<String> fieldPath = ImmutableList.copyOf(path);
fieldPathWriter.put(fieldPath, map);
}
path.remove(path.size() - 1);
}

/**
Expand Down
21 changes: 8 additions & 13 deletions exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@


import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class TestCTASJson extends PlanTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestCTASJson.class);

Expand All @@ -38,6 +35,8 @@ public class TestCTASJson extends PlanTestBase {
public void testctas_alltypes_map() throws Exception {
String testName = "ctas_alltypes_map";
test("use dfs_test.tmp");
test("alter session set store.json.writer.skip_null_fields = true");
test("alter session set store.format = 'json' ");
test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");

final String query = "select * from `" + testName + "_json` t1 ";
Expand All @@ -47,8 +46,6 @@ public void testctas_alltypes_map() throws Exception {
.sqlQuery(query)
.ordered()
.jsonBaselineFile("json/" + testName + ".json")
.optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
.optionSettingQueriesForTestQuery("alter session set store.json.writer.skip_null_fields = true") // DEFAULT
.build()
.run();
} finally {
Expand All @@ -66,6 +63,8 @@ public void testctas_alltypes_map() throws Exception {
public void testctas_alltypes_map_noskip() throws Exception {
String testName = "ctas_alltypes_map";
test("use dfs_test.tmp");
test("alter session set store.json.writer.skip_null_fields = false");
test("alter session set store.format = 'json' ");
test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");

final String query = "select * from `" + testName + "_json` t1 ";
Expand All @@ -75,8 +74,6 @@ public void testctas_alltypes_map_noskip() throws Exception {
.sqlQuery(query)
.ordered()
.jsonBaselineFile("json/" + testName + "_out.json")
.optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
.optionSettingQueriesForTestQuery("alter session set store.json.writer.skip_null_fields = false") // change from DEFAULT
.build()
.run();
} finally{
Expand All @@ -95,6 +92,8 @@ public void testctas_alltypes_map_noskip() throws Exception {
public void testctas_alltypes_repeatedmap() throws Exception {
String testName = "ctas_alltypes_repeated_map";
test("use dfs_test.tmp");
test("alter session set store.json.writer.skip_null_fields = true");
test("alter session set store.format = 'json' ");
test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");

final String query = "select * from `" + testName + "_json` t1 ";
Expand All @@ -104,9 +103,6 @@ public void testctas_alltypes_repeatedmap() throws Exception {
.sqlQuery(query)
.ordered()
.jsonBaselineFile("json/" + testName + ".json")
.optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
.optionSettingQueriesForTestQuery(
"alter session set store.json.writer.skip_null_fields = true") // DEFAULT
.build()
.run();
}finally{
Expand All @@ -125,6 +121,8 @@ public void testctas_alltypes_repeatedmap() throws Exception {
public void testctas_alltypes_repeated_map_noskip() throws Exception {
String testName = "ctas_alltypes_repeated_map";
test("use dfs_test.tmp");
test("alter session set store.json.writer.skip_null_fields = false");
test("alter session set store.format = 'json' ");
test("create table " + testName + "_json as select * from cp.`json/" + testName + ".json`");

final String query = "select * from `" + testName + "_json` t1 ";
Expand All @@ -134,9 +132,6 @@ public void testctas_alltypes_repeated_map_noskip() throws Exception {
.sqlQuery(query)
.ordered()
.jsonBaselineFile("json/" + testName + "_out.json")
.optionSettingQueriesForTestQuery("alter session set store.format = 'json' ")
.optionSettingQueriesForTestQuery(
"alter session set store.json.writer.skip_null_fields = false") // change from DEFAULT
.build()
.run();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testSelStarOrderBy() throws Exception{
.ordered()
.sqlQuery(" select * from cp.`employee.json` order by last_name")
.sqlBaselineQuery(" select employee_id, full_name,first_name,last_name,position_id,position_title,store_id," +
" department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role " +
" department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role,end_date " +
" from cp.`employee.json` " +
" order by last_name ")
.build().run();
Expand All @@ -139,7 +139,7 @@ public void testSelStarOrderByLimit() throws Exception{
.ordered()
.sqlQuery(" select * from cp.`employee.json` order by last_name limit 2")
.sqlBaselineQuery(" select employee_id, full_name,first_name,last_name,position_id,position_title,store_id," +
" department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role " +
" department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role,end_date " +
" from cp.`employee.json` " +
" order by last_name limit 2")
.build().run();
Expand All @@ -162,7 +162,7 @@ public void testSelStarWhereOrderBy() throws Exception{
.ordered()
.sqlQuery("select * from cp.`employee.json` where first_name = 'James' order by last_name")
.sqlBaselineQuery("select employee_id, full_name,first_name,last_name,position_id,position_title,store_id," +
" department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role " +
" department_id,birth_date,hire_date,salary,supervisor_id,education_level,marital_status,gender,management_role,end_date " +
" from cp.`employee.json` " +
" where first_name = 'James' order by last_name")
.build().run();
Expand Down
Loading