Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;

/**
* This class is used for exporting current state of load on a RegionServer.
*/
Expand Down Expand Up @@ -112,4 +114,6 @@ default String getVersion() {
* rounded to MB
*/
Map<String, Integer> getRegionCachedInfo();

List<ClusterStatusProtos.ClientConnectionInfo> getClientConnectionsInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu
.setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap())
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
.setVersion(version).build();
.setClientConnectionInfos(serverLoadPB.getClientConnectionInfosList()).setVersion(version)
.build();
}

public static List<HBaseProtos.Coprocessor> toCoprocessor(Collection<String> names) {
Expand Down Expand Up @@ -145,6 +146,8 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) {
private long lastReportTimestamp = 0;
private final List<ServerTask> tasks = new ArrayList<>();
private Map<String, Integer> regionCachedInfo = new HashMap<>();
private final List<ClusterStatusProtos.ClientConnectionInfo> clientConnectionInfos =
new ArrayList<>();

private ServerMetricsBuilder(ServerName serverName) {
this.serverName = serverName;
Expand Down Expand Up @@ -240,11 +243,17 @@ public ServerMetricsBuilder setRegionCachedInfo(Map<String, Integer> value) {
return this;
}

public ServerMetricsBuilder
setClientConnectionInfos(List<ClusterStatusProtos.ClientConnectionInfo> value) {
clientConnectionInfos.addAll(value);
return this;
}

public ServerMetrics build() {
return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond,
requestCount, readRequestCount, writeRequestCount, usedHeapSize, maxHeapSize, infoServerPort,
sources, sink, regionStatus, coprocessorNames, reportTimestamp, lastReportTimestamp,
userMetrics, tasks, regionCachedInfo);
userMetrics, tasks, regionCachedInfo, clientConnectionInfos);
}

private static class ServerMetricsImpl implements ServerMetrics {
Expand All @@ -268,14 +277,16 @@ private static class ServerMetricsImpl implements ServerMetrics {
private final Map<byte[], UserMetrics> userMetrics;
private final List<ServerTask> tasks;
private final Map<String, Integer> regionCachedInfo;
private final List<ClusterStatusProtos.ClientConnectionInfo> clientConnectionInfos;

ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, long readRequestsCount,
long writeRequestsCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort,
List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks,
Map<String, Integer> regionCachedInfo) {
Map<String, Integer> regionCachedInfo,
List<ClusterStatusProtos.ClientConnectionInfo> clientConnectionInfos) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
Expand All @@ -295,6 +306,7 @@ private static class ServerMetricsImpl implements ServerMetrics {
this.lastReportTimestamp = lastReportTimestamp;
this.tasks = tasks;
this.regionCachedInfo = regionCachedInfo;
this.clientConnectionInfos = clientConnectionInfos;
}

@Override
Expand Down Expand Up @@ -402,6 +414,11 @@ public Map<String, Integer> getRegionCachedInfo() {
return Collections.unmodifiableMap(regionCachedInfo);
}

@Override
public List<ClusterStatusProtos.ClientConnectionInfo> getClientConnectionsInfo() {
return Collections.unmodifiableList(clientConnectionInfos);
}

@Override
public String toString() {
int storeCount = 0;
Expand Down
10 changes: 10 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ message ServerTask {
}
}

message ClientConnectionInfo {
optional string host_address = 1;
optional string user_name = 2;
optional string client_version = 3;
optional string service_name = 4;
optional string port = 5;
}

message ServerLoad {
/** Number of requests since last report. */
optional uint64 number_of_requests = 1;
Expand Down Expand Up @@ -326,6 +334,8 @@ message ServerLoad {
* The metrics for region cached on this region server
*/
map<string, uint32> regionCachedInfo = 16;

repeated ClientConnectionInfo clientConnectionInfos = 17;
}

message LiveServerInfo {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Holds information about a client connection including IP address, username, client version, and
* service name.
*/
@InterfaceAudience.Private
public class ClientConnectionInfo {
private final String hostAddress;
private final int port;
private final String userName;
private final String clientVersion;
private final String serviceName;

private ClientConnectionInfo(Builder builder) {
this.hostAddress = builder.hostAddress;
this.port = builder.port;
this.userName = builder.userName;
this.clientVersion = builder.clientVersion;
this.serviceName = builder.serviceName;
}

public String getHostAddress() {
return hostAddress;
}

public String getUserName() {
return userName;
}

public String getClientVersion() {
return clientVersion;
}

public String getServiceName() {
return serviceName;
}

public String getClientId() {
return hostAddress + ":" + port;
}

public String getClientPort() {
return String.valueOf(port);
}

@Override
public String toString() {
return "ClientConnectionInfo{" + "hostAddress='" + hostAddress + '\'' + ", port='" + port + '\''
+ ", userName='" + userName + '\'' + ", clientVersion='" + clientVersion + '\''
+ ", serviceName='" + serviceName + '\'' + '}';
}

public static class Builder {
private String hostAddress;
private int port;
private String userName;
private String clientVersion;
private String serviceName;

public Builder hostAddress(String hostAddress) {
this.hostAddress = hostAddress;
return this;
}

public Builder port(int port) {
this.port = port;
return this;
}

public Builder userName(String userName) {
this.userName = userName;
return this;
}

public Builder clientVersion(String clientVersion) {
this.clientVersion = clientVersion;
return this;
}

public Builder serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}

public ClientConnectionInfo build() {
return new ClientConnectionInfo(this);
}
}

public static Builder newBuilder() {
return new Builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Registry for tracking active client connections to the RPC server. Maintains a thread-safe map of
* client connection information.
*/
@InterfaceAudience.Private
public class ClientConnectionRegistry {

private final ConcurrentHashMap<String, ClientConnectionInfo> clientConnections;

public ClientConnectionRegistry() {
this.clientConnections = new ConcurrentHashMap<>();
}

public void registerClientConnection(ClientConnectionInfo connectionInfo) {
clientConnections.put(connectionInfo.getClientId(), connectionInfo);
}

public void unregisterClientConnection(String clientId) {
clientConnections.remove(clientId);
}

public Collection<ClientConnectionInfo> getClientConnections() {
return clientConnections.values();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public class NettyRpcServer extends RpcServer {
private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();

private final ClientConnectionRegistry clientConnectionRegistry;

private volatile int writeBufferFatalThreshold;
private volatile WriteBufferWaterMark writeBufferWaterMark;

Expand All @@ -141,6 +143,7 @@ public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterfa
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
this.bindAddress = bindAddress;
this.channelAllocator = getChannelAllocator(conf);
this.clientConnectionRegistry = new ClientConnectionRegistry();
// Get the event loop group configuration from the server class if available.
NettyEventLoopGroupConfig config = null;
if (server instanceof HBaseServerBase) {
Expand Down Expand Up @@ -486,4 +489,9 @@ public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
}
return Pair.newPair(total, max);
}

@Override
public ClientConnectionRegistry getClientConnectionRegistry() {
return clientConnectionRegistry;
}
}
Loading