diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java index fcdfe14de51..ede316272d1 100644 --- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java +++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java @@ -66,7 +66,7 @@ public static void close(AutoCloseable... autoCloseables) throws Exception { /** * Closes all autoCloseables if not null and suppresses subsequent exceptions if more than one - * @param autoCloseables the closeables to close + * @param ac the closeables to close */ public static void close(Iterable ac) throws Exception { Exception topLevelException = null; @@ -87,4 +87,29 @@ public static void close(Iterable ac) throws Exception throw topLevelException; } } + + /** + * close() an {@see java.lang.AutoCloseable} without throwing a (checked) + * {@see java.lang.Exception}. This wraps the close() call with a + * try-catch that will rethrow an Exception wrapped with a + * {@see java.lang.RuntimeException}, providing a way to call close() + * without having to do the try-catch everywhere or propagate the Exception. + * + * @param autoCloseable the AutoCloseable to close; may be null + * @throws RuntimeException if an Exception occurs; the Exception is + * wrapped by the RuntimeException + */ + public static void closeNoChecked(final AutoCloseable autoCloseable) { + if (autoCloseable != null) { + try { + autoCloseable.close(); + } catch(final Exception e) { + throw new RuntimeException("Exception while closing", e); + } + } + } + + // prevents instantiation + private AutoCloseables() { + } } diff --git a/common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java b/common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java deleted file mode 100644 index 11fb9a8f579..00000000000 --- a/common/src/main/java/org/apache/drill/common/DrillAutoCloseables.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.common; - -/** - * Provides functionality comparable to Guava's Closeables for AutoCloseables. - */ -public class DrillAutoCloseables { - /** - * Constructor. Prevents construction for class of static utilities. - */ - private DrillAutoCloseables() { - } - - /** - * close() an {@see java.lang.AutoCloseable} without throwing a (checked) - * {@see java.lang.Exception}. This wraps the close() call with a - * try-catch that will rethrow an Exception wrapped with a - * {@see java.lang.RuntimeException}, providing a way to call close() - * without having to do the try-catch everywhere or propagate the Exception. - * - * @param closeable the AutoCloseable to close; may be null - * @throws RuntimeException if an Exception occurs; the Exception is - * wrapped by the RuntimeException - */ - public static void closeNoChecked(final AutoCloseable autoCloseable) { - if (autoCloseable != null) { - try { - autoCloseable.close(); - } catch(final Exception e) { - throw new RuntimeException("Exception while closing", e); - } - } - } -} diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java index 7873b8075ee..075a44ea5b3 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java @@ -34,7 +34,10 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest { @BeforeClass public static void openMyClient() throws Exception { parent_client = client; - client = new DrillClient(config, serviceSet.getCoordinator()); + client = DrillClient.newBuilder() + .setConfig(config) + .setClusterCoordinator(serviceSet.getCoordinator()) + .build(); client.setSupportComplexTypes(false); client.connect(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index e81a4fb0ce8..9ea22248597 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -17,10 +17,9 @@ */ package org.apache.drill.exec.client; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL; -import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder; +import static com.google.common.base.Preconditions.checkState; + import io.netty.buffer.DrillBuf; import io.netty.channel.EventLoopGroup; @@ -36,13 +35,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; -import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -52,7 +50,6 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserBitShared.QueryType; -import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq; import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp; import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp; @@ -68,6 +65,7 @@ import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle; import org.apache.drill.exec.proto.UserProtos.Property; import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments; +import org.apache.drill.exec.proto.UserProtos.QueryResultsMode; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserProperties; @@ -90,79 +88,175 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.util.concurrent.AbstractCheckedFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; /** * Thin wrapper around a UserClient that handles connect/close and transforms * String into ByteBuf. + * + * To create non-default objects, use {@link DrillClient.Builder the builder class}. + * E.g. + * + * DrillClient client = DrillClient.newBuilder() + * .setConfig(...) + * .setIsDirectConnection(true) + * .build(); + * + * + * Except for {@link #runQuery} and {@link #cancelQuery}, this class is generally not thread safe. */ public class DrillClient implements Closeable, ConnectionThrottle { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class); private static final ObjectMapper objectMapper = new ObjectMapper(); private final DrillConfig config; - private UserClient client; - private UserProperties props = null; - private volatile ClusterCoordinator clusterCoordinator; - private volatile boolean connected = false; private final BufferAllocator allocator; - private int reconnectTimes; - private int reconnectDelay; - private boolean supportComplexTypes; - private final boolean ownsZkConnection; + private final EventLoopGroup eventLoopGroup; + private final ExecutorService executor; + private final boolean isDirectConnection; + private final int reconnectTimes; + private final int reconnectDelay; + + // TODO: clusterCoordinator should be initialized in the constructor. + // Currently, initialization is tightly coupled with #connect. + private ClusterCoordinator clusterCoordinator; + + // checks if this client owns these resources (used when closing) private final boolean ownsAllocator; - private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit - private EventLoopGroup eventLoopGroup; - private ExecutorService executor; + private final boolean ownsZkConnection; + private final boolean ownsEventLoopGroup; + private final boolean ownsExecutor; + + // once #setSupportComplexTypes() is removed, make this final + private boolean supportComplexTypes; + + private UserClient client; + private UserProperties props; + private boolean connected; - public DrillClient() throws OutOfMemoryException { - this(DrillConfig.create(), false); + public DrillClient() { + this(newBuilder()); } - public DrillClient(boolean isDirect) throws OutOfMemoryException { - this(DrillConfig.create(), isDirect); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(boolean isDirect) { + this(newBuilder() + .setDirectConnection(isDirect)); } - public DrillClient(String fileName) throws OutOfMemoryException { - this(DrillConfig.create(fileName), false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(String fileName) { + this(newBuilder() + .setConfigFromFile(fileName)); } - public DrillClient(DrillConfig config) throws OutOfMemoryException { - this(config, null, false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config) { + this(newBuilder() + .setConfig(config)); } - public DrillClient(DrillConfig config, boolean isDirect) - throws OutOfMemoryException { - this(config, null, isDirect); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, boolean isDirect) { + this(newBuilder() + .setConfig(config) + .setDirectConnection(isDirect)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator) - throws OutOfMemoryException { - this(config, coordinator, null, false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) - throws OutOfMemoryException { - this(config, coordinator, null, isDirect); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator) + .setDirectConnection(isDirect)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) - throws OutOfMemoryException { - this(config, coordinator, allocator, false); + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator) + .setAllocator(allocator)); } - public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, boolean isDirect) { - // if isDirect is true, the client will connect directly to the drillbit instead of - // going thru the zookeeper - this.isDirectConnection = isDirect; - this.ownsZkConnection = coordinator == null && !isDirect; - this.ownsAllocator = allocator == null; - this.allocator = ownsAllocator ? RootAllocatorFactory.newRoot(config) : allocator; - this.config = config; - this.clusterCoordinator = coordinator; + /** + * @deprecated Create a DrillClient using {@link DrillClient.Builder}. + */ + @Deprecated + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, + boolean isDirect) { + this(newBuilder() + .setConfig(config) + .setClusterCoordinator(coordinator) + .setAllocator(allocator) + .setDirectConnection(isDirect)); + } + + // used by DrillClient.Builder + private DrillClient(final Builder builder) { + this.config = builder.config != null ? builder.config : DrillConfig.create(); + + this.ownsAllocator = builder.allocator == null; + this.allocator = !ownsAllocator ? builder.allocator : RootAllocatorFactory.newRoot(config); + + this.isDirectConnection = builder.isDirectConnection; + this.ownsZkConnection = builder.clusterCoordinator == null && !isDirectConnection; + this.clusterCoordinator = builder.clusterCoordinator; // could be null + + this.ownsEventLoopGroup = builder.eventLoopGroup == null; + this.eventLoopGroup = !ownsEventLoopGroup ? builder.eventLoopGroup : + TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"); + + this.ownsExecutor = builder.executor == null; + this.executor = !ownsExecutor ? builder.executor : + new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + new NamedThreadFactory("drill-client-executor-")) { + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + if (t != null) { + logger.error(String.format("%s.run() leaked an exception.", r.getClass().getName()), t); + } + super.afterExecute(r, t); + } + }; + + this.supportComplexTypes = builder.supportComplexTypes; + + // These are currently not exposed to the users of this class through {@link DrillClient.Builder}. + // Reconnection is only done when a cluster coordinator is used (and not when a direct connection is + // made), and these are used only in reconnection. this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY); - this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES); } public DrillConfig getConfig() { @@ -179,10 +273,13 @@ public void setAutoRead(boolean enableAutoRead) { * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type. * * @throws IllegalStateException if called after a connection has been established. + * + * @deprecated use {@link Builder#setSupportsComplexTypes} while building the client. */ + @Deprecated public void setSupportComplexTypes(boolean supportComplexTypes) { if (connected) { - throw new IllegalStateException("Attempted to modify client connection property after connection has been established."); + throw new IllegalStateException("Attempted to modify a property after connection has been established."); } this.supportComplexTypes = supportComplexTypes; } @@ -200,7 +297,7 @@ public void connect(Properties props) throws RpcException { connect(null, props); } - public synchronized void connect(String connect, Properties props) throws RpcException { + public void connect(String connect, Properties props) throws RpcException { if (connected) { return; } @@ -233,30 +330,24 @@ public synchronized void connect(String connect, Properties props) throws RpcExc if (props != null) { final UserProperties.Builder upBuilder = UserProperties.newBuilder(); for (final String key : props.stringPropertyNames()) { - upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key))); + upBuilder.addProperties(Property.newBuilder() + .setKey(key) + .setValue(props.getProperty(key))); } this.props = upBuilder.build(); } - eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"); - executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - new SynchronousQueue(), - new NamedThreadFactory("drill-client-executor-")) { - @Override - protected void afterExecute(final Runnable r, final Throwable t) { - if (t != null) { - logger.error("{}.run() leaked an exception.", r.getClass().getName(), t); - } - super.afterExecute(r, t); - } - }; client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup, executor); logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); connect(endpoint); connected = true; } + /** + * @deprecated use {@link TransportCheck#createEventLoopGroup} directly. + */ + @Deprecated protected static EventLoopGroup createEventLoop(int size, String prefix) { return TransportCheck.createEventLoopGroup(size, prefix); } @@ -279,6 +370,7 @@ public synchronized boolean reconnect() { connect(endpoints.iterator().next()); return true; } catch (Exception e) { + logger.info(String.format("Trying to reconnect (#%s).", retry)); } } return false; @@ -290,6 +382,28 @@ private void connect(DrillbitEndpoint endpoint) throws RpcException { f.checkedGet(); } + /* + * Helper method to generate the UserCredentials message from the properties. + */ + private UserBitShared.UserCredentials getUserCredentials() { + // If username is not propagated as one of the properties + String userName = "anonymous"; + + if (props != null) { + for (Property property: props.getPropertiesList()) { + if (property.getKey().equalsIgnoreCase("user") && + !Strings.isNullOrEmpty(property.getValue())) { + userName = property.getValue(); + break; + } + } + } + + return UserBitShared.UserCredentials.newBuilder() + .setUserName(userName) + .build(); + } + public BufferAllocator getAllocator() { return allocator; } @@ -299,36 +413,43 @@ public BufferAllocator getAllocator() { */ @Override public void close() { - if (this.client != null) { - this.client.close(); - } - if (this.ownsAllocator && allocator != null) { - DrillAutoCloseables.closeNoChecked(allocator); - } - if (ownsZkConnection) { - if (clusterCoordinator != null) { - try { - clusterCoordinator.close(); - clusterCoordinator = null; - } catch (Exception e) { - logger.warn("Error while closing Cluster Coordinator.", e); - } - } - } - if (eventLoopGroup != null) { - eventLoopGroup.shutdownGracefully(); + try { + AutoCloseables.close(client, + ownsAllocator ? allocator : null, + ownsZkConnection ? clusterCoordinator : null); + } catch (Exception e) { + logger.error("Error(s) encountered while closing client.", e); } - if (executor != null) { - executor.shutdownNow(); + if (ownsEventLoopGroup) { + TransportCheck.shutDownEventLoopGroup(eventLoopGroup, "Client-", logger); + } + if (ownsExecutor) { + MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS); } - // TODO: Did DRILL-1735 changes cover this TODO?: - // TODO: fix tests that fail when this is called. - //allocator.close(); connected = false; } + + /** + * Submits a Logical plan for direct execution (bypasses parsing) + * Runs the given plan of the given {@link QueryType type}. The {@link UserResultsListener listener} + * is notified if results arrive, query completed, etc. + * + * @param type query type + * @param plan query plan + * @param resultsListener results listener + */ + public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) { + client.submitQuery(resultsListener, + RunQuery.newBuilder() + .setResultsMode(QueryResultsMode.STREAM_FULL) + .setType(type) + .setPlan(plan) + .build()); + } + /** * Submits a string based query plan for execution and returns the result batches. Supported query types are: *