Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


public class StarRocksStreamLoadVisitor {

private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
Expand Down Expand Up @@ -104,16 +104,35 @@ private String getAvailableHost() {
List<String> hostList = writerOptions.getLoadUrlList();
long tmp = pos + hostList.size();
for (; pos < tmp; pos++) {
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
String rawHost = hostList.get((int) (pos % hostList.size()));
String host = normalizeHostScheme(rawHost);
if (tryHttpConnection(host)) {
return host;
}
}
return null;
}

private String normalizeHostScheme(String host) {
if (host == null) {
return null;
}
String trimmed = host.trim();
if (trimmed.isEmpty()) {
return trimmed;
}
if (hasScheme(trimmed)) {
return trimmed;
}
return "http://" + trimmed;
}

private boolean hasScheme(String host) {
return host.matches("^[a-zA-Z][a-zA-Z0-9+\\-.]*://.*");
}

private boolean tryHttpConnection(String host) {
try {
try {
URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(1000);
Expand All @@ -137,7 +156,7 @@ private byte[] joinRows(List<byte[]> rows, int totalBytes) {
}
return bos.array();
}

if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
bos.put("[".getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -274,7 +293,7 @@ private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
}
return respEntity;
}

private String doHttpGet(String getUrl) throws IOException {
LOG.info("Executing GET from {}.", getUrl);
try (CloseableHttpClient httpclient = buildHttpClient()) {
Expand Down