Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distribution/offloaders/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tiered-storage-jcloud</artifactId>
<artifactId>tiered-storage-opendal</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>provided</scope>
Expand Down
11 changes: 11 additions & 0 deletions distribution/offloaders/src/assemble/README
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,14 @@ contains:

* META-INF/DEPEDENCIES file with licensing information for all transitive
dependencies

Included offloaders:

* tiered-storage-opendal-<version>.nar (cloud offloaders: aws-s3/S3/aliyun-oss/google-cloud-storage/azureblob)
* tiered-storage-file-system-<version>.nar (filesystem offloader)

Migration / rollback guidance (to avoid offloader discovery conflicts):

* Migration: ensure ${PULSAR_HOME}/offloaders contains only the OpenDAL NAR
(tiered-storage-opendal-*.nar). Do not keep tiered-storage-jcloud-*.nar alongside it.
* Rollback: remove tiered-storage-opendal-*.nar and restore tiered-storage-jcloud-*.nar.
2 changes: 1 addition & 1 deletion distribution/offloaders/src/assemble/offloaders.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
</file>

<file>
<source>${basedir}/../../tiered-storage/jcloud/target/tiered-storage-jcloud-${project.version}.nar</source>
<source>${basedir}/../../tiered-storage/opendal/target/tiered-storage-opendal-${project.version}.nar</source>
<outputDirectory>offloaders</outputDirectory>
<fileMode>644</fileMode>
</file>
Expand Down
8 changes: 4 additions & 4 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ FROM apachepulsar/pulsar:${VERSION}
# Add the cassandra connector (also works with ScyllaDB)
COPY --from=pulsar-all /pulsar/connectors/pulsar-io-cassandra-*.nar /pulsar/connectors

# Add the jcloud offloader
COPY --from=pulsar-all /pulsar/connectors/tiered-storage-jcloud-*.nar /pulsar/offloaders
# Add the OpenDAL offloader
COPY --from=pulsar-all /pulsar/connectors/tiered-storage-opendal-*.nar /pulsar/offloaders
```

NOTE: the above example uses a wildcard in the `COPY` commands because argument expansion does not work for `COPY`.

Assuming that you have the above `Dockerfile` in your local directory and are running docker on your local host, you can
run the following command to build a custom image with the cassandra connector and the jcloud offloader. The cassandra connector is compatible with both Apache Cassandra and ScyllaDB.
run the following command to build a custom image with the cassandra connector and the OpenDAL offloader. The cassandra connector is compatible with both Apache Cassandra and ScyllaDB.

```shell
docker build --build-arg VERSION=2.9.1 -t pulsar-custom:2.9.1 .
Expand Down Expand Up @@ -106,4 +106,4 @@ argument, you can run as the root user.
If you're running your container on kubernetes, you can override the container's default user by setting the pod's
`securityContext`.

Bitnami provides a helpful guide here: https://engineering.bitnami.com/articles/running-non-root-containers-on-openshift.html.
Bitnami provides a helpful guide here: https://engineering.bitnami.com/articles/running-non-root-containers-on-openshift.html.
5 changes: 5 additions & 0 deletions pulsar-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@
<artifactId>tiered-storage-file-system</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>tiered-storage-opendal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>tiered-storage-jcloud</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;

import lombok.extern.slf4j.Slf4j;

/**
* Azure Blob Storage emulator container (Azurite).
*/
@Slf4j
public class AzuriteContainer extends ChaosContainer<AzuriteContainer> {

public static final String NAME = "azurite";
public static final int BLOB_PORT = 10000;
private static final String IMAGE_NAME = "mcr.microsoft.com/azure-storage/azurite:latest";

private final String hostname;

public AzuriteContainer(String clusterName, String hostname, String accountName, String accountKey) {
super(clusterName, IMAGE_NAME);
this.hostname = hostname;
// `AZURITE_ACCOUNTS` format: "<accountName>:<base64Key>[;<accountName2>:<base64Key2>...]"
this.withEnv("AZURITE_ACCOUNTS", accountName + ":" + accountKey);
this.withExposedPorts(BLOB_PORT);
}

@Override
public String getContainerName() {
return clusterName + "-" + hostname;
}

@Override
public void start() {
this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(hostname);
createContainerCmd.withName(getContainerName());
});

super.start();
log.info("Start Azurite service");
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;

import lombok.extern.slf4j.Slf4j;

/**
* Google Cloud Storage emulator container.
*/
@Slf4j
public class GcsContainer extends ChaosContainer<GcsContainer> {

public static final String NAME = "gcs";
public static final int PORT = 4443;
private static final String IMAGE_NAME = "fsouza/fake-gcs-server:latest";

private final String hostname;

public GcsContainer(String clusterName, String hostname) {
super(clusterName, IMAGE_NAME);
this.hostname = hostname;
this.withExposedPorts(PORT);
// Use HTTP to avoid TLS certificate issues inside the Pulsar broker container.
this.withCommand("-scheme", "http", "-backend", "memory");
}

@Override
public String getContainerName() {
return clusterName + "-" + hostname;
}

@Override
public void start() {
this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(hostname);
createContainerCmd.withName(getContainerName());
});

super.start();
log.info("Start GCS emulator service");
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.offload;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.AzuriteContainer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

@Slf4j
public class TestAzureBlobOffload extends TestBaseOffload {

private static final String STORAGE_ACCOUNT = "pulsar";
private static final String STORAGE_KEY_BASE64 = "cHVsc2FyLXRlc3Qta2V5LXB1bHNhci10ZXN0LWtleS0xMjM0";
private static final String CONTAINER = "pulsar-integtest";
private static final String AZURE_API_VERSION = "2020-10-02";

private AzuriteContainer azuriteContainer;

@Override
protected void beforeStartCluster() throws Exception {
super.beforeStartCluster();

log.info("Azurite init");
azuriteContainer = new AzuriteContainer(pulsarCluster.getClusterName(), AzuriteContainer.NAME,
STORAGE_ACCOUNT, STORAGE_KEY_BASE64)
.withNetwork(pulsarCluster.getNetwork())
.withNetworkAliases(AzuriteContainer.NAME);
azuriteContainer.start();
createContainerIfMissing();
log.info("Azurite start finish.");
}

@AfterClass(alwaysRun = true)
public void teardownAzurite() {
if (azuriteContainer != null) {
azuriteContainer.stop();
}
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaCLI(Supplier<String> serviceUrl, Supplier<String> adminUrl)
throws Exception {
super.testPublishOffloadAndConsumeViaCLI(serviceUrl.get(), adminUrl.get());
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaThreshold(Supplier<String> serviceUrl, Supplier<String> adminUrl)
throws Exception {
super.testPublishOffloadAndConsumeViaThreshold(serviceUrl.get(), adminUrl.get());
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeDeletionLag(Supplier<String> serviceUrl, Supplier<String> adminUrl)
throws Exception {
super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get());
}

@Override
protected Map<String, String> getEnv() {
Map<String, String> result = new HashMap<>();
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "azureblob");

// OpenDAL uses `managedLedgerOffloadBucket` as the container name for azblob.
result.put("managedLedgerOffloadBucket", CONTAINER);
// Azurite uses path-style endpoint: http://host:port/<accountName>
result.put("managedLedgerOffloadServiceEndpoint",
"http://" + AzuriteContainer.NAME + ":" + AzuriteContainer.BLOB_PORT + "/" + STORAGE_ACCOUNT);

// Keep compatibility with tiered-storage-jcloud behavior (env based credentials).
result.put("AZURE_STORAGE_ACCOUNT", STORAGE_ACCOUNT);
result.put("AZURE_STORAGE_ACCESS_KEY", STORAGE_KEY_BASE64);

return result;
}

private void createContainerIfMissing() throws Exception {
String host = azuriteContainer.getHost();
int port = azuriteContainer.getMappedPort(AzuriteContainer.BLOB_PORT);

String date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
String canonicalizedHeaders = "x-ms-date:" + date + "\n"
+ "x-ms-version:" + AZURE_API_VERSION + "\n";
String canonicalizedResource = "/" + STORAGE_ACCOUNT + "/" + CONTAINER + "\nrestype:container";

// See https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
String stringToSign = "PUT\n" // VERB
+ "\n" // Content-Encoding
+ "\n" // Content-Language
+ "0\n" // Content-Length
+ "\n" // Content-MD5
+ "\n" // Content-Type
+ "\n" // Date (empty because x-ms-date is used)
+ "\n" // If-Modified-Since
+ "\n" // If-Match
+ "\n" // If-None-Match
+ "\n" // If-Unmodified-Since
+ "\n" // Range
+ canonicalizedHeaders
+ canonicalizedResource;

String signature = buildAzureSignature(STORAGE_KEY_BASE64, stringToSign);
String authorization = "SharedKey " + STORAGE_ACCOUNT + ":" + signature;

String uri = "http://" + host + ":" + port + "/" + STORAGE_ACCOUNT + "/" + CONTAINER + "?restype=container";
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.header("x-ms-date", date)
.header("x-ms-version", AZURE_API_VERSION)
.header("Authorization", authorization)
// .header("Content-Length", "0")
.PUT(HttpRequest.BodyPublishers.ofByteArray(new byte[0]))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

int code = response.statusCode();
if (code != 201 && code != 202 && code != 409) {
throw new RuntimeException("Failed to create Azurite container. status=" + code
+ ", body=" + response.body());
}
}

private static String buildAzureSignature(String base64Key, String stringToSign)
throws NoSuchAlgorithmException, InvalidKeyException {
byte[] key = Base64.getDecoder().decode(base64Key);
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(key, "HmacSHA256"));
byte[] hmac = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(hmac);
}
}

Loading
Loading