From 0889aa65023abf94c865589b7e4f5aae5baa9c6a Mon Sep 17 00:00:00 2001 From: Sudheesh Katkam Date: Wed, 9 Mar 2016 10:34:26 -0800 Subject: [PATCH 1/3] DRILL-4606: HYGIENE + Merge DrillAutoCloseables and AuthCloseables + Remove unused imports + Expand * imports --- .../apache/drill/common/AutoCloseables.java | 27 +++++++++- .../drill/common/DrillAutoCloseables.java | 50 ------------------- .../apache/drill/exec/client/DrillClient.java | 7 ++- .../exec/client/PrintingResultsListener.java | 4 +- .../exec/physical/impl/TopN/TopNBatch.java | 4 +- .../drill/exec/server/BootStrapContext.java | 4 +- .../drill/exec/DrillSeparatePlanningTest.java | 11 ++-- .../fn/impl/TestByteComparisonFunctions.java | 4 +- .../exec/server/TestDrillbitResilience.java | 4 +- .../vector/complex/writer/TestRepeated.java | 4 +- .../drill/exec/memory/TestEndianess.java | 4 +- 11 files changed, 51 insertions(+), 72 deletions(-) delete mode 100644 common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java index fcdfe14de51..ede316272d1 100644 --- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java +++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java @@ -66,7 +66,7 @@ public static void close(AutoCloseable... autoCloseables) throws Exception { /** * Closes all autoCloseables if not null and suppresses subsequent exceptions if more than one - * @param autoCloseables the closeables to close + * @param ac the closeables to close */ public static void close(Iterable ac) throws Exception { Exception topLevelException = null; @@ -87,4 +87,29 @@ public static void close(Iterable ac) throws Exception throw topLevelException; } } + + /** + * close() an {@see java.lang.AutoCloseable} without throwing a (checked) + * {@see java.lang.Exception}. This wraps the close() call with a + * try-catch that will rethrow an Exception wrapped with a + * {@see java.lang.RuntimeException}, providing a way to call close() + * without having to do the try-catch everywhere or propagate the Exception. + * + * @param autoCloseable the AutoCloseable to close; may be null + * @throws RuntimeException if an Exception occurs; the Exception is + * wrapped by the RuntimeException + */ + public static void closeNoChecked(final AutoCloseable autoCloseable) { + if (autoCloseable != null) { + try { + autoCloseable.close(); + } catch(final Exception e) { + throw new RuntimeException("Exception while closing", e); + } + } + } + + // prevents instantiation + private AutoCloseables() { + } } diff --git a/common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java b/common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java deleted file mode 100644 index 11fb9a8f579..00000000000 --- a/common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.common; - -/** - * Provides functionality comparable to Guava's Closeables for AutoCloseables. - */ -public class DrillAutoCloseables { - /** - * Constructor. Prevents construction for class of static utilities. - */ - private DrillAutoCloseables() { - } - - /** - * close() an {@see java.lang.AutoCloseable} without throwing a (checked) - * {@see java.lang.Exception}. This wraps the close() call with a - * try-catch that will rethrow an Exception wrapped with a - * {@see java.lang.RuntimeException}, providing a way to call close() - * without having to do the try-catch everywhere or propagate the Exception. - * - * @param closeable the AutoCloseable to close; may be null - * @throws RuntimeException if an Exception occurs; the Exception is - * wrapped by the RuntimeException - */ - public static void closeNoChecked(final AutoCloseable autoCloseable) { - if (autoCloseable != null) { - try { - autoCloseable.close(); - } catch(final Exception e) { - throw new RuntimeException("Exception while closing", e); - } - } - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index e81a4fb0ce8..a5ef99f6058 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -25,7 +25,6 @@ import io.netty.channel.EventLoopGroup; import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -36,7 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; @@ -302,8 +301,8 @@ public void close() { if (this.client != null) { this.client.close(); } - if (this.ownsAllocator && allocator != null) { - DrillAutoCloseables.closeNoChecked(allocator); + if (ownsAllocator && allocator != null) { + AutoCloseables.closeNoChecked(allocator); } if (ownsZkConnection) { if (clusterCoordinator != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java index bdd2fab124f..9a36ead4939 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.client.QuerySubmitter.Format; @@ -65,7 +65,7 @@ public void submissionFailed(UserException ex) { @Override public void queryCompleted(QueryState state) { - DrillAutoCloseables.closeNoChecked(allocator); + AutoCloseables.closeNoChecked(allocator); System.out.println("Total rows returned : " + count.get() + ". Returned in " + w.elapsed(TimeUnit.MILLISECONDS) + "ms."); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 0fbcb7da083..7b8fa44f2f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.calcite.rel.RelFieldCollation.Direction; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -322,7 +322,7 @@ private void purge() throws SchemaChangeException { builder.getSv4().clear(); selectionVector4.clear(); } finally { - DrillAutoCloseables.closeNoChecked(builder); + AutoCloseables.closeNoChecked(builder); } logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index 6554e3307f5..de6b6c3c0e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.exec.ExecConstants; @@ -124,6 +124,6 @@ public void close() { } } - DrillAutoCloseables.closeNoChecked(allocator); + AutoCloseables.closeNoChecked(allocator); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java index 04818258e82..e7d7e6c0cbf 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec; -import static org.junit.Assert.*; import io.netty.buffer.DrillBuf; import java.util.Iterator; @@ -28,7 +27,7 @@ import java.util.concurrent.ExecutionException; import org.apache.drill.BaseTestQuery; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.client.DrillClient; @@ -58,6 +57,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Class to test different planning use cases (separate form query execution) * @@ -304,7 +309,7 @@ public void queryIdArrived(QueryId queryId) { @Override public void submissionFailed(UserException ex) { - DrillAutoCloseables.closeNoChecked(allocator); + AutoCloseables.closeNoChecked(allocator); this.ex = ex; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestByteComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestByteComparisonFunctions.java index 6ff2d248df8..1defba9b2a5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestByteComparisonFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestByteComparisonFunctions.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertTrue; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers; @@ -56,7 +56,7 @@ public static void teardown() { helloLong.buffer.release(); goodbye.buffer.release(); goodbyeLong.buffer.release(); - DrillAutoCloseables.closeNoChecked(allocator); + AutoCloseables.closeNoChecked(allocator); } @Test diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java index 4200073cda8..6217b5960f6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -34,8 +34,8 @@ import org.apache.drill.BaseTestQuery; import org.apache.drill.QueryTestUtil; import org.apache.drill.SingleRowListener; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.concurrent.ExtendedLatch; -import org.apache.drill.common.DrillAutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -275,7 +275,7 @@ public void rowArrived(final QueryDataBatch queryResultBatch) { @Override public void cleanup() { - DrillAutoCloseables.closeNoChecked(bufferAllocator); + AutoCloseables.closeNoChecked(bufferAllocator); } }; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java index bd4731a61a4..56f8b50340b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java @@ -19,7 +19,7 @@ import java.io.ByteArrayOutputStream; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.IntHolder; @@ -52,7 +52,7 @@ public static void setupAllocator() { @AfterClass public static void destroyAllocator() { - DrillAutoCloseables.closeNoChecked(allocator); + AutoCloseables.closeNoChecked(allocator); } // // @Test diff --git a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java index b312301f6af..601eb3bc56d 100644 --- a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java +++ b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertEquals; import io.netty.buffer.ByteBuf; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.junit.Test; @@ -37,7 +37,7 @@ public void testLittleEndian() { assertEquals(b.getByte(2), 0); assertEquals(b.getByte(3), 0); b.release(); - DrillAutoCloseables.closeNoChecked(a); + AutoCloseables.closeNoChecked(a); } } From 8cc6bc929b91a9f19f2ed8cbce293db8f86c1e48 Mon Sep 17 00:00:00 2001 From: Sudheesh Katkam Date: Thu, 14 Apr 2016 22:25:02 -0700 Subject: [PATCH 2/3] DRILL-4606: CORE + Add DrillClient.Builder helper class to create DrillClient objects + Deprecate 8 constructors and DrillClientFactory + Reorganize and document DrillClient --- .../drill/hbase/TestHBaseCFAsJSONString.java | 5 +- .../apache/drill/exec/client/DrillClient.java | 496 +++++++++++++----- .../drill/exec/client/QuerySubmitter.java | 10 +- .../rest/auth/AbstractDrillLoginService.java | 7 +- .../server/rest/auth/DrillUserPrincipal.java | 7 +- .../java/org/apache/drill/QueryTestUtil.java | 5 +- .../exec/fn/impl/TestAggregateFunction.java | 5 +- .../drill/exec/fn/impl/TestDateFunctions.java | 5 +- .../drill/exec/fn/impl/TestMultiInputAdd.java | 5 +- .../fn/impl/TestNewAggregateFunctions.java | 6 +- .../physical/impl/TestBroadcastExchange.java | 10 +- .../exec/physical/impl/TestCastFunctions.java | 5 +- .../impl/TestCastVarCharToBigInt.java | 5 +- .../drill/exec/physical/impl/TestDecimal.java | 30 +- .../impl/TestDistributedFragmentRun.java | 25 +- .../physical/impl/TestExtractFunctions.java | 5 +- .../impl/TestHashToRandomExchange.java | 7 +- .../exec/physical/impl/TestOptiqPlans.java | 25 +- .../impl/TestReverseImplicitCast.java | 5 +- .../physical/impl/TestSimpleFragmentRun.java | 12 +- .../exec/physical/impl/TestUnionExchange.java | 7 +- .../physical/impl/TopN/TestSimpleTopN.java | 7 +- .../exec/physical/impl/join/TestHashJoin.java | 25 +- .../physical/impl/join/TestMergeJoin.java | 20 +- .../impl/join/TestMergeJoinMulCondition.java | 15 +- .../mergereceiver/TestMergingReceiver.java | 22 +- .../TestOrderedPartitionExchange.java | 5 +- .../impl/xsort/TestSimpleExternalSort.java | 12 +- .../exec/record/vector/TestDateTypes.java | 30 +- .../drill/exec/server/DrillClientFactory.java | 4 + .../parquet/TestParquetPhysicalPlan.java | 10 +- .../exec/store/text/TextRecordReaderTest.java | 5 +- .../complex/writer/TestComplexToJson.java | 10 +- .../exec/work/batch/TestSpoolingBuffer.java | 6 +- .../drill/jdbc/impl/DrillConnectionImpl.java | 10 +- .../apache/drill/exec/rpc/BasicServer.java | 21 +- .../apache/drill/exec/rpc/TransportCheck.java | 40 +- 37 files changed, 704 insertions(+), 225 deletions(-) diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java index 7873b8075ee..075a44ea5b3 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java @@ -34,7 +34,10 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest { @BeforeClass public static void openMyClient() throws Exception { parent_client = client; - client = new DrillClient(config, serviceSet.getCoordinator()); + client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build(); client.setSupportComplexTypes(false); client.connect(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index a5ef99f6058..9ea22248597 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -17,14 +17,14 @@ */ package org.apache.drill.exec.client; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL; -import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder; +import static com.google.common.base.Preconditions.checkState; + import io.netty.buffer.DrillBuf; import io.netty.channel.EventLoopGroup; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,7 +41,6 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; -import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -51,7 +50,6 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserBitShared.QueryType; -import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq; import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp; import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp; @@ -67,6 +65,7 @@ import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle; import org.apache.drill.exec.proto.UserProtos.Property; import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments; +import org.apache.drill.exec.proto.UserProtos.QueryResultsMode; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserProperties; @@ -89,79 +88,175 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.util.concurrent.AbstractCheckedFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; /** * Thin wrapper around a UserClient that handles connect/close and transforms * String into ByteBuf. + * + * To create non-default objects, use {@link DrillClient.Builder the builder class}. + * E.g. + * + * DrillClient client = DrillClient.newBuilder() + * .setConfig(...) + * .setIsDirectConnection(true) + * .build(); + * + * + * Except for {@link #runQuery} and {@link #cancelQuery}, this class is generally not thread safe. */ public class DrillClient implements Closeable, ConnectionThrottle { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class); private static final ObjectMapper objectMapper = new ObjectMapper(); private final DrillConfig config; - private UserClient client; - private UserProperties props = null; - private volatile ClusterCoordinator clusterCoordinator; - private volatile boolean connected = false; private final BufferAllocator allocator; - private int reconnectTimes; - private int reconnectDelay; - private boolean supportComplexTypes; - private final boolean ownsZkConnection; + private final EventLoopGroup eventLoopGroup; + private final ExecutorService executor; + private final boolean isDirectConnection; + private final int reconnectTimes; + private final int reconnectDelay; + + // TODO: clusterCoordinator should be initialized in the constructor. + // Currently, initialization is tightly coupled with #connect. + private ClusterCoordinator clusterCoordinator; + + // checks if this client owns these resources (used when closing) private final boolean ownsAllocator; - private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit - private EventLoopGroup eventLoopGroup; - private ExecutorService executor; + private final boolean ownsZkConnection; + private final boolean ownsEventLoopGroup; + private final boolean ownsExecutor; + + // once #setSupportComplexTypes() is removed, make this final + private boolean supportComplexTypes; + + private UserClient client; + private UserProperties props; + private boolean connected; - public DrillClient() throws OutOfMemoryException { - this(DrillConfig.create(), false); + public DrillClient() { + this(newBuilder()); } - public DrillClient(boolean isDirect) throws OutOfMemoryException { - this(DrillConfig.create(), isDirect); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(boolean isDirect) { + this(newBuilder() + .setDirectConnection(isDirect)); } - public DrillClient(String fileName) throws OutOfMemoryException { - this(DrillConfig.create(fileName), false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(String fileName) { + this(newBuilder() + .setConfigFromFile(fileName)); } - public DrillClient(DrillConfig config) throws OutOfMemoryException { - this(config, null, false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config) { + this(newBuilder() + .setConfig(config)); } - public DrillClient(DrillConfig config, boolean isDirect) - throws OutOfMemoryException { - this(config, null, isDirect); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, boolean isDirect) { + this(newBuilder() + .setConfig(config) + .setDirectConnection(isDirect)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator) - throws OutOfMemoryException { - this(config, coordinator, null, false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) - throws OutOfMemoryException { - this(config, coordinator, null, isDirect); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator) + .setDirectConnection(isDirect)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) - throws OutOfMemoryException { - this(config, coordinator, allocator, false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator) + .setAllocator(allocator)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, boolean isDirect) { - // if isDirect is true, the client will connect directly to the drillbit instead of - // going thru the zookeeper - this.isDirectConnection = isDirect; - this.ownsZkConnection = coordinator == null && !isDirect; - this.ownsAllocator = allocator == null; - this.allocator = ownsAllocator ? RootAllocatorFactory.newRoot(config) : allocator; - this.config = config; - this.clusterCoordinator = coordinator; + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, + boolean isDirect) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator) + .setAllocator(allocator) + .setDirectConnection(isDirect)); + } + + // used by DrillClient.Builder + private DrillClient(final Builder builder) { + this.config = builder.config != null ? builder.config : DrillConfig.create(); + + this.ownsAllocator = builder.allocator == null; + this.allocator = !ownsAllocator ? builder.allocator : RootAllocatorFactory.newRoot(config); + + this.isDirectConnection = builder.isDirectConnection; + this.ownsZkConnection = builder.clusterCoordinator == null && !isDirectConnection; + this.clusterCoordinator = builder.clusterCoordinator; // could be null + + this.ownsEventLoopGroup = builder.eventLoopGroup == null; + this.eventLoopGroup = !ownsEventLoopGroup ? builder.eventLoopGroup : + TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"); + + this.ownsExecutor = builder.executor == null; + this.executor = !ownsExecutor ? builder.executor : + new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + new NamedThreadFactory("drill-client-executor-")) { + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + if (t != null) { + logger.error(String.format("%s.run() leaked an exception.", r.getClass().getName()), t); + } + super.afterExecute(r, t); + } + }; + + this.supportComplexTypes = builder.supportComplexTypes; + + // These are currently not exposed to the users of this class through {@link DrillClient.Builder}. + // Reconnection is only done when a cluster coordinator is used (and not when a direct connection is + // made), and these are used only in reconnection. this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY); - this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES); } public DrillConfig getConfig() { @@ -178,10 +273,13 @@ public void setAutoRead(boolean enableAutoRead) { * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type. * * @throws IllegalStateException if called after a connection has been established. + * + * @deprecated use {@link Builder#setSupportsComplexTypes} while building the client. */ + @Deprecated public void setSupportComplexTypes(boolean supportComplexTypes) { if (connected) { - throw new IllegalStateException("Attempted to modify client connection property after connection has been established."); + throw new IllegalStateException("Attempted to modify a property after connection has been established."); } this.supportComplexTypes = supportComplexTypes; } @@ -199,7 +297,7 @@ public void connect(Properties props) throws RpcException { connect(null, props); } - public synchronized void connect(String connect, Properties props) throws RpcException { + public void connect(String connect, Properties props) throws RpcException { if (connected) { return; } @@ -232,30 +330,24 @@ public synchronized void connect(String connect, Properties props) throws RpcExc if (props != null) { final UserProperties.Builder upBuilder = UserProperties.newBuilder(); for (final String key : props.stringPropertyNames()) { - upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key))); + upBuilder.addProperties(Property.newBuilder() + .setKey(key) + .setValue(props.getProperty(key))); } this.props = upBuilder.build(); } - eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"); - executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - new SynchronousQueue(), - new NamedThreadFactory("drill-client-executor-")) { - @Override - protected void afterExecute(final Runnable r, final Throwable t) { - if (t != null) { - logger.error("{}.run() leaked an exception.", r.getClass().getName(), t); - } - super.afterExecute(r, t); - } - }; client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup, executor); logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); connect(endpoint); connected = true; } + /** + * @deprecated use {@link TransportCheck#createEventLoopGroup} directly. + */ + @Deprecated protected static EventLoopGroup createEventLoop(int size, String prefix) { return TransportCheck.createEventLoopGroup(size, prefix); } @@ -278,6 +370,7 @@ public synchronized boolean reconnect() { connect(endpoints.iterator().next()); return true; } catch (Exception e) { + logger.info(String.format("Trying to reconnect (#%s).", retry)); } } return false; @@ -289,6 +382,28 @@ private void connect(DrillbitEndpoint endpoint) throws RpcException { f.checkedGet(); } + /* + * Helper method to generate the UserCredentials message from the properties. + */ + private UserBitShared.UserCredentials getUserCredentials() { + // If username is not propagated as one of the properties + String userName = "anonymous"; + + if (props != null) { + for (Property property: props.getPropertiesList()) { + if (property.getKey().equalsIgnoreCase("user") && + !Strings.isNullOrEmpty(property.getValue())) { + userName = property.getValue(); + break; + } + } + } + + return UserBitShared.UserCredentials.newBuilder() + .setUserName(userName) + .build(); + } + public BufferAllocator getAllocator() { return allocator; } @@ -298,36 +413,43 @@ public BufferAllocator getAllocator() { */ @Override public void close() { - if (this.client != null) { - this.client.close(); - } - if (ownsAllocator && allocator != null) { - AutoCloseables.closeNoChecked(allocator); - } - if (ownsZkConnection) { - if (clusterCoordinator != null) { - try { - clusterCoordinator.close(); - clusterCoordinator = null; - } catch (Exception e) { - logger.warn("Error while closing Cluster Coordinator.", e); - } - } - } - if (eventLoopGroup != null) { - eventLoopGroup.shutdownGracefully(); + try { + AutoCloseables.close(client, + ownsAllocator ? allocator : null, + ownsZkConnection ? clusterCoordinator : null); + } catch (Exception e) { + logger.error("Error(s) encountered while closing client.", e); } - if (executor != null) { - executor.shutdownNow(); + if (ownsEventLoopGroup) { + TransportCheck.shutDownEventLoopGroup(eventLoopGroup, "Client-", logger); + } + if (ownsExecutor) { + MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS); } - // TODO: Did DRILL-1735 changes cover this TODO?: - // TODO: fix tests that fail when this is called. - //allocator.close(); connected = false; } + + /** + * Submits a Logical plan for direct execution (bypasses parsing) + * Runs the given plan of the given {@link QueryType type}. The {@link UserResultsListener listener} + * is notified if results arrive, query completed, etc. + * + * @param type query type + * @param plan query plan + * @param resultsListener results listener + */ + public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) { + client.submitQuery(resultsListener, + RunQuery.newBuilder() + .setResultsMode(QueryResultsMode.STREAM_FULL) + .setType(type) + .setPlan(plan) + .build()); + } + /** * Submits a string based query plan for execution and returns the result batches. Supported query types are: *

    @@ -341,11 +463,16 @@ public void close() { * @return a handle for the query result * @throws RpcException */ + @VisibleForTesting public List runQuery(QueryType type, String plan) throws RpcException { checkArgument(type == QueryType.LOGICAL || type == QueryType.PHYSICAL || type == QueryType.SQL, String.format("Only query types %s, %s and %s are supported in this API", QueryType.LOGICAL, QueryType.PHYSICAL, QueryType.SQL)); - final UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(); + final RunQuery query = RunQuery.newBuilder() + .setResultsMode(QueryResultsMode.STREAM_FULL) + .setType(type) + .setPlan(plan) + .build(); final ListHoldingResultsListener listener = new ListHoldingResultsListener(query); client.submitQuery(listener, query); return listener.getResults(); @@ -360,7 +487,11 @@ public List runQuery(QueryType type, String plan) throws RpcExce * to run a query without additional planning */ public DrillRpcFuture planQuery(QueryType type, String query, boolean isSplitPlan) { - GetQueryPlanFragments runQuery = GetQueryPlanFragments.newBuilder().setQuery(query).setType(type).setSplitPlan(isSplitPlan).build(); + GetQueryPlanFragments runQuery = GetQueryPlanFragments.newBuilder() + .setQuery(query) + .setType(type) + .setSplitPlan(isSplitPlan) + .build(); return client.planQuery(runQuery); } @@ -394,31 +525,21 @@ public void runQuery(QueryType type, List planFragments, UserResul logger.error("Exception while trying to get JSONString from Array of individual Fragments Json for %s", e); throw new RpcException(e); } - final UserProtos.RunQuery query = newBuilder().setType(type).addAllFragments(planFragments) + final RunQuery query = RunQuery.newBuilder() + .setType(type) + .addAllFragments(planFragments) .setPlan(fragmentsToJsonString) - .setResultsMode(STREAM_FULL).build(); + .setResultsMode(QueryResultsMode.STREAM_FULL) + .build(); client.submitQuery(resultsListener, query); } - /* - * Helper method to generate the UserCredentials message from the properties. + /** + * Cancels the query with the given {@link QueryId query id}. + * + * @param id query id, received through {@link UserResultsListener#queryIdArrived}. + * @return a future that acknowledges cancellation */ - private UserBitShared.UserCredentials getUserCredentials() { - // If username is not propagated as one of the properties - String userName = "anonymous"; - - if (props != null) { - for (Property property: props.getPropertiesList()) { - if (property.getKey().equalsIgnoreCase("user") && !Strings.isNullOrEmpty(property.getValue())) { - userName = property.getValue(); - break; - } - } - } - - return UserBitShared.UserCredentials.newBuilder().setUserName(userName).build(); - } - public DrillRpcFuture cancelQuery(QueryId id) { if(logger.isDebugEnabled()) { logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id)); @@ -426,6 +547,7 @@ public DrillRpcFuture cancelQuery(QueryId id) { return client.send(RpcType.CANCEL_QUERY, id, Ack.class); } + @VisibleForTesting public DrillRpcFuture resumeQuery(final QueryId queryId) { if(logger.isDebugEnabled()) { logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId)); @@ -549,8 +671,8 @@ public DrillRpcFuture createPreparedStatement(final */ public void executePreparedStatement(final PreparedStatementHandle preparedStatementHandle, final UserResultsListener resultsListener) { - final RunQuery runQuery = newBuilder() - .setResultsMode(STREAM_FULL) + final RunQuery runQuery = RunQuery.newBuilder() + .setResultsMode(QueryResultsMode.STREAM_FULL) .setType(QueryType.PREPARED_STATEMENT) .setPreparedStatementHandle(preparedStatementHandle) .build(); @@ -568,8 +690,8 @@ public void executePreparedStatement(final PreparedStatementHandle preparedState @VisibleForTesting public List executePreparedStatement(final PreparedStatementHandle preparedStatementHandle) throws RpcException { - final RunQuery runQuery = newBuilder() - .setResultsMode(STREAM_FULL) + final RunQuery runQuery = RunQuery.newBuilder() + .setResultsMode(QueryResultsMode.STREAM_FULL) .setType(QueryType.PREPARED_STATEMENT) .setPreparedStatementHandle(preparedStatementHandle) .build(); @@ -581,21 +703,12 @@ public List executePreparedStatement(final PreparedStatementHand return resultsListener.getResults(); } - /** - * Submits a Logical plan for direct execution (bypasses parsing) - * - * @param plan the plan to execute - */ - public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) { - client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build()); - } - private class ListHoldingResultsListener implements UserResultsListener { private final Vector results = new Vector<>(); private final SettableFuture> future = SettableFuture.create(); - private final UserProtos.RunQuery query ; + private final RunQuery query; - public ListHoldingResultsListener(UserProtos.RunQuery query) { + public ListHoldingResultsListener(RunQuery query) { logger.debug( "Listener created for query \"\"\"{}\"\"\"", query ); this.query = query; } @@ -659,7 +772,8 @@ public void queryIdArrived(QueryId queryId) { } } - private class FutureHandler extends AbstractCheckedFuture implements RpcConnectionHandler, DrillRpcFuture{ + private static class FutureHandler extends AbstractCheckedFuture + implements RpcConnectionHandler, DrillRpcFuture { protected FutureHandler() { super( SettableFuture.create()); } @@ -688,4 +802,142 @@ public DrillBuf getBuffer() { return null; } } + + /** + * Return a new {@link DrillClient.Builder Drill client builder}. + * @return a new builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Helper class to construct a {@link DrillClient Drill client}. + */ + public static class Builder { + + private DrillConfig config; + private BufferAllocator allocator; + private ClusterCoordinator clusterCoordinator; + private EventLoopGroup eventLoopGroup; + private ExecutorService executor; + + // defaults + private boolean supportComplexTypes = true; + private boolean isDirectConnection = false; + + /** + * Sets the {@link DrillConfig configuration} for this client. + * + * @param drillConfig drill configuration + * @return this builder + */ + public Builder setConfig(DrillConfig drillConfig) { + this.config = drillConfig; + return this; + } + + /** + * Sets the {@link DrillConfig configuration} for this client based on the given file. + * + * @param fileName configuration file name + * @return this builder + */ + public Builder setConfigFromFile(final String fileName) { + this.config = DrillConfig.create(fileName); + return this; + } + + /** + * Sets the {@link BufferAllocator buffer allocator} to be used by this client. + * If this is not set, an allocator will be created based on the configuration. + * + * If this is set, the caller is responsible for closing the given allocator. + * + * @param allocator buffer allocator + * @return this builder + */ + public Builder setAllocator(final BufferAllocator allocator) { + this.allocator = allocator; + return this; + } + + /** + * Sets the {@link ClusterCoordinator cluster coordinator} that this client + * registers with. If this is not set and the this client does not use a + * {@link #setDirectConnection direct connection}, a cluster coordinator will + * be created based on the configuration. + * + * If this is set, the caller is responsible for closing the given coordinator. + * + * @param clusterCoordinator cluster coordinator + * @return this builder + */ + public Builder setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { + this.clusterCoordinator = clusterCoordinator; + return this; + } + + /** + * Sets the event loop group that to be used by the client. If this is not set, + * an event loop group will be created based on the configuration. + * + * If this is set, the caller is responsible for closing the given event loop group. + * + * @param eventLoopGroup event loop group + * @return this builder + */ + public Builder setEventLoopGroup(final EventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + return this; + } + + /** + * Sets the executor service to be used by the client. If this is not set, + * an executor will be created based on the configuration. + * + * If this is set, the caller is responsible for closing the given executor. + * + * @param executor executor service + * @return this builder + */ + public Builder setExecutorService(final ExecutorService executor) { + this.executor = executor; + return this; + } + + /** + * Sets whether the application is willing to accept complex types (Map, Arrays) + * in the returned result set. Default is {@code true}. If set to {@code false}, + * the complex types are returned as JSON encoded VARCHAR type. + * + * @param supportComplexTypes if client accepts complex types + * @return this builder + */ + public Builder setSupportsComplexTypes(final boolean supportComplexTypes) { + this.supportComplexTypes = supportComplexTypes; + return this; + } + + /** + * Sets whether the client will connect directly to the drillbit instead of going + * through the cluster coordinator (zookeeper). + * + * @param isDirectConnection is direct connection + * @return this builder + */ + public Builder setDirectConnection(final boolean isDirectConnection) { + this.isDirectConnection = isDirectConnection; + return this; + } + + /** + * Builds the drill client. + * + * @return a new drill client + */ + public DrillClient build() { + return new DrillClient(Builder.this); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index c285fb7c699..0e9c8201ba7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -115,11 +115,17 @@ public int submitQuery(String planLocation, String queryString, String type, Str drillbits[i] = new Drillbit(config, serviceSet); drillbits[i].run(); } - client = new DrillClient(config, serviceSet.getCoordinator()); + client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build(); } else { ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum); clusterCoordinator.start(10000); - client = new DrillClient(config, clusterCoordinator); + client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(clusterCoordinator) + .build(); } client.connect(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java index 62ddca905d8..4ba4c053373 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java @@ -46,8 +46,11 @@ protected DrillClient createDrillClient(final String userName, final String pass try { // Create a DrillClient - drillClient = new DrillClient(drillbitContext.getConfig(), - drillbitContext.getClusterCoordinator(), drillbitContext.getAllocator()); + drillClient = DrillClient.newBuilder() + .setConfig(drillbitContext.getConfig()) + .setClusterCoordinator(drillbitContext.getClusterCoordinator()) + .setAllocator(drillbitContext.getAllocator()) + .build(); final Properties props = new Properties(); props.setProperty("user", userName); if (password != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java index 18539ff9ffb..9b643d13a58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java @@ -118,8 +118,11 @@ public AnonDrillUserPrincipal(final DrillbitContext drillbitContext) { public DrillClient getDrillClient() throws IOException { try { // Create a DrillClient - drillClient = new DrillClient(drillbitContext.getConfig(), - drillbitContext.getClusterCoordinator(), drillbitContext.getAllocator()); + drillClient = DrillClient.newBuilder() + .setConfig(drillbitContext.getConfig()) + .setClusterCoordinator(drillbitContext.getClusterCoordinator()) + .setAllocator(drillbitContext.getAllocator()) + .build(); drillClient.connect(); return drillClient; } catch (final Exception e) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java index 1844c32fdba..2616ce37f6a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java +++ b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java @@ -63,7 +63,10 @@ private QueryTestUtil() { */ public static DrillClient createClient(final DrillConfig drillConfig, final RemoteServiceSet remoteServiceSet, final int maxWidth, final Properties props) throws RpcException, OutOfMemoryException { - final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator()); + final DrillClient drillClient = DrillClient.newBuilder() + .setConfig(drillConfig) + .setClusterCoordinator(remoteServiceSet.getCoordinator()) + .build(); drillClient.connect(props); final List results = drillClient.runQuery( diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java index e85ed8b3eb5..de1a48de2b0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java @@ -43,7 +43,10 @@ public void runTest(Object[] values, String planPath, String dataPath) throws Th try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateFunctions.java index 4718b395a90..b80d9a63037 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateFunctions.java @@ -46,7 +46,10 @@ public class TestDateFunctions extends PopUnitTestBase { public void testCommon(String[] expectedResults, String physicalPlan, String resourceFile) throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java index cf5c239a699..324be73ed4c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestMultiInputAdd.java @@ -53,7 +53,10 @@ public void testMultiInputAdd(@Injectable final DrillbitContext bitContext, @Inj { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewAggregateFunctions.java index 68e6eac007a..5bfbfc29931 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewAggregateFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewAggregateFunctions.java @@ -44,8 +44,10 @@ public void runTest(String physicalPlan, String inputDataFile, try (RemoteServiceSet serviceSet = RemoteServiceSet .getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, - serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java index 93a95700a6b..04377fb5946 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java @@ -40,7 +40,10 @@ public void TestSingleBroadcastExchangeWithTwoScans() throws Exception { try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); @@ -68,7 +71,10 @@ public void TestMultipleSendLocationBroadcastExchange() throws Exception { try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java index 8cab43bf7f3..df82e77268f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java @@ -355,7 +355,10 @@ public void testCastNumException(@Injectable final DrillbitContext bitContext, public void testCastFromNullablCol() throws Throwable { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try(final Drillbit bit = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastVarCharToBigInt.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastVarCharToBigInt.java index 8d904eb124f..74ad4e04727 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastVarCharToBigInt.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastVarCharToBigInt.java @@ -44,7 +44,10 @@ public class TestCastVarCharToBigInt extends PopUnitTestBase { public void testCastToBigInt() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDecimal.java index 1424a088dfd..684bc5d51f1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDecimal.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDecimal.java @@ -49,7 +49,10 @@ public void testSimpleDecimal() throws Exception { */ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -94,7 +97,10 @@ public void testCastFromFloat() throws Exception { // Function checks for casting from Float, Double to Decimal data types try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -139,7 +145,10 @@ public void testSimpleDecimalArithmetic() throws Exception { // Function checks arithmetic operations on Decimal18 try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -190,7 +199,10 @@ public void testComplexDecimal() throws Exception { */ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -233,7 +245,10 @@ public void testComplexDecimalSort() throws Exception { // Function checks if sort output on complex decimal type works try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -282,7 +297,10 @@ public void testSimpleDecimalMathFunc() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java index 099b7bd9219..a7f9a627ef8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java @@ -41,7 +41,11 @@ public class TestDistributedFragmentRun extends PopUnitTestBase{ public void oneBitOneExchangeOneEntryRun() throws Exception{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){ + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); List results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange.json"), Charsets.UTF_8)); @@ -61,7 +65,11 @@ public void oneBitOneExchangeOneEntryRun() throws Exception{ public void oneBitOneExchangeTwoEntryRun() throws Exception{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){ + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); List results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange_double_entry.json"), Charsets.UTF_8)); @@ -80,7 +88,11 @@ public void oneBitOneExchangeTwoEntryRun() throws Exception{ public void oneBitOneExchangeTwoEntryRunLogical() throws Exception{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){ + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); List results = client.runQuery(QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile("/scan_screen_logical.json"), Charsets.UTF_8)); @@ -99,7 +111,12 @@ public void oneBitOneExchangeTwoEntryRunLogical() throws Exception{ public void twoBitOneExchangeTwoEntryRun() throws Exception{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){ + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExtractFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExtractFunctions.java index 5740512d0d9..40ecf2b56b2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExtractFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExtractFunctions.java @@ -105,7 +105,10 @@ private void testFrom(String fromType, String testDataFile, String columnName, long expectedValues[][]) throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java index 1d090ecf5cf..6d49889527d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java @@ -40,8 +40,11 @@ public void twoBitTwoExchangeTwoEntryRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index 74aff18168e..851dbbd1930 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -132,7 +132,10 @@ public void testFilterPlan() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(config, serviceSet); - final DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); final List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, @@ -161,7 +164,10 @@ public void testJoinPlan() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(config, serviceSet); - final DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); final List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, @@ -190,7 +196,10 @@ public void testFilterString() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(config, serviceSet); - final DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); final List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.LOGICAL, @@ -229,7 +238,10 @@ public void testLogicalJsonScan() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(config, serviceSet); - final DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); final List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.LOGICAL, @@ -268,7 +280,10 @@ public void testOrderVarbinary() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(config, serviceSet); - final DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); final List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestReverseImplicitCast.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestReverseImplicitCast.java index 76c47189684..51f9e4d2b0f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestReverseImplicitCast.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestReverseImplicitCast.java @@ -50,7 +50,10 @@ public void twoWayCast(@Injectable final DrillbitContext bitContext, // Function checks for casting from Float, Double to Decimal data types try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java index 182e19e2b24..206b82f6a0f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java @@ -45,8 +45,11 @@ public class TestSimpleFragmentRun extends PopUnitTestBase { @Test public void runNoExchangeFragment() throws Exception { try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - final Drillbit bit = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final Drillbit bit = new Drillbit(CONFIG, serviceSet); + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -107,7 +110,10 @@ public void runNoExchangeFragment() throws Exception { public void runJSONScanPopFragment() throws Exception { try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); final Drillbit bit = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java index 9c24f79841f..e015cbea4d8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java @@ -40,8 +40,11 @@ public void twoBitTwoExchangeTwoEntryRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestSimpleTopN.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestSimpleTopN.java index 85d62c3f366..032fca5fd43 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestSimpleTopN.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestSimpleTopN.java @@ -47,8 +47,11 @@ public void sortOneKeyAscending() throws Throwable{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index c76b39c1d9e..67350f16fb6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -111,7 +111,10 @@ public void simpleEqualityJoin() throws Throwable { // Function checks hash join with single equality condition try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -154,7 +157,10 @@ public void hjWithExchange(@Injectable final DrillbitContext bitContext, // Function tests with hash join with exchanges try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); final Drillbit bit = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -182,7 +188,10 @@ public void multipleConditionJoin(@Injectable final DrillbitContext bitContext, // Function tests hash join with multiple join conditions try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); final Drillbit bit = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -228,7 +237,10 @@ public void hjWithExchange1(@Injectable final DrillbitContext bitContext, // Another test for hash join with exchanges try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); final Drillbit bit = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -254,7 +266,10 @@ public void testHashJoinExprInCondition() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java index bb9c2bd5e96..5925b2c5b4f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java @@ -283,7 +283,10 @@ public void testMergeJoinInnerEmptyBatch() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); @@ -307,7 +310,10 @@ public void testMergeJoinLeftEmptyBatch() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); @@ -331,7 +337,10 @@ public void testMergeJoinRightEmptyBatch() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); @@ -355,7 +364,10 @@ public void testMergeJoinExprInCondition() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java index 34c15c9ef94..ec8399550b7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinMulCondition.java @@ -51,7 +51,10 @@ public void testMergeJoinMultiKeys() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); @@ -77,7 +80,10 @@ public void testMergeJoinInnerNullKey() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, @@ -101,7 +107,10 @@ public void testMergeJoinLeftOuterNullKey() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java index 537a58341f9..44736123d7c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java @@ -37,7 +37,6 @@ import org.junit.Test; import com.google.common.base.Charsets; -import com.google.common.collect.Lists; import com.google.common.io.Files; public class TestMergingReceiver extends PopUnitTestBase { @@ -48,8 +47,11 @@ public void twoBitTwoExchange() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); client.connect(); @@ -76,8 +78,11 @@ public void testMultipleProvidersMixedSizes() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); @@ -123,8 +128,11 @@ public void handleEmptyBatch() throws Exception { final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + final DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java index 8d690d3f058..afaa925fa32 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java @@ -67,7 +67,10 @@ public void twoBitTwoExchangeRun() throws Exception { try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index b34a4667d5b..fa611b041e5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -138,8 +138,11 @@ public void sortOneKeyDescendingExternalSort() throws Throwable{ DrillConfig config = DrillConfig.create("drill-external-sort.conf"); try (Drillbit bit1 = new Drillbit(config, serviceSet); - Drillbit bit2 = new Drillbit(config, serviceSet); - DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + Drillbit bit2 = new Drillbit(config, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); bit2.run(); @@ -192,7 +195,10 @@ public void outOfMemoryExternalSort() throws Throwable{ DrillConfig config = DrillConfig.create("drill-oom-xsort.conf"); try (Drillbit bit1 = new Drillbit(config, serviceSet); - DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java index a2401e8cb8b..bd036eaecc9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java @@ -48,7 +48,10 @@ public class TestDateTypes extends PopUnitTestBase { public void testDate() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -82,7 +85,10 @@ public void testDate() throws Exception { public void testSortDate() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -116,7 +122,10 @@ public void testSortDate() throws Exception { public void testTimeStamp() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -150,7 +159,10 @@ public void testTimeStamp() throws Exception { public void testInterval() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -208,7 +220,10 @@ public void testInterval() throws Exception { public void testLiterals() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); @@ -247,7 +262,10 @@ public void testLiterals() throws Exception { public void testDateAdd() throws Exception { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { // run query. bit.run(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/DrillClientFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/DrillClientFactory.java index a10a76d7d44..1a289b0a4f8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/DrillClientFactory.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/DrillClientFactory.java @@ -21,6 +21,10 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.glassfish.hk2.api.Factory; +/** + * @deprecated Does not provide any additional functionality compared to {@link DrillClient.Builder}. + */ +@Deprecated public class DrillClientFactory implements Factory { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClientFactory.class); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java index 8714b30eb79..0cee5f3c94b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java @@ -54,7 +54,11 @@ public void testParseParquetPhysicalPlan() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); DrillConfig config = DrillConfig.create(); - try (Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + try (Drillbit bit1 = new Drillbit(config, serviceSet); + DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8)); @@ -124,7 +128,9 @@ public void queryIdArrived(QueryId queryId) { public void testParseParquetPhysicalPlanRemote() throws Exception { DrillConfig config = DrillConfig.create(); - try(DrillClient client = new DrillClient(config);) { + try(DrillClient client = DrillClient.newBuilder() + .setConfig(config) + .build()) { client.connect(); ParquetResultsListener listener = new ParquetResultsListener(); Stopwatch watch = Stopwatch.createStarted(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java index 5e781d2b06a..bb97f789db6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java @@ -41,7 +41,10 @@ public void testFullExecution() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + DrillClient client = DrillClient.newBuilder() + .setConfig(CONFIG) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java index 656429e2988..8da5b5ed6e7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java @@ -39,7 +39,10 @@ public void test() throws Exception { List results; RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - client = new DrillClient(config, serviceSet.getCoordinator()); + client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build(); client.setSupportComplexTypes(false); client.connect(); results = testSqlWithResults("select * from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/regions.csv`"); @@ -55,7 +58,10 @@ public void test() throws Exception { } client.close(); - client = new DrillClient(config, serviceSet.getCoordinator()); + client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build(); client.setSupportComplexTypes(true); client.connect(); results = testSqlWithResults("select * from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/regions.csv`"); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java index 271f29f7229..5a85a3ff8b4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java @@ -24,7 +24,6 @@ import org.apache.drill.BaseTestQuery; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.FileUtils; -import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.server.Drillbit; @@ -43,7 +42,10 @@ public void testMultipleExchangesSingleThread() throws Exception { DrillConfig conf = DrillConfig.create("drill-spool-test-module.conf"); try(Drillbit bit1 = new Drillbit(conf, serviceSet); - DrillClient client = new DrillClient(conf, serviceSet.getCoordinator());) { + DrillClient client = DrillClient.newBuilder() + .setConfig(conf) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build()) { bit1.run(); client.connect(); diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java index 855a27ef592..bb6ebed89d1 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java @@ -136,12 +136,18 @@ protected DrillConnectionImpl(DriverImpl driver, AvaticaFactory factory, makeTmpSchemaLocationsUnique(bit.getContext().getStorage(), info); - this.client = new DrillClient(dConfig, set.getCoordinator()); + this.client = DrillClient.newBuilder() + .setConfig(dConfig) + .setClusterCoordinator(set.getCoordinator()) + .build(); this.client.connect(null, info); } else if(config.isDirect()) { final DrillConfig dConfig = DrillConfig.forClient(); this.allocator = RootAllocatorFactory.newRoot(dConfig); - this.client = new DrillClient(dConfig, true); // Get a direct connection + this.client = DrillClient.newBuilder() + .setConfig(dConfig) + .setDirectConnection(true) // Get a direct connection + .build(); this.client.connect(config.getZookeeperConnectionString(), info); } else { final DrillConfig dConfig = DrillConfig.forClient(); diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index b54d73edd1b..48db46c08fc 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.rpc; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -31,14 +30,11 @@ import java.io.IOException; import java.net.BindException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; -import com.google.common.base.Stopwatch; import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; @@ -211,22 +207,7 @@ public int bind(final int initialPort, boolean allowPortHunting) { @Override public void close() throws IOException { - try { - Stopwatch watch = Stopwatch.createStarted(); - // this takes 1s to complete - // known issue: https://github.com/netty/netty/issues/2545 - eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).get(); - long elapsed = watch.elapsed(MILLISECONDS); - if (elapsed > 500) { - logger.info("closed eventLoopGroup " + eventLoopGroup + " in " + elapsed + " ms"); - } - } catch (final InterruptedException | ExecutionException e) { - logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e); - - // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the - // interruption and respond to it if it wants to. - Thread.currentThread().interrupt(); - } + TransportCheck.shutDownEventLoopGroup(eventLoopGroup, this.getClass().getName(), logger); } } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java index 4886c989a8f..9a3ef2189ec 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc; +import com.google.common.base.Stopwatch; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -27,15 +28,17 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.drill.exec.util.SystemPropertyUtil; +import org.slf4j.Logger; import java.util.Locale; - +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * TransportCheck decides whether or not to use the native EPOLL mechanism for communication. */ public class TransportCheck { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransportCheck.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransportCheck.class); private static final String USE_LINUX_EPOLL = "drill.exec.enable-epoll"; @@ -77,4 +80,37 @@ public static EventLoopGroup createEventLoopGroup(int nThreads, String prefix) { return new NioEventLoopGroup(nThreads, new NamedThreadFactory(prefix)); } } + + /** + * Shuts down the given event loop group gracefully. + * + * @param eventLoopGroup event loop group to shutdown + * @param groupName group name + * @param logger logger + */ + public static void shutDownEventLoopGroup(final EventLoopGroup eventLoopGroup, + final String groupName, + final Logger logger) { + try { + final Stopwatch watch = Stopwatch.createStarted(); + // This call takes 1s to complete; known issue: https://github.com/netty/netty/issues/2545 + // From http://netty.io/wiki/user-guide-for-4.x.html#shutting-down-your-application : + // This call terminates EventLoopGroup completely and closes all Channels that belong to the group. + eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).get(); + final long elapsed = watch.elapsed(TimeUnit.MILLISECONDS); + if (elapsed > 500) { + logger.info("Closed " + groupName + " in " + elapsed + " ms."); + } + } catch (final InterruptedException | ExecutionException e) { + logger.warn("Failure while shutting down {}.", groupName, e); + + // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the + // interruption and respond to it if it wants to. + Thread.currentThread().interrupt(); + } + } + + // prevents instantiation + private TransportCheck() { + } } From b5ee0fb6e2fd99c5e700a60261d8990642350983 Mon Sep 17 00:00:00 2001 From: Sudheesh Katkam Date: Wed, 10 Aug 2016 15:39:41 -0700 Subject: [PATCH 3/3] DRILL-4841: Use server event loop for web clients --- .../org/apache/drill/exec/server/BootStrapContext.java | 7 +++++++ .../java/org/apache/drill/exec/server/DrillbitContext.java | 4 ++-- .../exec/server/rest/auth/AbstractDrillLoginService.java | 2 ++ .../drill/exec/server/rest/auth/DrillUserPrincipal.java | 2 ++ .../java/org/apache/drill/exec/service/ServiceEngine.java | 6 +----- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index de6b6c3c0e3..ac68b12fdae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -41,6 +41,7 @@ public class BootStrapContext implements AutoCloseable { private final DrillConfig config; private final EventLoopGroup loop; private final EventLoopGroup loop2; + private final EventLoopGroup userLoopGroup; private final MetricRegistry metrics; private final BufferAllocator allocator; private final ScanResult classpathScan; @@ -53,6 +54,8 @@ public BootStrapContext(DrillConfig config, ScanResult classpathScan) { this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-"); // Note that metrics are stored in a static instance this.metrics = DrillMetrics.getRegistry(); + this.userLoopGroup = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.USER_SERVER_RPC_THREADS), + "UserServer-"); this.allocator = RootAllocatorFactory.newRoot(config); this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), @@ -83,6 +86,10 @@ public EventLoopGroup getBitClientLoopGroup() { return loop2; } + public EventLoopGroup getUserLoopGroup() { + return userLoopGroup; + } + public MetricRegistry getMetrics() { return metrics; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 1af6d113117..f43ec70f096 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -132,8 +132,8 @@ public StoragePluginRegistry getStorage() { return this.storagePlugins; } - public EventLoopGroup getBitLoopGroup() { - return context.getBitLoopGroup(); + public EventLoopGroup getUserLoopGroup() { + return context.getUserLoopGroup(); } public DataConnectionCreator getDataConnectionsPool() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java index 4ba4c053373..90a40507ba4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/AbstractDrillLoginService.java @@ -50,6 +50,8 @@ protected DrillClient createDrillClient(final String userName, final String pass .setConfig(drillbitContext.getConfig()) .setClusterCoordinator(drillbitContext.getClusterCoordinator()) .setAllocator(drillbitContext.getAllocator()) + .setEventLoopGroup(drillbitContext.getUserLoopGroup()) + .setExecutorService(drillbitContext.getExecutor()) .build(); final Properties props = new Properties(); props.setProperty("user", userName); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java index 9b643d13a58..f400bd89fe7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/DrillUserPrincipal.java @@ -122,6 +122,8 @@ public DrillClient getDrillClient() throws IOException { .setConfig(drillbitContext.getConfig()) .setClusterCoordinator(drillbitContext.getClusterCoordinator()) .setAllocator(drillbitContext.getAllocator()) + .setEventLoopGroup(drillbitContext.getUserLoopGroup()) + .setExecutorService(drillbitContext.getExecutor()) .build(); drillClient.connect(); return drillClient; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java index d5055461c96..ff826613740 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java @@ -19,7 +19,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import io.netty.buffer.PooledByteBufAllocatorL; -import io.netty.channel.EventLoopGroup; import java.net.InetAddress; import java.net.UnknownHostException; @@ -35,7 +34,6 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.rpc.TransportCheck; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.ControllerImpl; import org.apache.drill.exec.rpc.control.WorkEventBus; @@ -73,13 +71,11 @@ public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker use "drill.exec.rpc.bit.server.memory.control.reservation", "drill.exec.rpc.bit.server.memory.control.maximum"); dataAllocator = newAllocator(context, "rpc:bit-data", "drill.exec.rpc.bit.server.memory.data.reservation", "drill.exec.rpc.bit.server.memory.data.maximum"); - final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup( - context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-"); this.userServer = new UserServer( context.getConfig(), context.getClasspathScan(), userAllocator, - eventLoopGroup, + context.getUserLoopGroup(), userWorker, context.getExecutor()); this.controller = new ControllerImpl(context, controlMessageHandler, controlAllocator, allowPortHunting);