entryOffsetsCache;
+
+ /**
+  * Builds the offsets cache, or leaves it disabled when {@code CACHE_MAX_SIZE} is not positive.
+  *
+  * When enabled, a single background thread periodically calls {@code cleanUp()} on the cache.
+  */
+ public OffsetsCache() {
+     if (CACHE_MAX_SIZE <= 0) {
+         // Caching disabled: neither a cache nor an eviction thread is created.
+         cacheEvictionExecutor = null;
+         entryOffsetsCache = null;
+         return;
+     }
+
+     entryOffsetsCache = CacheBuilder.newBuilder()
+             .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
+             .maximumSize(CACHE_MAX_SIZE)
+             .build();
+     cacheEvictionExecutor = Executors.newSingleThreadScheduledExecutor(
+             new ThreadFactoryBuilder().setNameFormat("jcloud-offsets-cache-eviction").build());
+     // Sweep at half the TTL, with a floor of one second between sweeps.
+     int sweepIntervalSeconds = Math.max(CACHE_TTL_SECONDS / 2, 1);
+     cacheEvictionExecutor.scheduleAtFixedRate(
+             () -> entryOffsetsCache.cleanUp(),
+             sweepIntervalSeconds, sweepIntervalSeconds, TimeUnit.SECONDS);
+ }
+
+ /**
+  * Remembers the stream position of an entry. Does nothing when the cache is disabled.
+  *
+  * @param ledgerId ledger the entry belongs to
+  * @param entryId id of the entry
+  * @param currentPosition position to associate with the entry
+  */
+ public void put(long ledgerId, long entryId, long currentPosition) {
+     if (entryOffsetsCache == null) {
+         return;
+     }
+     Key cacheKey = new Key(ledgerId, entryId);
+     entryOffsetsCache.put(cacheKey, currentPosition);
+ }
+
+ /**
+  * Looks up the cached position of an entry.
+  *
+  * @param ledgerId ledger the entry belongs to
+  * @param entryId id of the entry
+  * @return the cached position, or {@code null} when nothing is cached or the cache is disabled
+  */
+ public Long getIfPresent(long ledgerId, long entryId) {
+     if (entryOffsetsCache == null) {
+         return null;
+     }
+     return entryOffsetsCache.getIfPresent(new Key(ledgerId, entryId));
+ }
+
+ /**
+  * Drops every cached offset. Does nothing when the cache is disabled.
+  */
+ public void clear() {
+     if (entryOffsetsCache == null) {
+         return;
+     }
+     entryOffsetsCache.invalidateAll();
+ }
+
+ /**
+  * Stops the background eviction thread, if one was started.
+  */
+ @Override
+ public void close() {
+     if (cacheEvictionExecutor == null) {
+         return;
+     }
+     cacheEvictionExecutor.shutdownNow();
+ }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
new file mode 100644
index 0000000000000..3ce1e1fecf025
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.io.CountingInputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+/**
+ * The data block header in tiered storage for each streaming data block.
+ *
+ * <p>Serialized layout, {@link #HEADER_MAX_SIZE} bytes in total:
+ * <pre>
+ * [ magic_word -- int ][ header_len -- long ][ block_len -- long ]
+ * [ first_entry_id -- long ][ ledger_id -- long ][ padding zeros ]
+ * </pre>
+ */
+public class StreamingDataBlockHeaderImpl implements DataBlockHeader {
+    // Magic Word for streaming data block.
+    // It is a sequence of bytes used to identify the start of a block.
+    static final int MAGIC_WORD = 0x26A66D32;
+    // This is bigger than header size. Leaving some place for alignment and future enhancement.
+    // Payload use this as the start offset.
+    public static final int HEADER_MAX_SIZE = 128;
+    private static final int HEADER_BYTES_USED = 4 /* magic */
+            + 8 /* header len */
+            + 8 /* block len */
+            + 8 /* first entry id */
+            + 8 /* ledger id */;
+    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_BYTES_USED];
+
+    private final long headerLength;
+    private final long blockLength;
+    private final long firstEntryId;
+    private final long ledgerId;
+
+    public StreamingDataBlockHeaderImpl(long headerLength, long blockLength, long ledgerId, long firstEntryId) {
+        this.headerLength = headerLength;
+        this.blockLength = blockLength;
+        this.firstEntryId = firstEntryId;
+        this.ledgerId = ledgerId;
+    }
+
+    /**
+     * Creates a header whose header length is {@link #HEADER_MAX_SIZE}.
+     */
+    public static StreamingDataBlockHeaderImpl of(int blockLength, long ledgerId, long firstEntryId) {
+        return new StreamingDataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, ledgerId, firstEntryId);
+    }
+
+    public static int getBlockMagicWord() {
+        return MAGIC_WORD;
+    }
+
+    public static int getDataStartOffset() {
+        return HEADER_MAX_SIZE;
+    }
+
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    @Override
+    public long getBlockLength() {
+        return this.blockLength;
+    }
+
+    @Override
+    public long getHeaderLength() {
+        return this.headerLength;
+    }
+
+    @Override
+    public long getFirstEntryId() {
+        return this.firstEntryId;
+    }
+
+    /**
+     * Construct DataBlockHeader from InputStream, which contains `HEADER_MAX_SIZE` bytes readable.
+     *
+     * <p>After the fixed fields are parsed, the rest of the declared header length is consumed so that the
+     * stream is left positioned right after the header.
+     *
+     * @param stream stream positioned at the first byte of the header
+     * @return the parsed header
+     * @throws IOException if the magic word does not match
+     * @throws EOFException if the stream ends before the declared header length is consumed, or the declared
+     *                      header length is smaller than the fixed fields already read
+     */
+    public static StreamingDataBlockHeaderImpl fromStream(InputStream stream) throws IOException {
+        CountingInputStream countingStream = new CountingInputStream(stream);
+        DataInputStream dis = new DataInputStream(countingStream);
+        int magic = dis.readInt();
+        if (magic != MAGIC_WORD) {
+            throw new IOException("Data block header magic word not match. read: " + magic
+                    + " expected: " + MAGIC_WORD);
+        }
+
+        long headerLen = dis.readLong();
+        long blockLen = dis.readLong();
+        long firstEntryId = dis.readLong();
+        long ledgerId = dis.readLong();
+        skipFully(dis, headerLen - countingStream.getCount());
+
+        return new StreamingDataBlockHeaderImpl(headerLen, blockLen, ledgerId, firstEntryId);
+    }
+
+    // Skips exactly toSkip bytes. InputStream.skip may skip fewer bytes than asked for without being at the
+    // end of the stream, so a short skip is retried and a single-byte read is used to tell EOF from "not yet".
+    private static void skipFully(InputStream in, long toSkip) throws IOException {
+        if (toSkip < 0) {
+            throw new EOFException("Header was too small");
+        }
+        long remaining = toSkip;
+        while (remaining > 0) {
+            long skipped = in.skip(remaining);
+            if (skipped > 0) {
+                remaining -= skipped;
+            } else if (in.read() < 0) {
+                throw new EOFException("Header was too small");
+            } else {
+                remaining--;
+            }
+        }
+    }
+
+    /**
+     * Get the content of the data block header as InputStream.
+     * Read out in format:
+     *   [ magic_word -- int ][ header_len -- long ][ block_len -- long ]
+     *   [ first_entry_id -- long ][ ledger_id -- long ][ padding zeros ]
+     */
+    @Override
+    public InputStream toStream() {
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        out.writeInt(MAGIC_WORD)
+                .writeLong(headerLength)
+                .writeLong(blockLength)
+                .writeLong(firstEntryId)
+                .writeLong(ledgerId)
+                .writeBytes(PADDING);
+
+        // true means the input stream will release the ByteBuf on close
+        return new ByteBufInputStream(out, true);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("StreamingDataBlockHeader(len:%d,hlen:%d,firstEntry:%d,ledger:%d)",
+                blockLength, headerLength, firstEntryId, ledgerId);
+    }
+}
+
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/package-info.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/package-info.java
new file mode 100644
index 0000000000000..346fe3262b163
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/package-info.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/package-info.java
new file mode 100644
index 0000000000000..4daf3fff26f78
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/OpenDALLedgerOffloaderFactory.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/OpenDALLedgerOffloaderFactory.java
new file mode 100644
index 0000000000000..a89352d754a7b
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/OpenDALLedgerOffloaderFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
+import org.apache.bookkeeper.mledger.offload.opendal.impl.OpenDALManagedLedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALOperatorProvider;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALTieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OperatorCache;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+
+/**
+ * OpenDAL based offloader factory.
+ *
+ * <p>All offloaders created by one factory instance share that instance's {@link OffsetsCache} and
+ * {@link OperatorCache}; {@link #close()} closes both.
+ */
+// NOTE(review): the type arguments of LedgerOffloaderFactory and of the userMetadata maps are not spelled
+// out in this class; confirm them against the interface declaration.
+public class OpenDALLedgerOffloaderFactory implements LedgerOffloaderFactory {
+
+    // Keep aligned with the existing driver names so that broker configs don't need to change.
+    private static final Set<String> SUPPORTED_DRIVERS = Set.of(
+            "aws-s3",
+            "S3",
+            "aliyun-oss",
+            "google-cloud-storage",
+            "azureblob",
+            "transient"
+    );
+
+    private final OffsetsCache entryOffsetsCache = new OffsetsCache();
+    private final OperatorCache operatorCache = new OperatorCache();
+
+    /**
+     * Tells whether the given driver name is one of the supported names, ignoring case.
+     *
+     * @param driverName driver name taken from the configuration, may be {@code null}
+     * @return {@code true} if the name matches a supported driver; {@code false} otherwise or for {@code null}
+     */
+    @Override
+    public boolean isDriverSupported(String driverName) {
+        return driverName != null && SUPPORTED_DRIVERS.stream().anyMatch(d -> d.equalsIgnoreCase(driverName));
+    }
+
+    /**
+     * Creates an offloader with statistics disabled.
+     */
+    @Override
+    public OpenDALManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
+                                                Map userMetadata,
+                                                OrderedScheduler scheduler) throws IOException {
+        return create(offloadPolicies, userMetadata, scheduler, LedgerOffloaderStatsDisable.INSTANCE);
+    }
+
+    /**
+     * Creates an offloader that uses the same scheduler for offloading and for reads.
+     */
+    @Override
+    public OpenDALManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
+                                                Map userMetadata,
+                                                OrderedScheduler scheduler,
+                                                LedgerOffloaderStats offloaderStats) throws IOException {
+        return create(offloadPolicies, userMetadata, scheduler, scheduler, offloaderStats);
+    }
+
+    /**
+     * Creates an offloader from the given policies, wired to this factory's shared caches.
+     *
+     * @throws IOException declared by the interface contract; this method itself only delegates
+     */
+    @Override
+    public OpenDALManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
+                                                Map userMetadata,
+                                                OrderedScheduler scheduler,
+                                                OrderedScheduler readExecutor,
+                                                LedgerOffloaderStats offloaderStats) throws IOException {
+        OpenDALTieredStorageConfiguration config =
+                OpenDALTieredStorageConfiguration.create(offloadPolicies.toProperties());
+        OpenDALOperatorProvider operatorProvider = new OpenDALOperatorProvider(config, operatorCache);
+        return OpenDALManagedLedgerOffloader.create(config, userMetadata, scheduler, readExecutor, offloaderStats,
+                entryOffsetsCache, operatorProvider);
+    }
+
+    /**
+     * Closes the operator cache and the offsets cache.
+     *
+     * <p>The offsets cache is closed even when closing the operator cache fails, so that its background
+     * eviction thread is always stopped.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            operatorCache.close();
+        } finally {
+            entryOffsetsCache.close();
+        }
+    }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/DataBlockUtils.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/DataBlockUtils.java
new file mode 100644
index 0000000000000..8e19f36c6d119
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/DataBlockUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Format and naming utilities that must remain compatible with the existing tiered-storage-jcloud implementation.
+ *
+ * <p>Object names produced here have the shape {@code <uuid>-ledger-<ledgerId>} for data blocks and the
+ * same name with an {@code -index} suffix for index blocks.
+ */
+public final class DataBlockUtils {
+
+    /**
+     * Keep the same metadata key used by tiered-storage-jcloud.
+     *
+     * Note: Some backends normalize user-metadata keys; callers should treat it as case-insensitive.
+     */
+    public static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
+
+    /**
+     * Keep the same format version written by tiered-storage-jcloud.
+     */
+    static final String CURRENT_VERSION = String.valueOf(1);
+
+    private static final String LEDGER_MARKER = "-ledger-";
+    private static final String INDEX_SUFFIX = "-index";
+
+    private DataBlockUtils() {
+    }
+
+    /**
+     * Builds the object name of a ledger's data block.
+     */
+    public static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
+        return String.format("%s-ledger-%d", uuid.toString(), ledgerId);
+    }
+
+    /**
+     * Builds the object name of a ledger's index block.
+     */
+    public static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
+        return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
+    }
+
+    /**
+     * Builds the object name of an index block that is not tied to a single ledger id.
+     */
+    public static String indexBlockOffloadKey(UUID uuid) {
+        return String.format("%s-index", uuid.toString());
+    }
+
+    /**
+     * Returns a new mutable map holding the given metadata plus the format version entry.
+     *
+     * @param userMetadata caller supplied metadata; may be {@code null} or empty and is never modified
+     * @return a fresh map containing every entry of {@code userMetadata} and the version entry
+     */
+    public static Map<String, String> withVersionInfo(Map<String, String> userMetadata) {
+        Map<String, String> metadata = new HashMap<>();
+        if (userMetadata != null && !userMetadata.isEmpty()) {
+            metadata.putAll(userMetadata);
+        }
+        // Follow tiered-storage-jcloud behavior: write the version key using lower-case.
+        // Locale.ROOT keeps the key independent of the JVM default locale.
+        metadata.put(METADATA_FORMAT_VERSION_KEY.toLowerCase(Locale.ROOT), CURRENT_VERSION);
+        return metadata;
+    }
+
+    /**
+     * Extracts the ledger id from a data block or index block object name.
+     *
+     * @param name object name, e.g. {@code <uuid>-ledger-42} or {@code <uuid>-ledger-42-index}
+     * @return the ledger id, or {@code null} when the name is null, empty, has no ledger marker, or the text
+     *         after the marker is not a number
+     */
+    public static Long parseLedgerId(String name) {
+        if (name == null || name.isEmpty()) {
+            return null;
+        }
+        String base = name.endsWith(INDEX_SUFFIX)
+                ? name.substring(0, name.length() - INDEX_SUFFIX.length())
+                : name;
+        int pos = base.indexOf(LEDGER_MARKER);
+        if (pos < 0) {
+            return null;
+        }
+        try {
+            return Long.parseLong(base.substring(pos + LEDGER_MARKER.length()));
+        } catch (NumberFormatException err) {
+            return null;
+        }
+    }
+
+    /**
+     * Extracts the part of an object name that precedes {@code -ledger-<ledgerId>}.
+     *
+     * @param name object name
+     * @param ledgerId ledger id expected in the name
+     * @return the leading part of the name, or {@code null} when either argument is null, the marker is
+     *         absent, or nothing precedes it
+     */
+    public static String parseContextUuid(String name, Long ledgerId) {
+        if (ledgerId == null || name == null) {
+            return null;
+        }
+        int pos = name.indexOf(LEDGER_MARKER + ledgerId);
+        if (pos <= 0) {
+            return null;
+        }
+        return name.substring(0, pos);
+    }
+}
+
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedInputStream.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedInputStream.java
new file mode 100644
index 0000000000000..d4d2abd833f40
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedInputStream.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.opendal.storage.OpenDALStorage;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Seekable input stream over one stored object, fetched range by range through {@link OpenDALStorage}.
+ *
+ * <p>At most {@code bufferSize} bytes of the object are held in a pooled buffer at a time; the buffer is
+ * released by {@link #close()}.
+ */
+@Slf4j
+class OpenDALBackedInputStream extends BackedInputStream {
+
+    private final OpenDALStorage storage;
+    private final String key;
+    private final ByteBuf buffer;
+    private final long objectLen;
+    private final int bufferSize;
+    private final LedgerOffloaderStats offloaderStats;
+    private final String topicName;
+
+    // Object offset at which the next range fetch starts.
+    private long cursor;
+    // Inclusive object offsets covered by the bytes currently in the buffer; both are -1 when it holds none.
+    private long bufferOffsetStart;
+    private long bufferOffsetEnd;
+
+    OpenDALBackedInputStream(OpenDALStorage storage,
+                             String key,
+                             long objectLen,
+                             int bufferSize,
+                             LedgerOffloaderStats offloaderStats,
+                             String managedLedgerName) {
+        this.storage = storage;
+        this.key = key;
+        this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
+        this.objectLen = objectLen;
+        this.bufferSize = bufferSize;
+        this.offloaderStats = offloaderStats;
+        this.topicName = managedLedgerName != null ? TopicName.fromPersistenceNamingEncoding(managedLedgerName) : null;
+        this.cursor = 0;
+        this.bufferOffsetStart = this.bufferOffsetEnd = -1;
+    }
+
+    /**
+     * Makes sure the buffer has at least one readable byte, fetching the next range when it is drained.
+     *
+     * @return {@code true} when at least one byte can be read from the buffer, {@code false} at end of data
+     * @throws IOException if fetching the range fails
+     */
+    private boolean refillBufferIfNeeded() throws IOException {
+        if (buffer.readableBytes() != 0) {
+            return true;
+        }
+        if (cursor >= objectLen) {
+            return false;
+        }
+        long startRange = cursor;
+        long endRange = Math.min(cursor + bufferSize - 1, objectLen - 1);
+        long startReadTime = System.nanoTime();
+        try (InputStream stream = storage.readRange(key, startRange, endRange)) {
+            buffer.clear();
+            bufferOffsetStart = bufferOffsetEnd = -1;
+            fillBuffer(stream, (int) (endRange - startRange + 1));
+            int filled = buffer.readableBytes();
+            if (filled == 0) {
+                // The range came back empty although more data was expected: report end of data
+                // instead of letting the caller read from an empty buffer.
+                return false;
+            }
+            // Describe only the bytes actually received, so that seek() never lands past them.
+            bufferOffsetStart = startRange;
+            bufferOffsetEnd = startRange + filled - 1;
+            cursor += filled;
+        } catch (Throwable t) {
+            // Discard anything partially fetched and rewind, so a later retry starts from the same offset.
+            buffer.clear();
+            bufferOffsetStart = bufferOffsetEnd = -1;
+            cursor = startRange;
+            if (offloaderStats != null && topicName != null) {
+                offloaderStats.recordReadOffloadError(topicName);
+            }
+            if (t instanceof IOException) {
+                throw (IOException) t;
+            }
+            throw new IOException("Error reading from OpenDAL", t);
+        } finally {
+            // NOTE(review): latency and the requested byte count are recorded for failed reads too;
+            // confirm that this is the intended accounting.
+            if (offloaderStats != null && topicName != null) {
+                offloaderStats.recordReadOffloadDataLatency(topicName,
+                        System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
+                offloaderStats.recordReadOffloadBytes(topicName, endRange - startRange + 1);
+            }
+        }
+        return true;
+    }
+
+    // Copies up to bytesToCopy bytes from the stream into the buffer, stopping early at end of stream.
+    private void fillBuffer(InputStream is, int bytesToCopy) throws IOException {
+        while (bytesToCopy > 0) {
+            int writeBytes = buffer.writeBytes(is, bytesToCopy);
+            if (writeBytes < 0) {
+                break;
+            }
+            bytesToCopy -= writeBytes;
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (refillBufferIfNeeded()) {
+            return buffer.readUnsignedByte();
+        }
+        return -1;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            // A zero-length read must not trigger a range fetch.
+            return 0;
+        }
+        if (refillBufferIfNeeded()) {
+            int bytesToRead = Math.min(len, buffer.readableBytes());
+            buffer.readBytes(b, off, bytesToRead);
+            return bytesToRead;
+        }
+        return -1;
+    }
+
+    /**
+     * Moves the read position. A target inside the buffered range only moves the buffer's reader index;
+     * any other target drops the buffered bytes so that the next read fetches from {@code position}.
+     */
+    @Override
+    public void seek(long position) {
+        if (log.isDebugEnabled()) {
+            log.debug("Seeking to {} on {}, current position {} (bufStart:{}, bufEnd:{})",
+                    position, key, cursor, bufferOffsetStart, bufferOffsetEnd);
+        }
+        if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
+            long newIndex = position - bufferOffsetStart;
+            buffer.readerIndex((int) newIndex);
+        } else {
+            bufferOffsetStart = bufferOffsetEnd = -1;
+            cursor = position;
+            buffer.clear();
+        }
+    }
+
+    @Override
+    public void seekForward(long position) throws IOException {
+        if (position >= cursor) {
+            seek(position);
+        } else {
+            throw new IOException(String.format("Error seeking, new position %d < current position %d",
+                    position, cursor));
+        }
+    }
+
+    @Override
+    public long getCurrentPosition() {
+        if (bufferOffsetStart != -1) {
+            return bufferOffsetStart + buffer.readerIndex();
+        }
+        return cursor + buffer.readerIndex();
+    }
+
+    /**
+     * Releases the buffer. Calling it again after the buffer has been released has no effect.
+     */
+    @Override
+    public void close() {
+        if (buffer.refCnt() > 0) {
+            buffer.release();
+        }
+    }
+
+    @Override
+    public int available() {
+        // Never negative, even after a seek past the end of the object.
+        long available = Math.max(0L, objectLen - cursor + buffer.readableBytes());
+        return available > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) available;
+    }
+}
+
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImpl.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImpl.java
new file mode 100644
index 0000000000000..c2ba6114e03e6
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImpl.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
+import org.apache.bookkeeper.mledger.offload.opendal.storage.OpenDALStorage;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class OpenDALBackedReadHandleImpl implements ReadHandle, OffloadedLedgerHandle {
+
+ // Atomic updater for the volatile pendingRead counter declared below.
+ protected static final AtomicIntegerFieldUpdater<OpenDALBackedReadHandleImpl> PENDING_READ_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(OpenDALBackedReadHandleImpl.class, "pendingRead");
+
+ private final long ledgerId;
+ private final OffloadIndexBlock index;
+ private final BackedInputStream inputStream;
+ // Wraps inputStream for reading the int/long fields of each stored entry.
+ private final DataInputStream dataStream;
+ private final ExecutorService executor;
+ private final OffsetsCache entryOffsetsCache;
+ // Set once by the first closeAsync() call; later calls return the same future.
+ private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
+
+ enum State {
+     Opened,
+     Closed
+ }
+
+ private volatile State state;
+ private volatile int pendingRead;
+ private volatile long lastAccessTimestamp = System.currentTimeMillis();
+
+ /**
+  * Creates a read handle in the {@code Opened} state.
+  *
+  * @param ledgerId id of the ledger this handle reads
+  * @param index index block used to look up entry offsets and ledger metadata
+  * @param inputStream seekable stream over the stored data; also wrapped in a {@link DataInputStream}
+  * @param executor executor on which the close work is run
+  * @param entryOffsetsCache cache of entry positions
+  */
+ @VisibleForTesting
+ OpenDALBackedReadHandleImpl(long ledgerId,
+                             OffloadIndexBlock index,
+                             BackedInputStream inputStream,
+                             ExecutorService executor,
+                             OffsetsCache entryOffsetsCache) {
+     this.ledgerId = ledgerId;
+     this.index = index;
+     this.executor = executor;
+     this.entryOffsetsCache = entryOffsetsCache;
+     this.inputStream = inputStream;
+     this.dataStream = new DataInputStream(inputStream);
+     this.state = State.Opened;
+ }
+
+ /**
+  * @return the id of the ledger this handle was created for
+  */
+ @Override
+ public long getId() {
+     return this.ledgerId;
+ }
+
+ /**
+  * @return the ledger metadata held by the index block
+  */
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+     return this.index.getLedgerMetadata();
+ }
+
+ /**
+  * Closes the index block and the input stream on the executor.
+  *
+  * <p>Only the first call schedules the close; every call returns the same future. The input stream is
+  * closed and the handle is marked {@code Closed} even if closing the index fails, and any failure,
+  * checked or not, completes the future exceptionally instead of leaving it pending.
+  *
+  * @return a future completed once the close work has run
+  */
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+     CompletableFuture<Void> promise = new CompletableFuture<>();
+     if (!closeFuture.compareAndSet(null, promise)) {
+         // Another caller already started the close.
+         return closeFuture.get();
+     }
+
+     executor.execute(() -> {
+         try {
+             try {
+                 index.close();
+             } finally {
+                 try {
+                     inputStream.close();
+                 } finally {
+                     state = State.Closed;
+                 }
+             }
+             promise.complete(null);
+         } catch (Throwable t) {
+             promise.completeExceptionally(t);
+         }
+     });
+     return promise;
+ }
+
+ private class ReadTask implements Runnable {
+ private final long firstEntry;
+ private final long lastEntry;
+ private final CompletableFuture promise;
+ private int seekedAndTryTimes = 0;
+
+ /**
+  * @param firstEntry id of the first entry to read
+  * @param lastEntry id of the last entry to read
+  * @param promise future that {@link #run()} completes with the entries or with the failure
+  */
+ ReadTask(long firstEntry, long lastEntry, CompletableFuture promise) {
+     this.promise = promise;
+     this.firstEntry = firstEntry;
+     this.lastEntry = lastEntry;
+ }
+
+ /**
+  * Reads entries {@code firstEntry..lastEntry} from the input stream and completes the promise.
+  *
+  * <p>On any failure the promise is completed exceptionally and the entries collected so far are closed.
+  */
+ @Override
+ public void run() {
+     if (state == State.Closed) {
+         log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                 ledgerId, firstEntry, lastEntry);
+         promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+         return;
+     }
+
+     List<LedgerEntry> entryCollector = new ArrayList<>();
+     try {
+         if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > getLastAddConfirmed()) {
+             promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+             return;
+         }
+         long entriesToRead = (lastEntry - firstEntry) + 1;
+         long expectedEntryId = firstEntry;
+         seekToEntryOffset(firstEntry);
+         seekedAndTryTimes++;
+
+         while (entriesToRead > 0) {
+             long currentPosition = inputStream.getCurrentPosition();
+             int length = dataStream.readInt();
+             if (length < 0) { // hit padding or new block
+                 seekToEntryOffset(expectedEntryId);
+                 continue;
+             }
+             long entryId = dataStream.readLong();
+             if (entryId == expectedEntryId) {
+                 entryOffsetsCache.put(ledgerId, entryId, currentPosition);
+                 ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+                 // Register the entry before filling it so the catch block releases it on failure.
+                 entryCollector.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                 int toWrite = length;
+                 while (toWrite > 0) {
+                     int written = buf.writeBytes(dataStream, toWrite);
+                     if (written < 0) {
+                         // writeBytes reports end of stream with a negative count; subtracting it
+                         // would grow toWrite and hide the real cause.
+                         throw new IOException("Unexpected end of stream while reading entry " + entryId
+                                 + " of ledger " + ledgerId + ", missing " + toWrite + " of " + length
+                                 + " bytes");
+                     }
+                     toWrite -= written;
+                 }
+                 entriesToRead--;
+                 expectedEntryId++;
+             } else {
+                 handleUnexpectedEntryId(expectedEntryId, entryId);
+             }
+         }
+         promise.complete(LedgerEntriesImpl.create(entryCollector));
+     } catch (Throwable t) {
+         log.error("Failed to read entries {} - {} from the offloader in ledger {}, current position of input"
+                 + " stream is {}", firstEntry, lastEntry, ledgerId, inputStream.getCurrentPosition(), t);
+         if (t instanceof FileNotFoundException) {
+             promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
+         } else {
+             promise.completeExceptionally(t);
+         }
+         entryCollector.forEach(LedgerEntry::close);
+     }
+ }
+
+ /**
+  * Reacts to reading an entry id other than the expected one: logs the mismatch, then either seeks back
+  * to the expected entry for another attempt or gives up once the retry budget is spent.
+  *
+  * @param expectedId entry id that should have been read
+  * @param actEntryId entry id that was actually read
+  * @throws Exception {@link BKException.BKUnexpectedConditionException} when the number of seek attempts
+  *                   exceeds {@code max(3, rangeSize / 4)}, or whatever the index lookup or seek throws
+  */
+ private void handleUnexpectedEntryId(long expectedId, long actEntryId) throws Exception {
+     LedgerMetadata ledgerMetadata = getLedgerMetadata();
+     OffloadIndexEntry offsetOfExpectedId = index.getIndexEntryForEntry(expectedId);
+     // Only look up the actual id when it is a valid entry id of this ledger. The metadata is
+     // null-checked here as well, consistent with the LAC formatting below.
+     boolean actIdInLedger = ledgerMetadata != null
+             && actEntryId <= ledgerMetadata.getLastEntryId() && actEntryId >= 0;
+     OffloadIndexEntry offsetOfActId = actIdInLedger ? index.getIndexEntryForEntry(actEntryId) : null;
+     String logLine = String.format("Failed to read [ %s ~ %s ] of the ledger %s."
+                     + " Because got a incorrect entry id %s, the offset is %s."
+                     + " The expected entry id is %s, the offset is %s."
+                     + " Have seeked and retry read times: %s. LAC is %s.",
+             firstEntry, lastEntry, ledgerId,
+             actEntryId, offsetOfActId == null ? "null because it does not exist"
+                     : String.valueOf(offsetOfActId),
+             expectedId, String.valueOf(offsetOfExpectedId),
+             seekedAndTryTimes, ledgerMetadata != null ? ledgerMetadata.getLastEntryId() : "unknown");
+     long maxTryTimes = Math.max(3, (lastEntry - firstEntry + 1) >> 2);
+     if (seekedAndTryTimes > maxTryTimes) {
+         log.error(logLine);
+         throw new BKException.BKUnexpectedConditionException();
+     } else {
+         log.warn(logLine);
+     }
+     seekToEntryOffset(expectedId);
+     seekedAndTryTimes++;
+ }
+
+ /**
+  * Reads past the entries {@code startEntryId .. expectedEntryId - 1}, leaving the stream positioned at
+  * the entry that follows them.
+  *
+  * @param startEntryId id of the entry the stream is currently positioned at
+  * @param expectedEntryId id of the entry to stop in front of
+  * @throws IOException if reading from the stream fails
+  * @throws BKException {@link BKException.BKUnexpectedConditionException} when a negative length, an
+  *                     unexpected entry id, or a short skip is encountered
+  */
+ private void skipPreviousEntry(long startEntryId, long expectedEntryId) throws IOException, BKException {
+     for (long nextExpectedEntryId = startEntryId; nextExpectedEntryId < expectedEntryId;
+             nextExpectedEntryId++) {
+         long offset = inputStream.getCurrentPosition();
+         int len = dataStream.readInt();
+         if (len < 0) {
+             LedgerMetadata ledgerMetadata = getLedgerMetadata();
+             OffloadIndexEntry offsetOfExpectedId = index.getIndexEntryForEntry(expectedEntryId);
+             log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+                             + " Because failed to skip a previous entry {}, len: {}, got a negative len."
+                             + " The expected entry id is {}, the offset is {}."
+                             + " Have seeked and retry read times: {}. LAC is {}.",
+                     firstEntry, lastEntry, ledgerId,
+                     nextExpectedEntryId, len,
+                     expectedEntryId, String.valueOf(offsetOfExpectedId),
+                     seekedAndTryTimes, ledgerMetadata != null ? ledgerMetadata.getLastEntryId() : "unknown");
+             throw new BKException.BKUnexpectedConditionException();
+         }
+
+         long entryId = dataStream.readLong();
+         if (entryId != nextExpectedEntryId) {
+             LedgerMetadata ledgerMetadata = getLedgerMetadata();
+             OffloadIndexEntry offsetOfExpectedId = index.getIndexEntryForEntry(expectedEntryId);
+             log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+                             + " Because got a incorrect entry id {},."
+                             + " The expected entry id is {}, the offset is {}."
+                             + " Have seeked and retry read times: {}. LAC is {}.",
+                     firstEntry, lastEntry, ledgerId,
+                     entryId, expectedEntryId, String.valueOf(offsetOfExpectedId),
+                     seekedAndTryTimes, ledgerMetadata != null ? ledgerMetadata.getLastEntryId() : "unknown");
+             throw new BKException.BKUnexpectedConditionException();
+         }
+
+         // The header matched; jump over the payload of this entry.
+         long skipped = inputStream.skip(len);
+         if (skipped != len) {
+             LedgerMetadata ledgerMetadata = getLedgerMetadata();
+             OffloadIndexEntry offsetOfExpectedId = index.getIndexEntryForEntry(expectedEntryId);
+             log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+                             + " Because failed to skip a previous entry {}, offset: {}, len: {},"
+                             + " there is no more data."
+                             + " The expected entry id is {}, the offset is {}."
+                             + " Have seeked and retry read times: {}. LAC is {}.",
+                     firstEntry, lastEntry, ledgerId,
+                     entryId, offset, len,
+                     expectedEntryId, String.valueOf(offsetOfExpectedId),
+                     seekedAndTryTimes, ledgerMetadata != null ? ledgerMetadata.getLastEntryId()
+                             : "unknown");
+             throw new BKException.BKUnexpectedConditionException();
+         }
+     }
+ }
+
+ private void seekToEntryOffset(long expectedEntryId) throws IOException, BKException {
+ Long cachedPreciseIndex = entryOffsetsCache.getIfPresent(ledgerId, expectedEntryId);
+ if (cachedPreciseIndex != null) {
+ inputStream.seek(cachedPreciseIndex);
+ return;
+ }
+
+ OffloadIndexEntry indexOfNearestEntry = index.getIndexEntryForEntry(expectedEntryId);
+ if (indexOfNearestEntry.getEntryId() == expectedEntryId) {
+ inputStream.seek(indexOfNearestEntry.getDataOffset());
+ return;
+ }
+
+ Long cachedPreviousKnownOffset = entryOffsetsCache.getIfPresent(ledgerId, expectedEntryId - 1);
+ if (cachedPreviousKnownOffset != null) {
+ inputStream.seek(cachedPreviousKnownOffset);
+ skipPreviousEntry(expectedEntryId - 1, expectedEntryId);
+ return;
+ }
+
+ if (indexOfNearestEntry.getEntryId() < expectedEntryId) {
+ inputStream.seek(indexOfNearestEntry.getDataOffset());
+ skipPreviousEntry(indexOfNearestEntry.getEntryId(), expectedEntryId);
+ } else {
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ log.error("Failed to read [ {} ~ {} ] of the ledger {}."
+ + " Because got a incorrect index {} of the entry {}, which is greater than expected."
+ + " Have seeked and retry read times: {}. LAC is {}.",
+ firstEntry, lastEntry, ledgerId,
+ String.valueOf(indexOfNearestEntry), expectedEntryId,
+ seekedAndTryTimes, ledgerMetadata != null ? ledgerMetadata.getLastEntryId() : "unknown");
+ throw new BKException.BKUnexpectedConditionException();
+ }
+ }
+ }
+
+    /**
+     * Schedules a read of the entries {@code firstEntry..lastEntry} on this handle's executor.
+     *
+     * <p>The pending-read counter is incremented before the task is submitted and decremented, together
+     * with a refresh of {@code lastAccessTimestamp}, when the returned future completes.
+     *
+     * @param firstEntry id of the first entry to read
+     * @param lastEntry id of the last entry to read
+     * @return a future completed by the submitted {@code ReadTask}; it is completed exceptionally when the
+     *         executor refuses the task
+     */
+    @Override
+    public CompletableFuture readAsync(long firstEntry, long lastEntry) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger {}: reading {} - {} ({} entries)",
+                    getId(), firstEntry, lastEntry, (1 + lastEntry - firstEntry));
+        }
+        CompletableFuture promise = new CompletableFuture<>();
+
+        PENDING_READ_UPDATER.incrementAndGet(this);
+        promise.whenComplete((__, ex) -> {
+            lastAccessTimestamp = System.currentTimeMillis();
+            PENDING_READ_UPDATER.decrementAndGet(OpenDALBackedReadHandleImpl.this);
+        });
+        try {
+            executor.execute(new ReadTask(firstEntry, lastEntry, promise));
+        } catch (java.util.concurrent.RejectedExecutionException rejected) {
+            // Without this the promise would never complete and the pending-read counter
+            // incremented above would never be decremented.
+            promise.completeExceptionally(rejected);
+        }
+        return promise;
+    }
+
+ @Override
+ public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) {
+ return readAsync(firstEntry, lastEntry);
+ }
+
+    /**
+     * Returns an already-completed future holding {@link #getLastAddConfirmed()}.
+     */
+    @Override
+    public CompletableFuture readLastAddConfirmedAsync() {
+        final long lastAddConfirmed = getLastAddConfirmed();
+        return CompletableFuture.completedFuture(lastAddConfirmed);
+    }
+
+    /**
+     * Same as {@link #readLastAddConfirmedAsync()}: the value comes from {@link #getLastAddConfirmed()}
+     * and the returned future is already completed.
+     */
+    @Override
+    public CompletableFuture tryReadLastAddConfirmedAsync() {
+        final long lastAddConfirmed = getLastAddConfirmed();
+        return CompletableFuture.completedFuture(lastAddConfirmed);
+    }
+
+    /**
+     * Returns the last entry id recorded in the ledger metadata of this handle.
+     */
+    @Override
+    public long getLastAddConfirmed() {
+        final LedgerMetadata metadata = getLedgerMetadata();
+        return metadata.getLastEntryId();
+    }
+
+    /**
+     * Returns the ledger length as recorded in the ledger metadata of this handle.
+     */
+    @Override
+    public long getLength() {
+        final LedgerMetadata metadata = getLedgerMetadata();
+        return metadata.getLength();
+    }
+
+    /**
+     * Reports the closed flag of the ledger metadata (not the open/closed state of this read handle).
+     */
+    @Override
+    public boolean isClosed() {
+        final LedgerMetadata metadata = getLedgerMetadata();
+        return metadata.isClosed();
+    }
+
+    /**
+     * Not supported by this handle.
+     *
+     * @return a future that is already completed exceptionally with {@link UnsupportedOperationException}
+     */
+    @Override
+    public CompletableFuture readLastAddConfirmedAndEntryAsync(long entryId,
+                                                               long timeOutInMillis,
+                                                               boolean parallel) {
+        final CompletableFuture unsupported = new CompletableFuture<>();
+        unsupported.completeExceptionally(new UnsupportedOperationException());
+        return unsupported;
+    }
+
+    /**
+     * Opens a read handle over an offloaded ledger by loading its index object and wrapping its data object.
+     *
+     * <p>Loading the index is attempted up to three times when an {@link IOException} occurs; a
+     * {@link FileNotFoundException} is not retried and is reported as a missing ledger.
+     *
+     * @param executor executor the returned handle runs its work on
+     * @param storage storage used to stat and read the index and data objects
+     * @param dataKey key of the data object
+     * @param indexKey key of the index object
+     * @param ledgerId id of the offloaded ledger
+     * @param readBufferSize buffer size handed to the backing input stream
+     * @param offloaderStats stats sink for the index read latency
+     * @param managedLedgerName managed ledger name, converted to a topic name for stats
+     * @param entryOffsetsCache cache of entry offsets handed to the returned handle
+     * @return a read handle over the offloaded ledger
+     * @throws IOException if the index cannot be loaded after all attempts
+     * @throws BKException.BKNoSuchLedgerExistsException if the index object does not exist
+     */
+    public static ReadHandle open(ScheduledExecutorService executor,
+                                  OpenDALStorage storage,
+                                  String dataKey,
+                                  String indexKey,
+                                  long ledgerId,
+                                  int readBufferSize,
+                                  LedgerOffloaderStats offloaderStats,
+                                  String managedLedgerName,
+                                  OffsetsCache entryOffsetsCache)
+            throws IOException, BKException.BKNoSuchLedgerExistsException {
+        OffloadIndexBlock loadedIndex = null;
+        IOException failure = null;
+        final String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
+
+        // attemptsLeft is the number of further attempts remaining after the current one: 2, 1, 0.
+        for (int attemptsLeft = 2; attemptsLeft >= 0; attemptsLeft--) {
+            final long startedAtNanos = System.nanoTime();
+            try {
+                final OpenDALStorage.ObjectMetadata indexMeta = storage.stat(indexKey);
+                if (indexMeta.getSize() <= 0) {
+                    throw new IOException("Index object is empty: " + indexKey);
+                }
+                try (InputStream indexStream = storage.readRange(indexKey, 0, indexMeta.getSize() - 1)) {
+                    loadedIndex = (OffloadIndexBlock) OffloadIndexBlockBuilder.create().fromStream(indexStream);
+                }
+                final long elapsedNanos = System.nanoTime() - startedAtNanos;
+                offloaderStats.recordReadOffloadIndexLatency(topicName, elapsedNanos, TimeUnit.NANOSECONDS);
+                failure = null;
+                break;
+            } catch (FileNotFoundException notFound) {
+                log.error("{} not found for ledger {}", indexKey, ledgerId);
+                throw new BKException.BKNoSuchLedgerExistsException();
+            } catch (IOException e) {
+                log.warn("Failed to get index block from the offloaded index file {}, still have {} times to retry",
+                        indexKey, attemptsLeft, e);
+                failure = e;
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+        if (loadedIndex == null) {
+            throw new IOException("Failed to open offloaded index " + indexKey);
+        }
+
+        final BackedInputStream dataInput = new OpenDALBackedInputStream(storage, dataKey,
+                loadedIndex.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
+        return new OpenDALBackedReadHandleImpl(ledgerId, loadedIndex, dataInput, executor, entryOffsetsCache);
+    }
+
+    /**
+     * Exposes the current state of this handle; intended for tests.
+     */
+    @VisibleForTesting
+    State getState() {
+        return state;
+    }
+
+    /**
+     * Returns the wall-clock time, in milliseconds, stored in {@code lastAccessTimestamp}.
+     */
+    @Override
+    public long lastAccessTimestamp() {
+        return this.lastAccessTimestamp;
+    }
+
+    /**
+     * Returns the current value of the pending-read counter.
+     */
+    @Override
+    public int getPendingRead() {
+        final int pending = PENDING_READ_UPDATER.get(this);
+        return pending;
+    }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImplV2.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImplV2.java
new file mode 100644
index 0000000000000..c6d9bf663fec5
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImplV2.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.offload.opendal.storage.OpenDALStorage;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class OpenDALBackedReadHandleImplV2 implements ReadHandle, OffloadedLedgerHandle {
+
+ private static final AtomicIntegerFieldUpdater PENDING_READ_UPDATER = // atomic access to the volatile pendingRead field
+ AtomicIntegerFieldUpdater.newUpdater(OpenDALBackedReadHandleImplV2.class, "pendingRead");
+
+ private final long ledgerId; // id of the ledger this handle reads
+ private final List indices; // one index block per offloaded segment, same order as inputStreams
+ private final List inputStreams; // one backing input stream per segment
+ private final List dataStreams; // DataInputStream wrappers built in the constructor, one per input stream
+ private final ExecutorService executor; // runs the close and read tasks
+ private final AtomicReference> closeFuture = new AtomicReference<>(); // set once by the first closeAsync() call
+
+ private volatile State state; // Opened after construction, Closed once closeAsync() has finished
+ private volatile int pendingRead; // updated only through PENDING_READ_UPDATER
+ private volatile long lastAccessTimestamp = System.currentTimeMillis(); // refreshed at the start of readAsync()
+
+ enum State { // lifecycle of this read handle (distinct from the ledger's own closed flag)
+ Opened, // set by the constructor
+ Closed // set by closeAsync(); readAsync() rejects reads in this state
+ }
+
+    /**
+     * A contiguous range of entries of one ledger, bundled with the index and streams of the segment
+     * that the range is read from.
+     */
+    static class GroupedReader {
+        public final long ledgerId;
+        public final long firstEntry;
+        public final long lastEntry;
+        OffloadIndexBlockV2 index;
+        BackedInputStream inputStream;
+        DataInputStream dataStream;
+
+        GroupedReader(long ledgerId,
+                      long firstEntry,
+                      long lastEntry,
+                      OffloadIndexBlockV2 index,
+                      BackedInputStream inputStream,
+                      DataInputStream dataStream) {
+            this.index = index;
+            this.inputStream = inputStream;
+            this.dataStream = dataStream;
+            this.ledgerId = ledgerId;
+            this.firstEntry = firstEntry;
+            this.lastEntry = lastEntry;
+        }
+
+        /** Renders only the ledger id and the entry range; the index and streams are left out. */
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder("GroupedReader{");
+            text.append("ledgerId=").append(ledgerId);
+            text.append(", firstEntry=").append(firstEntry);
+            text.append(", lastEntry=").append(lastEntry);
+            text.append('}');
+            return text.toString();
+        }
+    }
+
+    /**
+     * Creates an opened handle and wraps every input stream in a {@link DataInputStream}.
+     *
+     * @param ledgerId id of the ledger being read
+     * @param indices index blocks of the offloaded segments
+     * @param inputStreams backing input streams, in the same order as {@code indices}
+     * @param executor executor that runs read and close tasks
+     */
+    private OpenDALBackedReadHandleImplV2(long ledgerId,
+                                          List indices,
+                                          List inputStreams,
+                                          ExecutorService executor) {
+        this.ledgerId = ledgerId;
+        this.executor = executor;
+        this.indices = indices;
+        this.inputStreams = inputStreams;
+        this.dataStreams = new LinkedList<>();
+        for (BackedInputStream backing : inputStreams) {
+            this.dataStreams.add(new DataInputStream(backing));
+        }
+        this.state = State.Opened;
+    }
+
+    /**
+     * Returns the id of the ledger this handle reads.
+     */
+    @Override
+    public long getId() {
+        return this.ledgerId;
+    }
+
+    /**
+     * Returns the ledger metadata held by the last index block, which is taken to be the most complete one.
+     */
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        final int lastPosition = indices.size() - 1;
+        return indices.get(lastPosition).getLedgerMetadata(ledgerId);
+    }
+
+    /**
+     * Closes all index blocks and data streams on the executor.
+     *
+     * <p>Only the first call schedules the work; every later call returns the same future. Every resource
+     * is closed even when an earlier one fails, the handle is marked {@code Closed} in all cases, and the
+     * future always completes: normally when nothing failed, otherwise exceptionally with the first
+     * failure (later failures are attached as suppressed exceptions).
+     *
+     * @return the future tracking the close operation
+     */
+    @Override
+    public CompletableFuture closeAsync() {
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture promise = closeFuture.get();
+        executor.execute(() -> {
+            Throwable failure = null;
+            for (OffloadIndexBlockV2 indexBlock : indices) {
+                try {
+                    indexBlock.close();
+                } catch (Throwable t) {
+                    // Keep going: one failing resource must not leave the others open.
+                    if (failure == null) {
+                        failure = t;
+                    } else if (failure != t) {
+                        failure.addSuppressed(t);
+                    }
+                }
+            }
+            for (DataInputStream dataStream : dataStreams) {
+                try {
+                    dataStream.close();
+                } catch (Throwable t) {
+                    if (failure == null) {
+                        failure = t;
+                    } else if (failure != t) {
+                        failure.addSuppressed(t);
+                    }
+                }
+            }
+            // The close future is cached, so a close can never be retried; the handle is unusable from here.
+            state = State.Closed;
+            if (failure == null) {
+                promise.complete(null);
+            } else {
+                promise.completeExceptionally(failure);
+            }
+        });
+        return promise;
+    }
+
+    /**
+     * Reads the entries {@code firstEntry..lastEntry} on the executor.
+     *
+     * <p>The requested range is split into per-segment groups by {@code getGroupedReader}. For every group
+     * the data stream is consumed record by record (an {@code int} length, a {@code long} entry id, then
+     * the payload). Whenever the record found is not the expected one, the stream is re-positioned through
+     * the index or the record is skipped.
+     *
+     * @param firstEntry id of the first entry to read
+     * @param lastEntry id of the last entry to read
+     * @return a future holding the entries read. It fails with
+     *         {@code OffloadReadHandleClosedException} when the handle is closed, with
+     *         {@code BKIncorrectParameterException} for an invalid range, with
+     *         {@code BKNoSuchLedgerExistsException} when the data object is missing, and with the original
+     *         error otherwise
+     */
+    @Override
+    public CompletableFuture readAsync(long firstEntry, long lastEntry) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        }
+        lastAccessTimestamp = System.currentTimeMillis();
+
+        CompletableFuture promise = new CompletableFuture<>();
+        executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
+            if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+                return;
+            }
+
+            List entries = new ArrayList<>();
+            List groupedReaders;
+            try {
+                groupedReaders = getGroupedReader(firstEntry, lastEntry);
+            } catch (Exception e) {
+                promise.completeExceptionally(e);
+                return;
+            }
+
+            PENDING_READ_UPDATER.incrementAndGet(this);
+            try {
+                for (GroupedReader groupedReader : groupedReaders) {
+                    long entriesToRead = (groupedReader.lastEntry - groupedReader.firstEntry) + 1;
+                    long nextExpectedId = groupedReader.firstEntry;
+                    while (entriesToRead > 0) {
+                        int length = groupedReader.dataStream.readInt();
+                        if (length < 0) { // hit padding or new block
+                            groupedReader.inputStream.seek(groupedReader.index
+                                    .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                    .getDataOffset());
+                            continue;
+                        }
+                        long entryId = groupedReader.dataStream.readLong();
+
+                        if (entryId == nextExpectedId) {
+                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+                            // Registered before filling so the failure path below releases the buffer.
+                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                            int toWrite = length;
+                            while (toWrite > 0) {
+                                int written = buf.writeBytes(groupedReader.dataStream, toWrite);
+                                if (written < 0) {
+                                    // writeBytes reports EOF as -1; subtracting it would make toWrite grow
+                                    // and surface later as an unrelated bounds error.
+                                    throw new IOException("Unexpected EOF while reading entry " + entryId
+                                            + " of ledger " + ledgerId + ": " + toWrite + " of " + length
+                                            + " bytes still missing");
+                                }
+                                toWrite -= written;
+                            }
+                            entriesToRead--;
+                            nextExpectedId++;
+                        } else if (entryId > nextExpectedId) {
+                            // The stream is ahead of the wanted entry: go back through the index.
+                            groupedReader.inputStream.seek(groupedReader.index
+                                    .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                    .getDataOffset());
+                        } else if (entryId < nextExpectedId
+                                && !groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                .equals(groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, entryId))) {
+                            // The stream is behind and in a different index block: jump to the right block.
+                            groupedReader.inputStream.seek(groupedReader.index
+                                    .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                    .getDataOffset());
+                        } else if (entryId > groupedReader.lastEntry) {
+                            // NOTE(review): this looks unreachable, because any entryId above lastEntry is
+                            // also above nextExpectedId and is handled by the seek branch above.
+                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+                                    nextExpectedId, entryId, groupedReader.lastEntry);
+                            throw new BKException.BKUnexpectedConditionException();
+                        } else {
+                            // Behind the wanted entry but in the same index block: skip this payload.
+                            skipFully(groupedReader.inputStream, length);
+                        }
+                    }
+                }
+                promise.complete(LedgerEntriesImpl.create(entries));
+            } catch (Throwable t) {
+                if (t instanceof FileNotFoundException) {
+                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
+                } else {
+                    promise.completeExceptionally(t);
+                }
+                entries.forEach(LedgerEntry::close);
+            } finally {
+                PENDING_READ_UPDATER.decrementAndGet(this);
+            }
+        });
+        return promise;
+    }
+
+    /**
+     * Splits the range {@code firstEntry..lastEntry} into one {@link GroupedReader} per segment that
+     * holds part of it.
+     *
+     * <p>Segments are visited from the last index to the first, but the result is returned in ascending
+     * entry order so that callers appending entries group by group keep them ordered. The first entry of
+     * each group is clamped to the requested {@code firstEntry}, so no entry before the requested range is
+     * ever part of a group.
+     *
+     * @param firstEntry id of the first requested entry
+     * @param lastEntry id of the last requested entry
+     * @return the groups covering the requested range, ordered by ascending entry id
+     * @throws Exception if the indices do not cover the whole range or the groups are inconsistent
+     */
+    private List getGroupedReader(long firstEntry, long lastEntry) throws Exception {
+        List groupedReaders = new LinkedList<>();
+        for (int i = indices.size() - 1; i >= 0 && firstEntry <= lastEntry; i--) {
+            OffloadIndexBlockV2 index = indices.get(i);
+            long startEntryId = index.getStartEntryId(ledgerId);
+            if (startEntryId > lastEntry) {
+                log.debug("Entries are in earlier indices, skip this segment. ledgerId={}, beginEntryId={}",
+                        ledgerId, startEntryId);
+            } else {
+                // Never start before the requested range, even when the segment itself starts earlier.
+                long groupFirstEntry = Math.max(startEntryId, firstEntry);
+                // Prepend: segments are visited newest-first, the result must be oldest-first.
+                groupedReaders.add(0, new GroupedReader(ledgerId, groupFirstEntry, lastEntry,
+                        index, inputStreams.get(i), dataStreams.get(i)));
+                lastEntry = startEntryId - 1;
+            }
+        }
+
+        // Everything from firstEntry upwards must have been assigned to some segment.
+        checkArgument(firstEntry > lastEntry);
+        for (int i = 0; i < groupedReaders.size() - 1; i++) {
+            GroupedReader earlier = groupedReaders.get(i);
+            GroupedReader later = groupedReaders.get(i + 1);
+            checkArgument(earlier.ledgerId == later.ledgerId);
+            checkArgument(earlier.lastEntry < later.firstEntry);
+        }
+        return groupedReaders;
+    }
+
+ @Override
+ public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) {
+ return readAsync(firstEntry, lastEntry);
+ }
+
+    /**
+     * Returns an already-completed future holding {@link #getLastAddConfirmed()}.
+     */
+    @Override
+    public CompletableFuture readLastAddConfirmedAsync() {
+        final long lastAddConfirmed = getLastAddConfirmed();
+        return CompletableFuture.completedFuture(lastAddConfirmed);
+    }
+
+    /**
+     * Same as {@link #readLastAddConfirmedAsync()}: an already-completed future holding
+     * {@link #getLastAddConfirmed()}.
+     */
+    @Override
+    public CompletableFuture tryReadLastAddConfirmedAsync() {
+        final long lastAddConfirmed = getLastAddConfirmed();
+        return CompletableFuture.completedFuture(lastAddConfirmed);
+    }
+
+    /**
+     * Returns the last entry id recorded in the ledger metadata.
+     */
+    @Override
+    public long getLastAddConfirmed() {
+        final LedgerMetadata metadata = getLedgerMetadata();
+        return metadata.getLastEntryId();
+    }
+
+    /**
+     * Returns the ledger length recorded in the ledger metadata.
+     */
+    @Override
+    public long getLength() {
+        final LedgerMetadata metadata = getLedgerMetadata();
+        return metadata.getLength();
+    }
+
+    /**
+     * Reports the closed flag of the ledger metadata; this is independent of the handle's own
+     * {@code state} field.
+     */
+    @Override
+    public boolean isClosed() {
+        final LedgerMetadata metadata = getLedgerMetadata();
+        return metadata.isClosed();
+    }
+
+    /**
+     * Not supported by this handle.
+     *
+     * @return a future that is already completed exceptionally with {@link UnsupportedOperationException}
+     */
+    @Override
+    public CompletableFuture readLastAddConfirmedAndEntryAsync(long entryId,
+                                                               long timeOutInMillis,
+                                                               boolean parallel) {
+        final CompletableFuture unsupported = new CompletableFuture<>();
+        unsupported.completeExceptionally(new UnsupportedOperationException());
+        return unsupported;
+    }
+
+    /**
+     * Opens a read handle over a ledger that was offloaded as several segments.
+     *
+     * <p>For every index key the index object is loaded and a backing input stream is created over the
+     * data object at the same position in {@code keys}. The index read latency is recorded once the index
+     * has actually been loaded. If opening any segment fails, every index and stream opened so far is
+     * closed before the failure is rethrown.
+     *
+     * @param executor executor the returned handle runs its work on
+     * @param storage storage used to stat and read the objects
+     * @param keys keys of the data objects, one per segment
+     * @param indexKeys keys of the index objects, in the same order as {@code keys}
+     * @param ledgerId id of the offloaded ledger
+     * @param readBufferSize buffer size handed to each backing input stream
+     * @param offloaderStats stats sink; may be {@code null}
+     * @param managedLedgerName managed ledger name; may be {@code null}, in which case no latency is recorded
+     * @return a read handle spanning all segments
+     * @throws IOException if an index object is empty or cannot be read
+     * @throws BKException.BKNoSuchLedgerExistsException if an index object does not exist
+     */
+    public static ReadHandle open(ScheduledExecutorService executor,
+                                  OpenDALStorage storage,
+                                  List keys,
+                                  List indexKeys,
+                                  long ledgerId,
+                                  int readBufferSize,
+                                  LedgerOffloaderStats offloaderStats,
+                                  String managedLedgerName)
+            throws IOException, BKException.BKNoSuchLedgerExistsException {
+        List<BackedInputStream> inputStreams = new LinkedList<>();
+        List<OffloadIndexBlockV2> indices = new LinkedList<>();
+        String topicName = managedLedgerName != null
+                ? TopicName.fromPersistenceNamingEncoding(managedLedgerName)
+                : null;
+
+        try {
+            for (int i = 0; i < indexKeys.size(); i++) {
+                String indexKey = indexKeys.get(i);
+                String key = keys.get(i);
+
+                long startTime = System.nanoTime();
+                OffloadIndexBlockV2 index;
+                try {
+                    OpenDALStorage.ObjectMetadata meta = storage.stat(indexKey);
+                    if (meta.getSize() <= 0) {
+                        throw new IOException("Index object is empty: " + indexKey);
+                    }
+                    try (InputStream payloadStream = storage.readRange(indexKey, 0, meta.getSize() - 1)) {
+                        index = OffloadIndexBlockV2Builder.create().fromStream(payloadStream);
+                    }
+                } catch (FileNotFoundException notFound) {
+                    log.error("{} not found while opening V2 offloaded ledger {}", indexKey, ledgerId);
+                    throw new BKException.BKNoSuchLedgerExistsException();
+                }
+                // Track the index right away so it is released if creating the stream below fails.
+                indices.add(index);
+                if (offloaderStats != null && topicName != null) {
+                    offloaderStats.recordReadOffloadIndexLatency(topicName,
+                            System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+                }
+
+                inputStreams.add(new OpenDALBackedInputStream(storage, key,
+                        index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName));
+            }
+
+            return new OpenDALBackedReadHandleImplV2(ledgerId, indices, inputStreams, executor);
+        } catch (Throwable t) {
+            // No handle is returned, so nobody else can release what was opened so far.
+            for (BackedInputStream opened : inputStreams) {
+                try {
+                    opened.close();
+                } catch (Throwable closeFailure) {
+                    t.addSuppressed(closeFailure);
+                }
+            }
+            for (OffloadIndexBlockV2 opened : indices) {
+                try {
+                    opened.close();
+                } catch (Throwable closeFailure) {
+                    t.addSuppressed(closeFailure);
+                }
+            }
+            throw t;
+        }
+    }
+
+    /**
+     * Returns the wall-clock time, in milliseconds, stored in {@code lastAccessTimestamp}.
+     */
+    @Override
+    public long lastAccessTimestamp() {
+        return this.lastAccessTimestamp;
+    }
+
+    /**
+     * Returns the current value of the pending-read counter.
+     */
+    @Override
+    public int getPendingRead() {
+        final int pending = PENDING_READ_UPDATER.get(this);
+        return pending;
+    }
+
+ private static void skipFully(InputStream in, long bytes) throws IOException {
+ long remaining = bytes;
+ while (remaining > 0) {
+ long skipped = in.skip(remaining);
+ if (skipped > 0) {
+ remaining -= skipped;
+ continue;
+ }
+ if (in.read() < 0) {
+ throw new IOException("Unexpected EOF while skipping " + bytes + " bytes");
+ }
+ remaining--;
+ }
+ }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALManagedLedgerOffloader.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALManagedLedgerOffloader.java
new file mode 100644
index 0000000000000..503926b85858d
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALManagedLedgerOffloader.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
+import org.apache.bookkeeper.mledger.OffloadedLedgerMetadataConsumer;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.BufferedOffloadStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.StreamingDataBlockHeaderImpl;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALOperatorProvider;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALTieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.offload.opendal.storage.OpenDALStorage;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+
+/**
+ * Tiered Storage Offloader backed by OpenDAL.
+ *
+ * This implementation keeps the same object key naming and on-disk formats as the existing jcloud implementation.
+ */
+@Slf4j
+public class OpenDALManagedLedgerOffloader implements LedgerOffloader {
+
+ private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName"; // metadata key the managed ledger name is looked up under
+
+ private final OrderedScheduler scheduler; // offload(...) runs on a thread of this scheduler chosen by ledger id
+ private final OrderedScheduler readExecutor; // readOffloaded(...) runs on a thread of this scheduler chosen by ledger id
+ private final OpenDALTieredStorageConfiguration config; // validated by create(...) before construction
+ private final OffloadPolicies policies; // built in the constructor from config.getConfigProperties()
+ private final Map userMetadata; // never null: the constructor substitutes an empty map
+ private final OpenDALOperatorProvider operatorProvider; // handed to every OpenDALStorage created by this class
+ private final OffsetsCache entryOffsetsCache; // handed to the read handles opened by readOffloaded(ledgerId, uid, ...)
+ private final LedgerOffloaderStats offloaderStats; // receives offload byte counts, error counts and read latencies
+
+ private final AtomicLong bufferLength = new AtomicLong(0);
+ private final AtomicLong segmentLength = new AtomicLong(0);
+ private final long maxBufferLength; // max(config write buffer size, config min block size)
+ private final ConcurrentLinkedQueue offloadBuffer = new ConcurrentLinkedQueue<>();
+ private final Duration maxSegmentCloseTime; // from config.getMaxSegmentTimeInSecond()
+ private final long minSegmentCloseTimeMillis; // from config.getMinSegmentTimeInSecond(), converted to milliseconds
+ private final long segmentBeginTimeMillis; // wall-clock time at which this offloader was constructed
+ private final long maxSegmentLength; // from config.getMaxSegmentSizeInBytes()
+ private final int streamingBlockSize; // from config.getMinBlockSizeInBytes()
+
+ private CompletableFuture offloadResult; // created by streamingOffload(...)
+ private volatile Position lastOfferedPosition = PositionFactory.LATEST;
+ private OffloadIndexBlockV2Builder streamingIndexBuilder;
+ private OffloadSegmentInfoImpl segmentInfo; // non-null once streamingOffload(...) has been called; guards against a second call
+ private OpenDALStorage streamingStorage;
+ private OutputStream streamingDataOut;
+ private String streamingDataBlockKey;
+ private String streamingDataIndexKey;
+
+    /**
+     * Validates the configuration and then builds an offloader from the given collaborators.
+     *
+     * @throws IOException declared for callers; presumably raised by {@code config.validate()} on an
+     *                     invalid configuration (not visible here, confirm against the configuration class)
+     */
+    public static OpenDALManagedLedgerOffloader create(OpenDALTieredStorageConfiguration config,
+                                                       Map userMetadata,
+                                                       OrderedScheduler scheduler,
+                                                       OrderedScheduler readExecutor,
+                                                       LedgerOffloaderStats offloaderStats,
+                                                       OffsetsCache entryOffsetsCache,
+                                                       OpenDALOperatorProvider operatorProvider) throws IOException {
+        config.validate();
+        final OpenDALManagedLedgerOffloader offloader = new OpenDALManagedLedgerOffloader(
+                config, userMetadata, scheduler, readExecutor, offloaderStats, entryOffsetsCache, operatorProvider);
+        return offloader;
+    }
+
+ OpenDALManagedLedgerOffloader(OpenDALTieredStorageConfiguration config,
+ Map userMetadata,
+ OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
+ LedgerOffloaderStats offloaderStats,
+ OffsetsCache entryOffsetsCache,
+ OpenDALOperatorProvider operatorProvider) {
+ this.scheduler = scheduler;
+ this.readExecutor = readExecutor;
+ this.userMetadata = userMetadata != null ? userMetadata : Collections.emptyMap();
+ this.config = config;
+ Properties properties = new Properties();
+ properties.putAll(config.getConfigProperties());
+ this.policies = OffloadPoliciesImpl.create(properties);
+ this.entryOffsetsCache = entryOffsetsCache;
+ this.operatorProvider = operatorProvider;
+ this.offloaderStats = offloaderStats;
+ this.streamingBlockSize = config.getMinBlockSizeInBytes();
+ this.maxSegmentCloseTime = Duration.ofSeconds(config.getMaxSegmentTimeInSecond());
+ this.maxSegmentLength = config.getMaxSegmentSizeInBytes();
+ this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
+ this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
+ this.segmentBeginTimeMillis = System.currentTimeMillis();
+ log.info("OpenDAL offloader created. driver={}, scheme={}, bucket={}, endpoint={}, region={}",
+ config.getDriver(), config.getScheme(), config.getBucket(), config.getServiceEndpoint(),
+ config.getRegion());
+ }
+
+    /**
+     * Returns the driver name taken from the configuration.
+     */
+    @Override
+    public String getOffloadDriverName() {
+        final String driverName = config.getDriver();
+        return driverName;
+    }
+
+    /**
+     * Returns the driver metadata exactly as provided by the configuration.
+     */
+    @Override
+    public Map getOffloadDriverMetadata() {
+        final Map driverMetadata = config.getOffloadDriverMetadata();
+        return driverMetadata;
+    }
+
+ /**
+ * Offloads a closed, non-empty ledger as one data object plus one index object.
+ *
+ * <p>The work runs on the scheduler thread chosen by the ledger id. The data object is written block by
+ * block through a single output stream; the index object is written afterwards. On any failure both
+ * objects are deleted on a best-effort basis, the error stats are recorded and the returned future fails.
+ *
+ * @param readHandle handle of the ledger to offload; must be closed and contain at least one entry
+ * @param uuid identifier used to derive the data and index keys
+ * @param extraMetadata extra metadata stored with the index object; may be {@code null}. The managed
+ * ledger name is looked up in it for stats
+ * @return a future completed with {@code null} on success, or exceptionally on failure
+ */
+ @Override
+ public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) {
+ final String managedLedgerName = extraMetadata != null ? extraMetadata.get(MANAGED_LEDGER_NAME) : null;
+ final String topicName = managedLedgerName != null
+ ? TopicName.fromPersistenceNamingEncoding(managedLedgerName)
+ : "unknown";
+
+ CompletableFuture promise = new CompletableFuture<>();
+ scheduler.chooseThread(readHandle.getId()).execute(() -> {
+ OpenDALStorage storage = new OpenDALStorage(operatorProvider, config.getOffloadDriverMetadata());
+ String dataKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid);
+ String indexKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid);
+
+ // Only closed ledgers with at least one entry are offloaded; nothing has been written at this point.
+ if (!readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
+ promise.completeExceptionally(
+ new IllegalArgumentException("An empty or open ledger should never be offloaded"));
+ return;
+ }
+
+ long dataObjectLength = 0;
+ try {
+ OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
+ .withLedgerMetadata(readHandle.getLedgerMetadata())
+ .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
+
+ long startEntry = 0;
+ int partId = 1;
+ long entryBytesWritten = 0;
+ // Closing dataOut at the end of this try block finishes the data object before the index is written.
+ try (OutputStream dataOut = storage.openOutputStream(dataKey)) {
+ while (startEntry <= readHandle.getLastAddConfirmed()) {
+ int blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(
+ config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
+
+ try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
+ readHandle, startEntry, blockSize, this.offloaderStats, managedLedgerName)) {
+ copyStream(blockStream, dataOut);
+ indexBuilder.addBlock(startEntry, partId, blockSize);
+
+ // An end entry id of -1 stops the loop.
+ // NOTE(review): in that case the block was already copied and indexed, but its size is
+ // not added to dataObjectLength because the counters below are skipped - confirm intended.
+ if (blockStream.getEndEntryId() != -1) {
+ startEntry = blockStream.getEndEntryId() + 1;
+ } else {
+ break;
+ }
+ entryBytesWritten += blockStream.getBlockEntryBytesCount();
+ partId++;
+ dataObjectLength += blockSize;
+ this.offloaderStats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount());
+ }
+ }
+ }
+
+ try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
+ IndexInputStream indexStream = index.toStream()) {
+ byte[] indexBytes = readAllBytes(indexStream, indexStream.getStreamSize());
+ // Later puts win: the "role" entry overrides userMetadata, and extraMetadata overrides both.
+ Map objectMetadata = new HashMap<>(userMetadata);
+ objectMetadata.put("role", "index");
+ if (extraMetadata != null) {
+ objectMetadata.putAll(extraMetadata);
+ }
+ storage.writeBytes(indexKey, indexBytes, DataBlockUtils.withVersionInfo(objectMetadata));
+ }
+
+ promise.complete(null);
+ } catch (Throwable t) {
+ // Best-effort cleanup: a failed delete is only logged so that the original error is the one reported.
+ try {
+ storage.delete(dataKey);
+ } catch (Throwable t2) {
+ log.warn("Failed to cleanup data object {}", dataKey, t2);
+ }
+ try {
+ storage.delete(indexKey);
+ } catch (Throwable t2) {
+ log.warn("Failed to cleanup index object {}", indexKey, t2);
+ }
+ this.offloaderStats.recordWriteToStorageError(topicName);
+ this.offloaderStats.recordOffloadError(topicName);
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture readOffloaded(long ledgerId, UUID uid,
+ Map offloadDriverMetadata) {
+ CompletableFuture promise = new CompletableFuture<>();
+ String dataKey = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
+ String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
+ String managedLedgerName = offloadDriverMetadata != null
+ ? offloadDriverMetadata.get(MANAGED_LEDGER_NAME)
+ : null;
+
+ readExecutor.chooseThread(ledgerId).execute(() -> {
+ try {
+ OpenDALStorage storage = new OpenDALStorage(operatorProvider,
+ offloadDriverMetadata != null ? offloadDriverMetadata : Collections.emptyMap());
+ promise.complete(OpenDALBackedReadHandleImpl.open(readExecutor.chooseThread(ledgerId), storage,
+ dataKey, indexKey, ledgerId, config.getReadBufferSizeInBytes(), this.offloaderStats,
+ managedLedgerName, this.entryOffsetsCache));
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ promise.completeExceptionally(e);
+ } catch (Throwable t) {
+ log.error("Failed readOffloaded ledger {}", ledgerId, t);
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture readOffloaded(long ledgerId,
+ MLDataFormats.OffloadContext ledgerContext,
+ Map offloadDriverMetadata) {
+ CompletableFuture promise = new CompletableFuture<>();
+
+ List keys = new LinkedList<>();
+ List indexKeys = new LinkedList<>();
+ for (MLDataFormats.OffloadSegment seg : ledgerContext.getOffloadSegmentList()) {
+ UUID uuid = new UUID(seg.getUidMsb(), seg.getUidLsb());
+ keys.add(uuid.toString());
+ indexKeys.add(DataBlockUtils.indexBlockOffloadKey(uuid));
+ }
+
+ String managedLedgerName = offloadDriverMetadata != null
+ ? offloadDriverMetadata.get(MANAGED_LEDGER_NAME)
+ : null;
+ readExecutor.chooseThread(ledgerId).execute(() -> {
+ try {
+ OpenDALStorage storage = new OpenDALStorage(operatorProvider,
+ offloadDriverMetadata != null ? offloadDriverMetadata : Collections.emptyMap());
+ promise.complete(OpenDALBackedReadHandleImplV2.open(readExecutor.chooseThread(ledgerId), storage,
+ keys, indexKeys, ledgerId, config.getReadBufferSizeInBytes(), this.offloaderStats,
+ managedLedgerName));
+ } catch (BKException.BKNoSuchLedgerExistsException e) {
+ promise.completeExceptionally(e);
+ } catch (Throwable t) {
+ log.error("Failed readOffloaded (V2) ledger {}", ledgerId, t);
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ /**
+  * Starts a streaming offload segment and returns a handle for offering entries to it.
+  *
+  * The method initialises the per-segment state (segment info, result future, index builder,
+  * object keys, storage and data output stream), kicks off {@code streamingOffloadLoop} on the
+  * scheduler thread chosen for the segment, and schedules {@code closeSegment} to run after
+  * {@code maxSegmentCloseTime}.
+  *
+  * If the data output stream cannot be opened, the partially initialised state is rolled back
+  * before the failed future is returned, so that the failure does not leave this offloader
+  * permanently answering "streamingOffload should only be called once".
+  *
+  * @param ml managed ledger whose entries are being offloaded
+  * @param uuid uuid of the segment; also used as the data object key
+  * @param beginLedger first ledger id of the segment
+  * @param beginEntry first entry id of the segment
+  * @param driverMetadata driver metadata; may be {@code null}, in which case an empty map is used
+  * @return a completed future holding the offload handle, or a failed future if streaming offload
+  *         was already started or the data output stream could not be opened
+  */
+ @Override
+ public CompletableFuture streamingOffload(ManagedLedger ml,
+                                           UUID uuid,
+                                           long beginLedger,
+                                           long beginEntry,
+                                           Map driverMetadata) {
+     if (this.segmentInfo != null) {
+         CompletableFuture result = new CompletableFuture<>();
+         result.completeExceptionally(new IllegalStateException("streamingOffload should only be called once"));
+         return result;
+     }
+
+     this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+             driverMetadata != null ? driverMetadata : Collections.emptyMap());
+     this.offloadResult = new CompletableFuture<>();
+
+     this.streamingIndexBuilder = OffloadIndexBlockV2Builder.create()
+             .withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+     this.streamingDataBlockKey = segmentInfo.uuid.toString();
+     this.streamingDataIndexKey = DataBlockUtils.indexBlockOffloadKey(segmentInfo.uuid);
+     this.streamingStorage = new OpenDALStorage(operatorProvider, config.getOffloadDriverMetadata());
+
+     try {
+         this.streamingDataOut = streamingStorage.openOutputStream(streamingDataBlockKey);
+     } catch (IOException e) {
+         // No handle has been handed out and no loop has been scheduled yet, so nothing else can
+         // observe this state. Undo it; otherwise the non-null segmentInfo would make every later
+         // call fail with IllegalStateException even though no offload is running.
+         this.streamingStorage = null;
+         this.streamingDataIndexKey = null;
+         this.streamingDataBlockKey = null;
+         this.streamingIndexBuilder = null;
+         this.offloadResult = null;
+         this.segmentInfo = null;
+
+         CompletableFuture result = new CompletableFuture<>();
+         result.completeExceptionally(e);
+         return result;
+     }
+
+     scheduler.chooseThread(segmentInfo).execute(() -> {
+         log.info("Start streaming offload segment: {}", segmentInfo);
+         streamingOffloadLoop(ml, 1, 0);
+     });
+     // Upper bound on how long the segment stays open.
+     scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+     // The handle simply delegates to the enclosing offloader's private streaming methods.
+     return CompletableFuture.completedFuture(new OffloadHandle() {
+         @Override
+         public Position lastOffered() {
+             return OpenDALManagedLedgerOffloader.this.lastOffered();
+         }
+
+         @Override
+         public CompletableFuture lastOfferedAsync() {
+             return CompletableFuture.completedFuture(lastOffered());
+         }
+
+         @Override
+         public OfferEntryResult offerEntry(Entry entry) {
+             return OpenDALManagedLedgerOffloader.this.offerEntry(entry);
+         }
+
+         @Override
+         public CompletableFuture offerEntryAsync(Entry entry) {
+             return CompletableFuture.completedFuture(offerEntry(entry));
+         }
+
+         @Override
+         public CompletableFuture getOffloadResultAsync() {
+             return OpenDALManagedLedgerOffloader.this.getOffloadResultAsync();
+         }
+
+         @Override
+         public boolean close() {
+             return OpenDALManagedLedgerOffloader.this.closeSegment();
+         }
+     });
+ }
+
+ @Override
+ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, Map offloadDriverMetadata) {
+ CompletableFuture promise = new CompletableFuture<>();
+ String dataKey = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
+ String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
+ final String managedLedgerName =
+ offloadDriverMetadata != null ? offloadDriverMetadata.get(MANAGED_LEDGER_NAME) : null;
+ final String topicName = managedLedgerName != null
+ ? TopicName.fromPersistenceNamingEncoding(managedLedgerName)
+ : "unknown";
+ scheduler.execute(() -> {
+ try {
+ OpenDALStorage storage = new OpenDALStorage(operatorProvider,
+ offloadDriverMetadata != null ? offloadDriverMetadata : Collections.emptyMap());
+ storage.delete(dataKey);
+ storage.delete(indexKey);
+ promise.complete(null);
+ } catch (Throwable t) {
+ log.error("Failed delete offloaded objects for ledger {}", ledgerId, t);
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise.whenComplete((__, t) -> this.offloaderStats.recordDeleteOffloadOps(topicName, t == null));
+ }
+
+ @Override
+ public CompletableFuture deleteOffloaded(UUID uid, Map offloadDriverMetadata) {
+ CompletableFuture promise = new CompletableFuture<>();
+ String dataKey = uid.toString();
+ String indexKey = DataBlockUtils.indexBlockOffloadKey(uid);
+ scheduler.execute(() -> {
+ try {
+ OpenDALStorage storage = new OpenDALStorage(operatorProvider,
+ offloadDriverMetadata != null ? offloadDriverMetadata : Collections.emptyMap());
+ storage.delete(dataKey);
+ storage.delete(indexKey);
+ promise.complete(null);
+ } catch (Throwable t) {
+ log.error("Failed delete offloaded objects for uuid {}", uid, t);
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ /**
+  * @return the offload policies stored in this offloader's {@code policies} field
+  */
+ @Override
+ public OffloadPolicies getOffloadPolicies() {
+     return this.policies;
+ }
+
+ /**
+  * Intentionally does nothing: this offloader releases no resources of its own here.
+  */
+ @Override
+ public void close() {
+ // All shared resources are managed by the factory (e.g. OperatorCache, OffsetsCache).
+ }
+
+ /**
+  * Lists the objects in storage page by page and feeds every object whose name parses to a
+  * ledger id to {@code consumer}.
+  *
+  * Objects whose name does not yield a ledger id are skipped. The scan stops early when the
+  * consumer returns {@code false}.
+  *
+  * @param consumer callback receiving the metadata of each offloaded ledger object
+  * @param offloadDriverMetadata driver metadata; may be {@code null}, in which case an empty map is used
+  * @throws ManagedLedgerException if listing fails or the consumer throws
+  */
+ @Override
+ public void scanLedgers(OffloadedLedgerMetadataConsumer consumer,
+                         Map offloadDriverMetadata) throws ManagedLedgerException {
+     final int pageSize = 100;
+     OpenDALStorage blobStorage = new OpenDALStorage(operatorProvider,
+             offloadDriverMetadata != null ? offloadDriverMetadata : Collections.emptyMap());
+
+     String marker = null;
+     boolean morePages = true;
+     while (morePages) {
+         OpenDALStorage.ListResult page;
+         try {
+             page = blobStorage.list("", marker, pageSize);
+         } catch (IOException e) {
+             throw ManagedLedgerException.getManagedLedgerException(e);
+         }
+
+         for (OpenDALStorage.Item item : page.getItems()) {
+             String name = item.getPath();
+             Long ledgerId = DataBlockUtils.parseLedgerId(name);
+             if (ledgerId == null) {
+                 // Not an offloaded-ledger object name; ignore it.
+                 continue;
+             }
+             OffloadedLedgerMetadata metadata = toOffloadedLedgerMetadata(name, ledgerId, item);
+             try {
+                 if (!consumer.accept(metadata)) {
+                     log.info("Iteration stopped by the OffloadedLedgerMetadataConsumer");
+                     return;
+                 }
+             } catch (Exception err) {
+                 if (err instanceof InterruptedException) {
+                     // Keep the interrupt visible to callers further up the stack.
+                     Thread.currentThread().interrupt();
+                 }
+                 throw ManagedLedgerException.getManagedLedgerException(err);
+             }
+         }
+
+         marker = page.getNextMarker();
+         morePages = marker != null;
+     }
+ }
+
+ /**
+  * Builds the metadata record handed to the scan consumer for one listed object.
+  * A missing last-modified timestamp is reported as {@code 0}.
+  */
+ private OffloadedLedgerMetadata toOffloadedLedgerMetadata(String name, Long ledgerId, OpenDALStorage.Item item) {
+     String contextUuid = DataBlockUtils.parseContextUuid(name, ledgerId);
+     Instant lastModified = item.getMetadata().getLastModified();
+     long lastModifiedMillis;
+     if (lastModified == null) {
+         lastModifiedMillis = 0;
+     } else {
+         lastModifiedMillis = lastModified.toEpochMilli();
+     }
+     return OffloadedLedgerMetadata.builder()
+             .name(name)
+             .bucketName(config.getBucket())
+             .uuid(contextUuid)
+             .ledgerId(ledgerId)
+             .lastModified(lastModifiedMillis)
+             .size(item.getMetadata().getSize())
+             .uri(null)
+             .userMetadata(Collections.emptyMap())
+             .build();
+ }
+
+ /**
+  * One step of the streaming offload: either finishes the segment, writes one data block, or
+  * waits for more data.
+  *
+  * @param ml managed ledger passed through to {@code buildBlockAndWrite}
+  * @param partId id recorded in the index for the next block to be written
+  * @param dataObjectLength sum of the block sizes written so far
+  */
+ private void streamingOffloadLoop(ManagedLedger ml, int partId, long dataObjectLength) {
+ // Nothing left to do once the result future is completed (successfully or not).
+ if (offloadResult != null && offloadResult.isDone()) {
+ return;
+ }
+ if (segmentInfo == null) {
+ return;
+ }
+ // Segment closed and fully drained: write the index and complete the result.
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ buildIndexAndCompleteResult(dataObjectLength);
+ return;
+ }
+
+ // Write a block when the segment is closed with entries remaining, or when at least
+ // streamingBlockSize bytes are buffered.
+ if ((segmentInfo.isClosed() && !offloadBuffer.isEmpty()) || bufferLength.get() >= streamingBlockSize) {
+ List entries = new LinkedList<>();
+ int blockEntrySize = 0;
+
+ Entry firstEntry = offloadBuffer.poll();
+ if (firstEntry == null) {
+ // Buffer turned out to be empty; retry in 100 ms with unchanged part id and length.
+ scheduler.chooseThread(segmentInfo).schedule(() -> streamingOffloadLoop(ml, partId, dataObjectLength),
+ 100, TimeUnit.MILLISECONDS);
+ return;
+ }
+ int firstEntrySize = firstEntry.getLength();
+ bufferLength.addAndGet(-firstEntrySize);
+ blockEntrySize += firstEntrySize;
+ entries.add(firstEntry);
+ // The first entry fixes the ledger and the begin entry id of this block.
+ long blockLedgerId = firstEntry.getLedgerId();
+ long blockEntryId = firstEntry.getEntryId();
+
+ // Keep taking entries of the same ledger. The size limit is checked before each entry is
+ // added, so the accumulated entry bytes can end up above streamingBlockSize.
+ while (!offloadBuffer.isEmpty()
+ && offloadBuffer.peek().getLedgerId() == blockLedgerId
+ && blockEntrySize <= streamingBlockSize) {
+ Entry entryInBlock = offloadBuffer.poll();
+ if (entryInBlock == null) {
+ break;
+ }
+ int entrySize = entryInBlock.getLength();
+ bufferLength.addAndGet(-entrySize);
+ blockEntrySize += entrySize;
+ entries.add(entryInBlock);
+ }
+
+ int blockSize = BufferedOffloadStream.calculateBlockSize(
+ streamingBlockSize, entries.size(), blockEntrySize);
+ buildBlockAndWrite(ml, blockSize, entries, blockLedgerId, blockEntryId, partId);
+ // Continue immediately with the next part. If the write above failed, the result future is
+ // already completed and the recursive call returns at its first check.
+ streamingOffloadLoop(ml, partId + 1, dataObjectLength + blockSize);
+ } else {
+ // Not enough buffered data yet: poll again in 100 ms.
+ scheduler.chooseThread(segmentInfo).schedule(() -> streamingOffloadLoop(ml, partId, dataObjectLength),
+ 100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+  * Writes one data block to the streaming data output and records it in the streaming index.
+  *
+  * The entries are serialised through a {@code BufferedOffloadStream} and copied to
+  * {@code streamingDataOut}. The block is then added to the index builder together with the
+  * ledger metadata of {@code blockLedgerId}; when that metadata reports zero entries, the entry
+  * count is set to the block's end entry id plus one.
+  *
+  * Any failure is routed to {@code failStreamingOffload}. If the failure is an
+  * {@link InterruptedException} (from waiting on the ledger info future), the thread's interrupt
+  * flag is restored first so the interruption is not silently lost.
+  *
+  * @param ml managed ledger used to look up the ledger info
+  * @param blockSize size of the block as computed by the caller
+  * @param entries entries that make up the block
+  * @param blockLedgerId ledger id shared by all entries of the block
+  * @param beginEntryId entry id of the first entry in the block
+  * @param partId id under which the block is recorded in the index
+  */
+ private void buildBlockAndWrite(ManagedLedger ml,
+                                 int blockSize,
+                                 List entries,
+                                 long blockLedgerId,
+                                 long beginEntryId,
+                                 int partId) {
+     try (BufferedOffloadStream payloadStream = new BufferedOffloadStream(blockSize, entries,
+             blockLedgerId, beginEntryId)) {
+         copyStream(payloadStream, streamingDataOut);
+         streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+         streamingIndexBuilder.addBlock(blockLedgerId, beginEntryId, partId, blockSize);
+
+         MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ml.getLedgerInfo(blockLedgerId).get();
+         MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder ledgerInfoBuilder =
+                 MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder();
+         if (ledgerInfo != null) {
+             ledgerInfoBuilder.mergeFrom(ledgerInfo);
+         }
+         if (ledgerInfoBuilder.getEntries() == 0) {
+             // No entry count known for this ledger: derive it from the last entry of the block.
+             ledgerInfoBuilder.setEntries(payloadStream.getEndEntryId() + 1);
+         }
+         streamingIndexBuilder.addLedgerMeta(blockLedgerId, ledgerInfoBuilder.build());
+     } catch (Throwable e) {
+         if (e instanceof InterruptedException) {
+             // Catching Throwable would otherwise clear the interruption for good.
+             Thread.currentThread().interrupt();
+         }
+         failStreamingOffload(e);
+     }
+ }
+
+ /**
+  * Finishes a streaming offload segment: closes the data output, writes the index object and
+  * completes {@code offloadResult} with the segment result.
+  *
+  * Any failure along the way is handed to {@code failStreamingOffload}.
+  *
+  * @param dataObjectLength total data object length recorded in the index
+  */
+ private void buildIndexAndCompleteResult(long dataObjectLength) {
+ try {
+ // Close the data stream before the index that describes it is written.
+ if (streamingDataOut != null) {
+ streamingDataOut.close();
+ streamingDataOut = null;
+ }
+
+ streamingIndexBuilder.withDataObjectLength(dataObjectLength);
+ OffloadIndexBlockV2 index = streamingIndexBuilder.buildV2();
+ try (IndexInputStream indexStream = index.toStream()) {
+ // The whole index is read into memory and written as a single object.
+ byte[] indexBytes = readAllBytes(indexStream, indexStream.getStreamSize());
+ streamingStorage.writeBytes(streamingDataIndexKey, indexBytes,
+ DataBlockUtils.withVersionInfo(userMetadata));
+ }
+
+ // Only reached when both the data object and the index were written without an exception.
+ offloadResult.complete(segmentInfo.result());
+ log.info("Streaming offload segment completed {}", segmentInfo.result());
+ } catch (Throwable t) {
+ failStreamingOffload(t);
+ }
+ }
+
+ /**
+  * Aborts the current streaming offload: closes the data output, makes a best-effort attempt to
+  * delete the data and index objects, and completes {@code offloadResult} exceptionally.
+  *
+  * Does nothing if the result future is already completed. Failures during cleanup are logged
+  * and do not prevent the remaining steps. The data output reference is cleared even when
+  * closing it throws, and a missing result future is logged rather than dereferenced.
+  *
+  * @param error the failure to report through {@code offloadResult}
+  */
+ private void failStreamingOffload(Throwable error) {
+     if (offloadResult != null && offloadResult.isDone()) {
+         return;
+     }
+     try {
+         if (streamingDataOut != null) {
+             streamingDataOut.close();
+         }
+     } catch (Throwable t) {
+         log.warn("Failed to close streaming data output stream", t);
+     } finally {
+         // Drop the reference regardless of the close outcome so the stream is never reused.
+         streamingDataOut = null;
+     }
+     try {
+         if (streamingStorage != null && streamingDataBlockKey != null) {
+             streamingStorage.delete(streamingDataBlockKey);
+         }
+     } catch (Throwable t) {
+         log.warn("Failed to cleanup streaming data object {}", streamingDataBlockKey, t);
+     }
+     try {
+         if (streamingStorage != null && streamingDataIndexKey != null) {
+             streamingStorage.delete(streamingDataIndexKey);
+         }
+     } catch (Throwable t) {
+         log.warn("Failed to cleanup streaming index object {}", streamingDataIndexKey, t);
+     }
+
+     if (offloadResult != null) {
+         offloadResult.completeExceptionally(error);
+     } else {
+         // The guard at the top allows for a null future; make sure the failure is still visible.
+         log.error("Streaming offload failed but no result future is available", error);
+     }
+ }
+
+ /**
+  * @return the result future of the current streaming offload, as stored in {@code offloadResult}
+  */
+ private CompletableFuture getOffloadResultAsync() {
+     return offloadResult;
+ }
+
+ private synchronized OfferEntryResult offerEntry(Entry entry) {
+ if (segmentInfo == null) {
+ return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+ }
+ if (segmentInfo.isClosed()) {
+ return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+ }
+ if (maxBufferLength <= bufferLength.get()) {
+ return OfferEntryResult.FAIL_BUFFER_FULL;
+ }
+
+ EntryImpl entryImpl = EntryImpl.create(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+ offloadBuffer.add(entryImpl);
+ bufferLength.getAndAdd(entryImpl.getLength());
+ segmentLength.getAndAdd(entryImpl.getLength());
+ lastOfferedPosition = entryImpl.getPosition();
+ if (segmentLength.get() >= maxSegmentLength
+ && System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
+ closeSegment();
+ }
+ return OfferEntryResult.SUCCESS;
+ }
+
+ /**
+  * Closes the current segment at the last offered position.
+  *
+  * @return {@code true} if this call closed the segment; {@code false} if there is no segment or
+  *         it was already closed
+  */
+ private synchronized boolean closeSegment() {
+     if (segmentInfo == null || segmentInfo.isClosed()) {
+         return false;
+     }
+     segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+     return true;
+ }
+
+ /**
+  * @return the position stored by the most recent successful {@code offerEntry} call
+  */
+ private Position lastOffered() {
+     return this.lastOfferedPosition;
+ }
+
+ private static void copyStream(InputStream in, OutputStream out) throws IOException {
+ byte[] buffer = new byte[1024 * 64];
+ int read;
+ while ((read = in.read(buffer)) >= 0) {
+ if (read == 0) {
+ continue;
+ }
+ out.write(buffer, 0, read);
+ }
+ }
+
+ /**
+  * Reads exactly {@code size} bytes from {@code in} into a new array.
+  *
+  * @param in stream to read from; it is not closed by this method
+  * @param size number of bytes expected; must be between 0 and {@link Integer#MAX_VALUE}
+  * @return a new array of length {@code size} holding the bytes read
+  * @throws IOException if {@code size} is negative or larger than {@link Integer#MAX_VALUE}, if the
+  *         stream ends before {@code size} bytes were read, or if reading fails
+  */
+ private static byte[] readAllBytes(InputStream in, long size) throws IOException {
+     if (size < 0) {
+         // Report a bad size through the declared exception type rather than letting the array
+         // allocation fail with NegativeArraySizeException.
+         throw new IOException("Invalid stream size: " + size);
+     }
+     if (size > Integer.MAX_VALUE) {
+         throw new IOException("Stream too large: " + size);
+     }
+     byte[] out = new byte[(int) size];
+     int offset = 0;
+     while (offset < out.length) {
+         int read = in.read(out, offset, out.length - offset);
+         if (read < 0) {
+             break;
+         }
+         offset += read;
+     }
+     if (offset != out.length) {
+         throw new IOException("Unexpected EOF: expected " + out.length + " bytes, got " + offset);
+     }
+     return out;
+ }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/package-info.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/package-info.java
new file mode 100644
index 0000000000000..eb26e27aed303
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/impl/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/package-info.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/package-info.java
new file mode 100644
index 0000000000000..82b800e57e2fc
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal;
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OpenDALOperatorProvider.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OpenDALOperatorProvider.java
new file mode 100644
index 0000000000000..33cd3a54e606d
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OpenDALOperatorProvider.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.provider;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.opendal.Operator;
+import org.apache.opendal.ServiceConfig;
+
+@RequiredArgsConstructor
+public class OpenDALOperatorProvider {
+
+ private static final String GCS_SERVICE_ACCOUNT_KEY_FILE = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+ private static final String ENV_AZURE_STORAGE_ACCOUNT = "AZURE_STORAGE_ACCOUNT";
+ private static final String ENV_AZURE_STORAGE_ACCESS_KEY = "AZURE_STORAGE_ACCESS_KEY";
+
+ private final OpenDALTieredStorageConfiguration config;
+ private final OperatorCache operatorCache;
+
+ public Operator getOperator(Map offloadDriverMetadata) {
+ Map effective = effectiveLocation(offloadDriverMetadata);
+ String scheme = config.getScheme();
+
+ Map operatorConfig = new HashMap<>();
+ if ("s3".equalsIgnoreCase(scheme)) {
+ operatorConfig.putAll(buildS3Config(effective));
+ } else if ("gcs".equalsIgnoreCase(scheme)) {
+ operatorConfig.putAll(buildGcsConfig(effective));
+ } else if ("azblob".equalsIgnoreCase(scheme)) {
+ operatorConfig.putAll(buildAzblobConfig(effective));
+ } else if ("memory".equalsIgnoreCase(scheme)) {
+ operatorConfig.putAll(ServiceConfig.Memory.builder().build().configMap());
+ } else {
+ throw new IllegalArgumentException("Unsupported OpenDAL scheme: " + scheme);
+ }
+
+ // Apply universal extra config passthrough last so that it can override defaults.
+ operatorConfig.putAll(config.getExtraConfig());
+
+ OperatorCacheKey cacheKey = OperatorCacheKey.of(
+ scheme,
+ effective.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_BUCKET),
+ effective.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_REGION),
+ effective.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_ENDPOINT),
+ operatorConfig);
+
+ return operatorCache.getOrCreate(cacheKey, () -> Operator.of(scheme, operatorConfig));
+ }
+
+ private Map buildS3Config(Map effectiveLocation) {
+ String bucket = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_BUCKET);
+ String region = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_REGION);
+ String endpoint = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_ENDPOINT);
+
+ ServiceConfig.S3.S3Builder builder = ServiceConfig.S3.builder().bucket(bucket);
+
+ if (StringUtils.isNotBlank(region)) {
+ builder.region(region);
+ }
+ if (StringUtils.isNotBlank(endpoint)) {
+ builder.endpoint(endpoint);
+ }
+
+ // Align with tiered-storage-jcloud behavior: if a custom endpoint is configured,
+ // disable virtual host style by default to avoid DNS issues.
+ if (StringUtils.isNotBlank(endpoint) && !config.getExtraConfig().containsKey("enable_virtual_host_style")) {
+ builder.enableVirtualHostStyle(false);
+ }
+
+ String accessKeyId = config.getConfigProperties().get("s3ManagedLedgerOffloadCredentialId");
+ String secretAccessKey = config.getConfigProperties().get("s3ManagedLedgerOffloadCredentialSecret");
+ boolean hasExplicitCredentials = StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretAccessKey);
+ if (hasExplicitCredentials) {
+ builder.accessKeyId(accessKeyId).secretAccessKey(secretAccessKey);
+ }
+
+ String roleArn = config.getConfigProperties().get("s3ManagedLedgerOffloadRole");
+ boolean hasRole = StringUtils.isNotBlank(roleArn);
+ if (StringUtils.isNotBlank(roleArn)) {
+ builder.roleArn(roleArn);
+ }
+ String roleSessionName = config.getConfigProperties().get("s3ManagedLedgerOffloadRoleSessionName");
+ if (StringUtils.isNotBlank(roleSessionName)) {
+ builder.roleSessionName(roleSessionName);
+ }
+
+ // Keep behavior compatible with the existing S3 integration tests that use a mock endpoint without credentials.
+ // IMPORTANT: do NOT default to anonymous for real AWS S3 (endpoint not explicitly set).
+ String driver = config.getDriver();
+ boolean hasEnvAwsCredentials = StringUtils.isNotBlank(System.getenv("AWS_ACCESS_KEY_ID"))
+ && StringUtils.isNotBlank(System.getenv("AWS_SECRET_ACCESS_KEY"));
+ boolean shouldDefaultAllowAnonymous = "aws-s3".equalsIgnoreCase(driver)
+ && StringUtils.isNotBlank(endpoint)
+ && !hasExplicitCredentials
+ && !hasRole
+ && !hasEnvAwsCredentials
+ && !config.getExtraConfig().containsKey("allow_anonymous");
+ if (shouldDefaultAllowAnonymous) {
+ builder.allowAnonymous(true);
+ }
+
+ return builder.build().configMap();
+ }
+
+ private Map buildGcsConfig(Map effectiveLocation) {
+ String bucket = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_BUCKET);
+ String endpoint = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_ENDPOINT);
+
+ ServiceConfig.Gcs.GcsBuilder builder = ServiceConfig.Gcs.builder().bucket(bucket);
+ if (StringUtils.isNotBlank(endpoint)) {
+ builder.endpoint(endpoint);
+ }
+
+ // Keep compatibility with tiered-storage-jcloud: allow providing the service account key via file path.
+ // If users explicitly pass credential settings via managedLedgerOffloadExtraConfig*, prefer those.
+ Map extra = config.getExtraConfig();
+ boolean hasExplicitCredential = extra.containsKey("credential")
+ || extra.containsKey("credential_path")
+ || extra.containsKey("service_account")
+ || extra.containsKey("token");
+ boolean isHttpEndpoint = endpoint != null && endpoint.regionMatches(true, 0, "http://", 0, "http://".length());
+ String keyFilePath = StringUtils.trimToNull(config.getConfigProperties().get(GCS_SERVICE_ACCOUNT_KEY_FILE));
+ if (!hasExplicitCredential) {
+ if (keyFilePath != null) {
+ builder.credentialPath(keyFilePath);
+ } else if (isHttpEndpoint) {
+ // Emulator-friendly defaults:
+ // - Fake GCS server typically does not require auth.
+ // - Disable local config load / VM metadata to avoid slow fallbacks when running in containers.
+ if (!extra.containsKey("allow_anonymous")) {
+ builder.allowAnonymous(true);
+ }
+ if (!extra.containsKey("disable_config_load")) {
+ builder.disableConfigLoad(true);
+ }
+ if (!extra.containsKey("disable_vm_metadata")) {
+ builder.disableVmMetadata(true);
+ }
+ }
+ }
+ return builder.build().configMap();
+ }
+
+ private Map buildAzblobConfig(Map effectiveLocation) {
+ // In OpenDAL, azblob uses "container" instead of "bucket".
+ String container = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_BUCKET);
+ String endpoint = effectiveLocation.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_ENDPOINT);
+
+ ServiceConfig.Azblob.AzblobBuilder builder = ServiceConfig.Azblob.builder().container(container);
+ if (StringUtils.isNotBlank(endpoint)) {
+ builder.endpoint(endpoint);
+ }
+
+ // Keep compatibility with tiered-storage-jcloud: by default Azure credentials are sourced from env vars.
+ // Users can override by passing managedLedgerOffloadExtraConfigaccountName/accountKey/sasToken, etc.
+ Map extra = config.getExtraConfig();
+ boolean hasAccountName = extra.containsKey("account_name");
+ boolean hasAccountKey = extra.containsKey("account_key");
+ boolean hasSasToken = extra.containsKey("sas_token");
+
+ String accountName = StringUtils.trimToNull(System.getenv(ENV_AZURE_STORAGE_ACCOUNT));
+ String accountKey = StringUtils.trimToNull(System.getenv(ENV_AZURE_STORAGE_ACCESS_KEY));
+ if (!hasAccountName && accountName != null) {
+ builder.accountName(accountName);
+ }
+ // Don't apply account key when SAS auth is explicitly configured.
+ if (!hasSasToken && !hasAccountKey && accountKey != null) {
+ builder.accountKey(accountKey);
+ }
+ return builder.build().configMap();
+ }
+
+ private Map effectiveLocation(Map offloadDriverMetadata) {
+ Map effective = new HashMap<>();
+
+ effective.put(OpenDALTieredStorageConfiguration.METADATA_FIELD_BUCKET,
+ firstNonBlank(offloadDriverMetadata.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_BUCKET),
+ config.getBucket()));
+ effective.put(OpenDALTieredStorageConfiguration.METADATA_FIELD_REGION,
+ firstNonBlank(offloadDriverMetadata.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_REGION),
+ config.getRegion()));
+ effective.put(OpenDALTieredStorageConfiguration.METADATA_FIELD_ENDPOINT,
+ firstNonBlank(offloadDriverMetadata.get(OpenDALTieredStorageConfiguration.METADATA_FIELD_ENDPOINT),
+ config.getServiceEndpoint()));
+
+ return effective;
+ }
+
+ private static String firstNonBlank(String first, String second) {
+ if (StringUtils.isNotBlank(first)) {
+ return first;
+ }
+ return StringUtils.defaultString(second);
+ }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OpenDALTieredStorageConfiguration.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OpenDALTieredStorageConfiguration.java
new file mode 100644
index 0000000000000..d0321559a0f4b
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OpenDALTieredStorageConfiguration.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.provider;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+
+/**
+ * A configuration wrapper for OpenDAL based tiered storage offloaders.
+ *
+ * <p>The goal is to keep the same semantics as the current jcloud implementation: resolve both "new"
+ * (managedLedgerOffload*) and legacy (s3ManagedLedgerOffload* and gcsManagedLedgerOffload*) keys.
+ */
+public class OpenDALTieredStorageConfiguration {
+
+    public static final String BLOB_STORE_PROVIDER_KEY = "managedLedgerOffloadDriver";
+    public static final String METADATA_FIELD_BUCKET = "bucket";
+    public static final String METADATA_FIELD_REGION = "region";
+    public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
+    public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
+    public static final String METADATA_FIELD_MIN_BLOCK_SIZE = "minBlockSizeInBytes";
+    public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
+    public static final String METADATA_FIELD_WRITE_BUFFER_SIZE = "writeBufferSizeInBytes";
+
+    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload";
+    public static final String MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC = "maxOffloadSegmentRolloverTimeInSeconds";
+    public static final String MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC = "minOffloadSegmentRolloverTimeInSeconds";
+    public static final long DEFAULT_MAX_SEGMENT_TIME_IN_SECOND = 600;
+    public static final long DEFAULT_MIN_SEGMENT_TIME_IN_SECOND = 0;
+    public static final String MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES = "maxOffloadSegmentSizeInBytes";
+    public static final long DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES = 1024L * 1024 * 1024;
+    public static final String EXTRA_CONFIG_PREFIX = OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
+
+    private static final int MB = 1024 * 1024;
+
+    /** Private copy of the raw key/value configuration; never exposed directly. */
+    private final Map<String, String> configProperties;
+
+    /**
+     * Creates a configuration from {@link Properties}, converting every key and value with
+     * {@code toString()}.
+     *
+     * @param props source properties
+     * @return a new configuration instance
+     */
+    public static OpenDALTieredStorageConfiguration create(Properties props) {
+        Map<String, String> map = props.entrySet().stream()
+                .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
+        return new OpenDALTieredStorageConfiguration(map);
+    }
+
+    /**
+     * Creates a configuration from a string map.
+     *
+     * @param props source map; must not be {@code null}
+     * @return a new configuration instance
+     */
+    public static OpenDALTieredStorageConfiguration create(Map<String, String> props) {
+        return new OpenDALTieredStorageConfiguration(props);
+    }
+
+    /**
+     * Copies the supplied map so later changes to it do not affect this instance.
+     *
+     * @param configProperties source map
+     * @throws NullPointerException if {@code configProperties} is {@code null}
+     */
+    public OpenDALTieredStorageConfiguration(Map<String, String> configProperties) {
+        this.configProperties = new HashMap<>(Objects.requireNonNull(configProperties, "configProperties"));
+    }
+
+    /**
+     * @return a mutable copy of the raw configuration
+     */
+    public Map<String, String> getConfigProperties() {
+        return new HashMap<>(configProperties);
+    }
+
+    /**
+     * @return the configured offload driver name, or {@code "aws-s3"} when the key is absent
+     */
+    public String getDriver() {
+        // Keep the same behavior as tiered-storage-jcloud: default to aws-s3 if not explicitly set.
+        return configProperties.getOrDefault(BLOB_STORE_PROVIDER_KEY, "aws-s3");
+    }
+
+    /**
+     * Maps the configured driver name (case-insensitively) to a scheme name.
+     *
+     * @return {@code "s3"}, {@code "gcs"}, {@code "azblob"} or {@code "memory"}
+     * @throws IllegalArgumentException if the driver is not one of the supported names
+     */
+    public String getScheme() {
+        String driver = getDriver();
+        if (isS3Compatible(driver)) {
+            return "s3";
+        }
+        if ("google-cloud-storage".equalsIgnoreCase(driver)) {
+            return "gcs";
+        }
+        if ("azureblob".equalsIgnoreCase(driver)) {
+            return "azblob";
+        }
+        if ("transient".equalsIgnoreCase(driver)) {
+            return "memory";
+        }
+        throw new IllegalArgumentException("Unsupported OpenDAL offload driver: " + driver);
+    }
+
+    /**
+     * @return the bucket from the first key of {@link #getKeys(String)} that is present, else {@code null}
+     */
+    public String getBucket() {
+        return lookupString(METADATA_FIELD_BUCKET);
+    }
+
+    /**
+     * @return the region from the first key of {@link #getKeys(String)} that is present, else {@code null}
+     */
+    public String getRegion() {
+        return lookupString(METADATA_FIELD_REGION);
+    }
+
+    /**
+     * @return the service endpoint from the first key of {@link #getKeys(String)} that is present,
+     *         else {@code null}
+     */
+    public String getServiceEndpoint() {
+        return lookupString(METADATA_FIELD_ENDPOINT);
+    }
+
+    /**
+     * Looks up the bare property name, then the {@code managedLedgerOffload}-prefixed name, then the
+     * extra-config-prefixed name.
+     *
+     * @return the configured value, or {@link #DEFAULT_MAX_SEGMENT_TIME_IN_SECOND} when none is set
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    public long getMaxSegmentTimeInSecond() {
+        Long value = getLongFromKeys(
+                MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC,
+                getKeyName(MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC),
+                EXTRA_CONFIG_PREFIX + MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC);
+        return value != null ? value : DEFAULT_MAX_SEGMENT_TIME_IN_SECOND;
+    }
+
+    /**
+     * Same key resolution as {@link #getMaxSegmentTimeInSecond()}.
+     *
+     * @return the configured value, or {@link #DEFAULT_MIN_SEGMENT_TIME_IN_SECOND} when none is set
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    public long getMinSegmentTimeInSecond() {
+        Long value = getLongFromKeys(
+                MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC,
+                getKeyName(MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC),
+                EXTRA_CONFIG_PREFIX + MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC);
+        return value != null ? value : DEFAULT_MIN_SEGMENT_TIME_IN_SECOND;
+    }
+
+    /**
+     * Same key resolution as {@link #getMaxSegmentTimeInSecond()}.
+     *
+     * @return the configured value, or {@link #DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES} when none is set
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    public long getMaxSegmentSizeInBytes() {
+        Long value = getLongFromKeys(
+                MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES,
+                getKeyName(MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES),
+                EXTRA_CONFIG_PREFIX + MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES);
+        return value != null ? value : DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES;
+    }
+
+    /**
+     * @return the configured maximum block size, or 64 MiB when no key is present
+     * @throws NumberFormatException if the configured value is not a valid int
+     */
+    public int getMaxBlockSizeInBytes() {
+        return lookupInt(METADATA_FIELD_MAX_BLOCK_SIZE, 64 * MB);
+    }
+
+    /**
+     * @return the configured minimum block size, or 5 MiB when no key is present
+     * @throws NumberFormatException if the configured value is not a valid int
+     */
+    public int getMinBlockSizeInBytes() {
+        return lookupInt(METADATA_FIELD_MIN_BLOCK_SIZE, 5 * MB);
+    }
+
+    /**
+     * @return the configured read buffer size, or 1 MiB when no key is present
+     * @throws NumberFormatException if the configured value is not a valid int
+     */
+    public int getReadBufferSizeInBytes() {
+        return lookupInt(METADATA_FIELD_READ_BUFFER_SIZE, MB);
+    }
+
+    /**
+     * @return the configured write buffer size, or 10 MiB when no key is present
+     * @throws NumberFormatException if the configured value is not a valid int
+     */
+    public int getWriteBufferSizeInBytes() {
+        return lookupInt(METADATA_FIELD_WRITE_BUFFER_SIZE, 10 * MB);
+    }
+
+    /**
+     * @return a new mutable map with the driver, bucket, region and endpoint; unset bucket, region and
+     *         endpoint values are stored as empty strings
+     */
+    public Map<String, String> getOffloadDriverMetadata() {
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put(BLOB_STORE_PROVIDER_KEY, getDriver());
+        metadata.put(METADATA_FIELD_BUCKET, StringUtils.defaultString(getBucket()));
+        metadata.put(METADATA_FIELD_REGION, StringUtils.defaultString(getRegion()));
+        metadata.put(METADATA_FIELD_ENDPOINT, StringUtils.defaultString(getServiceEndpoint()));
+        return metadata;
+    }
+
+    /**
+     * Collects every property whose key starts with {@link #EXTRA_CONFIG_PREFIX}, strips the prefix and
+     * normalizes the remainder with {@link #normalizeExtraConfigKey(String)}. Keys that consist of the
+     * prefix only are ignored.
+     *
+     * @return an unmodifiable map sorted by normalized key
+     */
+    public Map<String, String> getExtraConfig() {
+        // Keep stable ordering for hashing/logging.
+        Map<String, String> extra = new TreeMap<>();
+        // Visit the source keys in sorted order too, so that when two raw keys normalize to the same name
+        // the surviving value does not depend on HashMap iteration order.
+        new TreeMap<>(configProperties).forEach((key, value) -> {
+            if (!key.startsWith(EXTRA_CONFIG_PREFIX)) {
+                return;
+            }
+            String raw = key.substring(EXTRA_CONFIG_PREFIX.length());
+            if (raw.isEmpty()) {
+                return;
+            }
+            extra.put(normalizeExtraConfigKey(raw), value);
+        });
+        return Collections.unmodifiableMap(extra);
+    }
+
+    /**
+     * Checks that the mandatory settings for the configured driver are present.
+     *
+     * @throws IOException if the bucket is blank (for drivers other than filesystem/transient), if aws-s3
+     *         has neither region nor endpoint, if s3/aliyun-oss has no endpoint, or if the maximum block
+     *         size is below 5 MiB
+     */
+    public void validate() throws IOException {
+        String driver = getDriver();
+        String bucket = getBucket();
+        String region = getRegion();
+        String endpoint = getServiceEndpoint();
+
+        if (StringUtils.isBlank(bucket)
+                && !"filesystem".equalsIgnoreCase(driver)
+                && !"transient".equalsIgnoreCase(driver)) {
+            throw new IOException("Bucket cannot be empty for driver " + driver + " offload");
+        }
+
+        if ("aws-s3".equalsIgnoreCase(driver)) {
+            if (StringUtils.isBlank(region) && StringUtils.isBlank(endpoint)) {
+                throw new IOException("Either Region or ServiceEndpoint must be specified for aws-s3 offload");
+            }
+        } else if ("s3".equalsIgnoreCase(driver) || "aliyun-oss".equalsIgnoreCase(driver)) {
+            if (StringUtils.isBlank(endpoint)) {
+                throw new IOException("ServiceEndpoint must be specified for driver " + driver + " offload");
+            }
+        }
+
+        if (getMaxBlockSizeInBytes() < 5 * MB) {
+            throw new IOException("managedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB");
+        }
+    }
+
+    /**
+     * Lists the configuration keys to consult for {@code property}, in lookup order: the driver-specific
+     * legacy key first (when the driver has one), then the {@code managedLedgerOffload}-prefixed key.
+     *
+     * @param property bare property name, e.g. {@code "bucket"}
+     * @return an immutable list with one or two keys
+     */
+    List<String> getKeys(String property) {
+        String backwardCompatible = getBackwardCompatibleKey(property);
+        String modern = getKeyName(property);
+        if (StringUtils.isBlank(backwardCompatible)) {
+            return List.of(modern);
+        }
+        return List.of(backwardCompatible, modern);
+    }
+
+    /** Returns the {@code managedLedgerOffload}-prefixed key for {@code property}. */
+    private String getKeyName(String property) {
+        return OFFLOADER_PROPERTY_PREFIX + StringUtils.capitalize(property);
+    }
+
+    /** Returns the legacy s3/gcs key for {@code property}, or {@code null} if the driver has none. */
+    private String getBackwardCompatibleKey(String property) {
+        String driver = getDriver();
+        if (isS3Compatible(driver)) {
+            return "s3ManagedLedgerOffload" + StringUtils.capitalize(property);
+        }
+        if ("google-cloud-storage".equalsIgnoreCase(driver)) {
+            return "gcsManagedLedgerOffload" + StringUtils.capitalize(property);
+        }
+        return null;
+    }
+
+    /** Tells whether {@code driver} is one of the names handled through the s3 scheme and s3 legacy keys. */
+    private static boolean isS3Compatible(String driver) {
+        return "aws-s3".equalsIgnoreCase(driver)
+                || "s3".equalsIgnoreCase(driver)
+                || "aliyun-oss".equalsIgnoreCase(driver);
+    }
+
+    /** Returns the value of the first key of {@link #getKeys(String)} that is present, else {@code null}. */
+    private String lookupString(String property) {
+        for (String key : getKeys(property)) {
+            if (configProperties.containsKey(key)) {
+                return configProperties.get(key);
+            }
+        }
+        return null;
+    }
+
+    /** Parses the value of the first present key of {@link #getKeys(String)}, else returns the default. */
+    private int lookupInt(String property, int defaultValue) {
+        for (String key : getKeys(property)) {
+            if (configProperties.containsKey(key)) {
+                return Integer.parseInt(configProperties.get(key));
+            }
+        }
+        return defaultValue;
+    }
+
+    /** Parses the first non-null value found under {@code keys}, or returns {@code null} if none is set. */
+    private Long getLongFromKeys(String... keys) {
+        for (String key : keys) {
+            String value = configProperties.get(key);
+            if (value == null) {
+                continue;
+            }
+            return Long.parseLong(value);
+        }
+        return null;
+    }
+
+    /**
+     * Converts an extra-config key suffix to the snake_case form used by OpenDAL.
+     *
+     * <p>For example {@code accountName} and {@code AccountName} both become {@code account_name}. Every
+     * upper-case letter starts a new word, so {@code sasURL} becomes {@code sas_u_r_l}.
+     *
+     * @param rawKeySuffix key with the extra-config prefix already removed
+     * @return the suffix unchanged if it contains {@code '_'}, otherwise its snake_case form
+     */
+    static String normalizeExtraConfigKey(String rawKeySuffix) {
+        // Some users prefer camelCase keys in broker.conf, while OpenDAL configs are snake_case.
+        // We accept both:
+        // - if key already contains '_' assume it's snake_case and keep as-is.
+        // - otherwise, convert camelCase/PascalCase to snake_case.
+        if (rawKeySuffix.indexOf('_') >= 0) {
+            return rawKeySuffix;
+        }
+        StringBuilder out = new StringBuilder(rawKeySuffix.length() + 8);
+        for (int i = 0; i < rawKeySuffix.length(); i++) {
+            char c = rawKeySuffix.charAt(i);
+            // An upper-case letter opens a new word, except at the very start of the key.
+            if (i > 0 && Character.isUpperCase(c)) {
+                out.append('_');
+            }
+            out.append(Character.toLowerCase(c));
+        }
+        return out.toString().toLowerCase(Locale.ROOT);
+    }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OperatorCache.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OperatorCache.java
new file mode 100644
index 0000000000000..2af06cbb78d5d
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OperatorCache.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.provider;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.opendal.Operator;
+
+/**
+ * Keeps one base OpenDAL {@link Operator} per {@link OperatorCacheKey} and hands out duplicates of it.
+ */
+@Slf4j
+public class OperatorCache implements AutoCloseable {
+
+    private final ConcurrentMap<OperatorCacheKey, Operator> operators = new ConcurrentHashMap<>();
+
+    /**
+     * Returns a duplicate of the operator cached under {@code key}, creating and caching the base
+     * operator with {@code creator} on first use.
+     *
+     * @param key cache key identifying the operator configuration
+     * @param creator factory invoked only when no operator is cached for {@code key}
+     * @return the result of {@code duplicate()} on the cached base operator
+     */
+    public Operator getOrCreate(OperatorCacheKey key, Supplier<Operator> creator) {
+        Operator base = operators.computeIfAbsent(key, __ -> creator.get());
+        return base.duplicate();
+    }
+
+    /**
+     * Closes and evicts every cached base operator. A failure to close one operator is logged and does
+     * not prevent the remaining ones from being closed.
+     */
+    @Override
+    public void close() {
+        for (OperatorCacheKey key : operators.keySet()) {
+            // Take each entry out before closing it. A bulk clear() after the loop could discard an
+            // operator that was inserted while the loop was running, without ever closing it.
+            Operator operator = operators.remove(key);
+            if (operator == null) {
+                continue;
+            }
+            try {
+                operator.close();
+            } catch (Throwable t) {
+                log.warn("Failed to close OpenDAL operator for {}", key, t);
+            }
+        }
+    }
+}
+
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OperatorCacheKey.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OperatorCacheKey.java
new file mode 100644
index 0000000000000..7c78ef976fded
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/OperatorCacheKey.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.provider;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Immutable cache key made of the storage location (scheme, bucket, region, endpoint) and a SHA-256
+ * fingerprint of the operator configuration.
+ */
+final class OperatorCacheKey {
+    private static final HexFormat HEX = HexFormat.of();
+
+    private final String scheme;
+    private final String bucket;
+    private final String region;
+    private final String endpoint;
+    private final String configHash;
+
+    /**
+     * Creates a key for the given location and operator configuration.
+     *
+     * @param scheme storage scheme
+     * @param bucket bucket name
+     * @param region region
+     * @param endpoint service endpoint
+     * @param operatorConfig operator configuration; only its SHA-256 fingerprint is retained
+     * @return the cache key
+     */
+    static OperatorCacheKey of(String scheme,
+                               String bucket,
+                               String region,
+                               String endpoint,
+                               Map<String, String> operatorConfig) {
+        return new OperatorCacheKey(scheme, bucket, region, endpoint, sha256Hex(operatorConfig));
+    }
+
+    private OperatorCacheKey(String scheme, String bucket, String region, String endpoint, String configHash) {
+        this.scheme = scheme;
+        this.bucket = bucket;
+        this.region = region;
+        this.endpoint = endpoint;
+        this.configHash = configHash;
+    }
+
+    /**
+     * Hashes the configuration entries in key order, so the result does not depend on the iteration
+     * order of {@code operatorConfig}.
+     */
+    private static String sha256Hex(Map<String, String> operatorConfig) {
+        Map<String, String> sorted = new TreeMap<>(operatorConfig);
+        MessageDigest digest;
+        try {
+            digest = MessageDigest.getInstance("SHA-256");
+        } catch (NoSuchAlgorithmException e) {
+            // SHA-256 is mandatory in every Java runtime.
+            throw new RuntimeException(e);
+        }
+        sorted.forEach((key, value) -> {
+            updateField(digest, key);
+            updateField(digest, value);
+        });
+        return HEX.formatHex(digest.digest());
+    }
+
+    /**
+     * Feeds one field into the digest as a 4-byte length followed by its UTF-8 bytes.
+     *
+     * <p>Length-prefixing keeps field boundaries unambiguous: with plain {@code '='} and newline
+     * separators, a value containing those characters could produce the same byte stream as two separate
+     * entries. A {@code null} field is encoded as length -1 so it differs from the empty string.
+     */
+    private static void updateField(MessageDigest digest, String field) {
+        if (field == null) {
+            updateInt(digest, -1);
+            return;
+        }
+        byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
+        updateInt(digest, bytes.length);
+        digest.update(bytes);
+    }
+
+    /** Feeds {@code value} into the digest as four big-endian bytes. */
+    private static void updateInt(MessageDigest digest, int value) {
+        digest.update((byte) (value >>> 24));
+        digest.update((byte) (value >>> 16));
+        digest.update((byte) (value >>> 8));
+        digest.update((byte) value);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof OperatorCacheKey)) {
+            return false;
+        }
+        OperatorCacheKey that = (OperatorCacheKey) o;
+        return Objects.equals(scheme, that.scheme)
+                && Objects.equals(bucket, that.bucket)
+                && Objects.equals(region, that.region)
+                && Objects.equals(endpoint, that.endpoint)
+                && Objects.equals(configHash, that.configHash);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(scheme, bucket, region, endpoint, configHash);
+    }
+
+    @Override
+    public String toString() {
+        return "OperatorCacheKey{"
+                + "scheme='" + scheme + '\''
+                + ", bucket='" + bucket + '\''
+                + ", region='" + region + '\''
+                + ", endpoint='" + endpoint + '\''
+                + ", configHash='" + configHash + '\''
+                + '}';
+    }
+}
+
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/package-info.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/package-info.java
new file mode 100644
index 0000000000000..e62a66d4b5065
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/provider/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.provider;
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/storage/OpenDALStorage.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/storage/OpenDALStorage.java
new file mode 100644
index 0000000000000..150a66353797e
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/storage/OpenDALStorage.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALOperatorProvider;
+import org.apache.opendal.Entry;
+import org.apache.opendal.ListOptions;
+import org.apache.opendal.Metadata;
+import org.apache.opendal.OpenDALException;
+import org.apache.opendal.Operator;
+import org.apache.opendal.WriteOptions;
+
+/**
+ * A thin wrapper around the OpenDAL Java binding.
+ *
+ * <p>All OpenDAL calls should be kept inside this class so that the offloader logic layer can stay clean
+ * (and provider backends can be changed without touching core offload algorithms).
+ *
+ * <p>Every operation obtains an {@link Operator} from the provider and closes it when the operation (or,
+ * for {@link #openOutputStream(String)}, the returned stream) is finished.
+ */
+@Slf4j
+public class OpenDALStorage {
+
+    /** Size and last-modified time of a stored object. */
+    @Value
+    public static class ObjectMetadata {
+        long size;
+        Instant lastModified;
+    }
+
+    /** One page of a listing plus the marker to pass to the next {@code list} call, if any. */
+    @Value
+    public static class ListResult {
+        List<Item> items;
+        String nextMarker;
+    }
+
+    /** A listed object: its path and metadata. */
+    @Value
+    public static class Item {
+        String path;
+        ObjectMetadata metadata;
+    }
+
+    private final OpenDALOperatorProvider operatorProvider;
+    private final Map<String, String> offloadDriverMetadata;
+
+    /**
+     * @param operatorProvider source of operators; must not be {@code null}
+     * @param offloadDriverMetadata metadata passed to the provider on every call; must not be {@code null}
+     */
+    public OpenDALStorage(OpenDALOperatorProvider operatorProvider, Map<String, String> offloadDriverMetadata) {
+        this.operatorProvider = Objects.requireNonNull(operatorProvider, "operatorProvider");
+        this.offloadDriverMetadata = Objects.requireNonNull(offloadDriverMetadata, "offloadDriverMetadata");
+    }
+
+    /**
+     * Opens a stream that writes to {@code key}.
+     *
+     * @param key object key
+     * @return a stream whose {@code close()} closes the underlying OpenDAL stream and then the operator
+     * @throws IOException if the stream cannot be created; the operator is closed before the exception
+     *         is thrown
+     */
+    public OutputStream openOutputStream(String key) throws IOException {
+        Operator operator = operatorProvider.getOperator(offloadDriverMetadata);
+        try {
+            OutputStream out = operator.createOutputStream(key);
+            return new OutputStream() {
+                @Override
+                public void write(int b) throws IOException {
+                    out.write(b);
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException {
+                    out.write(b, off, len);
+                }
+
+                @Override
+                public void flush() throws IOException {
+                    out.flush();
+                }
+
+                @Override
+                public void close() throws IOException {
+                    // The operator must be released even when closing the stream fails.
+                    try {
+                        out.close();
+                    } finally {
+                        operator.close();
+                    }
+                }
+            };
+        } catch (Throwable t) {
+            operator.close();
+            throw toIOException("openOutputStream", key, t);
+        }
+    }
+
+    /**
+     * Writes {@code data} to {@code key} with content type {@code application/octet-stream}.
+     *
+     * @param key object key
+     * @param data bytes to store
+     * @param userMetadata user metadata to attach; {@code null} is treated as empty
+     * @throws IOException if the write fails
+     */
+    public void writeBytes(String key, byte[] data, Map<String, String> userMetadata) throws IOException {
+        Map<String, String> safeUserMeta = (userMetadata != null) ? userMetadata : Collections.emptyMap();
+        WriteOptions options = WriteOptions.builder()
+                .contentType("application/octet-stream")
+                .userMetadata(safeUserMeta)
+                .build();
+
+        try (Operator operator = operatorProvider.getOperator(offloadDriverMetadata)) {
+            operator.write(key, data, options);
+        } catch (Throwable t) {
+            throw toIOException("writeBytes", key, t);
+        }
+    }
+
+    /**
+     * Reads the byte range {@code [startInclusive, endInclusive]} of {@code key} fully into memory.
+     *
+     * @param key object key
+     * @param startInclusive first byte offset; must be {@code >= 0}
+     * @param endInclusive last byte offset; must be {@code >= startInclusive}
+     * @return a stream over the bytes that were read
+     * @throws IllegalArgumentException if the range is invalid or longer than {@link Integer#MAX_VALUE}
+     * @throws IOException if the read fails
+     */
+    public InputStream readRange(String key, long startInclusive, long endInclusive) throws IOException {
+        if (startInclusive < 0 || endInclusive < startInclusive) {
+            throw new IllegalArgumentException("Invalid range: " + startInclusive + "-" + endInclusive);
+        }
+        // Check the span before adding 1: with endInclusive == Long.MAX_VALUE the length would overflow to
+        // a negative number and slip past a plain "length > Integer.MAX_VALUE" test.
+        long span = endInclusive - startInclusive;
+        if (span >= Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("Range too large: " + Long.toUnsignedString(span + 1));
+        }
+        long length = span + 1;
+        try (Operator operator = operatorProvider.getOperator(offloadDriverMetadata)) {
+            byte[] data = operator.read(key, startInclusive, length);
+            return new ByteArrayInputStream(data);
+        } catch (Throwable t) {
+            throw toIOException("readRange", key, t);
+        }
+    }
+
+    /**
+     * Fetches the content length and last-modified time of {@code key}.
+     *
+     * @param key object key
+     * @return the object's metadata
+     * @throws IOException if the lookup fails
+     */
+    public ObjectMetadata stat(String key) throws IOException {
+        try (Operator operator = operatorProvider.getOperator(offloadDriverMetadata)) {
+            Metadata md = operator.stat(key);
+            return new ObjectMetadata(md.getContentLength(), md.getLastModified());
+        } catch (Throwable t) {
+            throw toIOException("stat", key, t);
+        }
+    }
+
+    /**
+     * Deletes {@code key}.
+     *
+     * @param key object key
+     * @throws IOException if the delete fails
+     */
+    public void delete(String key) throws IOException {
+        try (Operator operator = operatorProvider.getOperator(offloadDriverMetadata)) {
+            operator.delete(key);
+        } catch (Throwable t) {
+            throw toIOException("delete", key, t);
+        }
+    }
+
+    /**
+     * Deletes the keys one by one, in order, stopping at the first failure.
+     *
+     * @param keys object keys
+     * @throws IOException if a delete fails; keys after the failing one are not attempted
+     */
+    public void delete(List<String> keys) throws IOException {
+        for (String key : keys) {
+            delete(key);
+        }
+    }
+
+    /**
+     * Lists files under {@code prefix} recursively; entries without metadata and non-file entries are
+     * dropped.
+     *
+     * @param prefix path prefix; {@code null} is treated as the empty string
+     * @param marker when non-empty, passed to OpenDAL as the start-after position
+     * @param limit passed to OpenDAL as the listing limit; must be {@code > 0}
+     * @return the files found, plus the last file's path as next marker when exactly {@code limit} files
+     *         were returned, otherwise a {@code null} marker
+     * @throws IllegalArgumentException if {@code limit <= 0}
+     * @throws IOException if the listing fails
+     */
+    public ListResult list(String prefix, String marker, long limit) throws IOException {
+        if (limit <= 0) {
+            throw new IllegalArgumentException("limit must be > 0");
+        }
+        try (Operator operator = operatorProvider.getOperator(offloadDriverMetadata)) {
+            ListOptions.ListOptionsBuilder options = ListOptions.builder().recursive(true).limit(limit);
+            if (marker != null && !marker.isEmpty()) {
+                options.startAfter(marker);
+            }
+            List<Entry> entries = operator.list(prefix == null ? "" : prefix, options.build());
+            List<Item> items = entries.stream()
+                    .filter(e -> e.getMetadata() != null && e.getMetadata().isFile())
+                    .map(e -> new Item(e.getPath(), new ObjectMetadata(
+                            e.getMetadata().getContentLength(), e.getMetadata().getLastModified())))
+                    .collect(Collectors.toList());
+
+            // NOTE(review): the page-full test counts items after filtering, so a listing that contains
+            // non-file entries may yield no marker although more entries exist. Confirm how the OpenDAL
+            // binding interprets ListOptions.limit before changing this.
+            String nextMarker = null;
+            if (items.size() == limit) {
+                nextMarker = Optional.ofNullable(items.get(items.size() - 1).getPath()).orElse(null);
+            }
+            return new ListResult(items, nextMarker);
+        } catch (Throwable t) {
+            throw toIOException("list", prefix, t);
+        }
+    }
+
+    /**
+     * Converts a failure into an {@link IOException}.
+     *
+     * <p>An {@link OpenDALException} with code {@code NotFound} becomes a {@link FileNotFoundException},
+     * an existing {@link IOException} is returned as-is, and anything else is wrapped. The original
+     * throwable is always kept as the cause.
+     */
+    private static IOException toIOException(String op, String key, Throwable t) {
+        if (t instanceof OpenDALException) {
+            OpenDALException e = (OpenDALException) t;
+            if (e.getCode() == OpenDALException.Code.NotFound) {
+                FileNotFoundException notFound = new FileNotFoundException(op + " not found: " + key);
+                // FileNotFoundException has no cause-taking constructor; attach the cause explicitly so
+                // the backend error details are not lost.
+                notFound.initCause(t);
+                return notFound;
+            }
+        }
+        if (t instanceof IOException) {
+            return (IOException) t;
+        }
+        return new IOException("OpenDAL " + op + " failed for key " + key, t);
+    }
+}
diff --git a/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/storage/package-info.java b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/storage/package-info.java
new file mode 100644
index 0000000000000..c675781e8d562
--- /dev/null
+++ b/tiered-storage/opendal/src/main/java/org/apache/bookkeeper/mledger/offload/opendal/storage/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.storage;
diff --git a/tiered-storage/opendal/src/main/resources/META-INF/services/pulsar-offloader.yaml b/tiered-storage/opendal/src/main/resources/META-INF/services/pulsar-offloader.yaml
new file mode 100644
index 0000000000000..0315080e3062d
--- /dev/null
+++ b/tiered-storage/opendal/src/main/resources/META-INF/services/pulsar-offloader.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: opendal
+description: OpenDAL based offloader implementation
+offloaderFactoryClass: org.apache.bookkeeper.mledger.offload.opendal.OpenDALLedgerOffloaderFactory
+
diff --git a/tiered-storage/opendal/src/main/resources/findbugsExclude.xml b/tiered-storage/opendal/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000000000..55ecc896f01a4
--- /dev/null
+++ b/tiered-storage/opendal/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,23 @@
+
+
+
+
diff --git a/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImplTest.java b/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImplTest.java
new file mode 100644
index 0000000000000..9d45fffa5f177
--- /dev/null
+++ b/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALBackedReadHandleImplTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that {@link OpenDALBackedReadHandleImpl} returns the expected entries from an in-memory data block.
+ */
+public class OpenDALBackedReadHandleImplTest {
+
+    private final OffsetsCache offsetsCache = new OffsetsCache();
+    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
+
+    /** Stops the shared executor and closes the offsets cache once the tests of this class are done. */
+    @AfterClass(alwaysRun = true)
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+        offsetsCache.close();
+    }
+
+    /** Returns the payload this test writes for {@code entryId}. */
+    private String getExpectedEntryContent(int entryId) {
+        return "Entry " + entryId;
+    }
+
+    /**
+     * Builds a read handle over an in-memory buffer that holds {@code entries} entries.
+     *
+     * <p>Entry data starts at offset 128; each entry is written as an int length, a long entry id and the
+     * payload. The start offset of every entry is recorded in the index block and in the offsets cache.
+     *
+     * @param ledgerId id of the ledger the handle reads
+     * @param entries number of entries to write into the buffer
+     * @param hasDirtyData when {@code true}, stray bytes are written between entry 0 and entry 1
+     * @return the read handle together with the buffer that backs its input stream
+     */
+    private Pair<OpenDALBackedReadHandleImpl, ByteBuf> createReadHandle(
+            long ledgerId, int entries, boolean hasDirtyData) throws Exception {
+        List<Pair<Integer, Integer>> offsets = new ArrayList<>();
+        int totalLen = 0;
+        ByteBuf data = ByteBufAllocator.DEFAULT.heapBuffer(1024);
+        data.writeInt(0);
+        data.writerIndex(128);
+        for (int i = 0; i < entries; i++) {
+            if (hasDirtyData && i == 1) {
+                data.writeBytes("dirty data".getBytes(UTF_8));
+            }
+            offsets.add(Pair.of(i, data.writerIndex()));
+            offsetsCache.put(ledgerId, i, data.writerIndex());
+            byte[] entryContent = getExpectedEntryContent(i).getBytes(UTF_8);
+            totalLen += entryContent.length;
+            data.writeInt(entryContent.length);
+            data.writeLong(i);
+            data.writeBytes(entryContent);
+        }
+
+        LedgerMetadata metadata = LedgerMetadataBuilder.create()
+                .withId(ledgerId)
+                .withEnsembleSize(1)
+                .withWriteQuorumSize(1)
+                .withAckQuorumSize(1)
+                .withDigestType(DigestType.CRC32C)
+                .withPassword("pwd".getBytes(UTF_8))
+                .withClosedState()
+                .withLastEntryId(entries)
+                .withLength(totalLen)
+                .newEnsembleEntry(0L, Arrays.asList(BookieId.parse("127.0.0.1:3181")))
+                .build();
+
+        BackedInputStreamImpl inputStream = new BackedInputStreamImpl(data);
+        TestOffloadIndexBlock index = new TestOffloadIndexBlock(metadata);
+        for (Pair<Integer, Integer> pair : offsets) {
+            index.put(pair.getLeft(), OffloadIndexEntryImpl.of(pair.getLeft(), 0, pair.getRight(), 0));
+        }
+        return Pair.of(new OpenDALBackedReadHandleImpl(ledgerId, index, inputStream, executor, offsetsCache), data);
+    }
+
+    /** Minimal {@link OffloadIndexBlock} that keeps its index entries in a sorted map keyed by entry id. */
+    private static class TestOffloadIndexBlock implements OffloadIndexBlock {
+        private final LedgerMetadata ledgerMetadata;
+        private final java.util.NavigableMap<Long, OffloadIndexEntry> entries = new java.util.TreeMap<>();
+
+        private TestOffloadIndexBlock(LedgerMetadata ledgerMetadata) {
+            this.ledgerMetadata = ledgerMetadata;
+        }
+
+        private void put(long entryId, OffloadIndexEntry entry) {
+            entries.put(entryId, entry);
+        }
+
+        @Override
+        public OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException {
+            // The index entry with the greatest entry id that does not exceed the requested one.
+            return entries.floorEntry(messageEntryId).getValue();
+        }
+
+        @Override
+        public int getEntryCount() {
+            return entries.size();
+        }
+
+        @Override
+        public LedgerMetadata getLedgerMetadata() {
+            return ledgerMetadata;
+        }
+
+        @Override
+        public long getDataObjectLength() {
+            return 0;
+        }
+
+        @Override
+        public long getDataBlockHeaderLength() {
+            return 0;
+        }
+
+        @Override
+        public IndexInputStream toStream() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+    }
+
+    /** {@link BackedInputStream} over a {@link ByteBuf}; the buffer's reader index is the stream position. */
+    private static class BackedInputStreamImpl extends BackedInputStream {
+
+        private final ByteBuf data;
+
+        private BackedInputStreamImpl(ByteBuf data) {
+            this.data = data;
+        }
+
+        @Override
+        public void seek(long position) {
+            data.readerIndex((int) position);
+        }
+
+        @Override
+        public void seekForward(long position) throws IOException {
+            data.readerIndex((int) position);
+        }
+
+        @Override
+        public long getCurrentPosition() {
+            return data.readerIndex();
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (data.readableBytes() == 0) {
+                throw new EOFException("The input-stream has no bytes to read");
+            }
+            // InputStream.read() must return 0..255; readByte() would yield negative values for bytes >= 0x80.
+            return data.readUnsignedByte();
+        }
+
+        @Override
+        public int available() throws IOException {
+            return data.readableBytes();
+        }
+    }
+
+    /** Combinations of the initial reader index of the buffer and whether stray bytes are present. */
+    @DataProvider
+    public Object[][] streamStartAt() {
+        return new Object[][]{
+                {0, false},
+                {1, false},
+                {128, false},
+                {0, true},
+                {1, true},
+                {128, true}
+        };
+    }
+
+    /**
+     * Reads entries one by one and as ranges, and checks every returned payload.
+     *
+     * @param streamStartAt reader index the backing buffer is moved to before the first read
+     * @param hasDirtyData whether stray bytes are written between entry 0 and entry 1
+     */
+    @Test(dataProvider = "streamStartAt")
+    public void testRead(int streamStartAt, boolean hasDirtyData) throws Exception {
+        int entryCount = 5;
+        Pair<OpenDALBackedReadHandleImpl, ByteBuf> ledgerDataPair = createReadHandle(1, entryCount, hasDirtyData);
+        OpenDALBackedReadHandleImpl ledger = ledgerDataPair.getLeft();
+        ByteBuf data = ledgerDataPair.getRight();
+        data.readerIndex(streamStartAt);
+
+        for (int i = 0; i < entryCount; i++) {
+            assertReadReturns(ledger, i, i);
+        }
+        assertReadReturns(ledger, 0, entryCount - 1);
+        assertReadReturns(ledger, 0, entryCount - 2);
+        assertReadReturns(ledger, 0, entryCount - 1);
+
+        ledger.close();
+    }
+
+    /**
+     * Reads entries {@code firstEntry..lastEntry} (inclusive) and asserts that each one carries the expected
+     * payload. The returned {@link LedgerEntries} is closed afterwards.
+     */
+    private void assertReadReturns(OpenDALBackedReadHandleImpl ledger, int firstEntry, int lastEntry)
+            throws Exception {
+        try (LedgerEntries entries = ledger.read(firstEntry, lastEntry)) {
+            Iterator<LedgerEntry> iterator = entries.iterator();
+            for (int i = firstEntry; i <= lastEntry; i++) {
+                assertEquals(new String(iterator.next().getEntryBytes(), UTF_8), getExpectedEntryContent(i));
+            }
+        }
+    }
+}
diff --git a/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALManagedLedgerOffloaderStreamingTest.java b/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALManagedLedgerOffloaderStreamingTest.java
new file mode 100644
index 0000000000000..5e6860d90eb07
--- /dev/null
+++ b/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/impl/OpenDALManagedLedgerOffloaderStreamingTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.impl;
+
+import static org.testng.Assert.assertEquals;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALOperatorProvider;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALTieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OperatorCache;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadSegment;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trip test: streams entries through {@link OpenDALManagedLedgerOffloader} and reads them back.
+ */
+public class OpenDALManagedLedgerOffloaderStreamingTest {
+
+    /**
+     * Creates a dynamic-proxy {@link ManagedLedger} for the offloader.
+     *
+     * <p>{@code getLedgerInfo(ledgerId)} completes with a {@link LedgerInfo} of size 100 and 100 entries,
+     * {@code toString()} and {@code getName()} return a fixed name, and every other method throws
+     * {@link UnsupportedOperationException}.
+     */
+    private static ManagedLedger createMockManagedLedger() {
+        return (ManagedLedger) Proxy.newProxyInstance(
+                ManagedLedger.class.getClassLoader(),
+                new Class<?>[]{ManagedLedger.class},
+                (proxy, method, args) -> {
+                    if ("getLedgerInfo".equals(method.getName()) && args != null && args.length == 1) {
+                        long ledgerId = (long) args[0];
+                        LedgerInfo ledgerInfo = LedgerInfo.newBuilder()
+                                .setLedgerId(ledgerId)
+                                .setSize(100)
+                                .setEntries(100)
+                                .build();
+                        return CompletableFuture.completedFuture(ledgerInfo);
+                    }
+                    if ("toString".equals(method.getName())) {
+                        return "MockManagedLedger";
+                    }
+                    if ("getName".equals(method.getName())) {
+                        return "MockManagedLedger";
+                    }
+                    throw new UnsupportedOperationException(method.toString());
+                });
+    }
+
+    /**
+     * Offers ten 100-byte entries to a streaming offload on the {@code transient} provider, then reads the
+     * offloaded segment back and checks that all ten entries come back with their original bytes.
+     */
+    @Test
+    public void testStreamingOffloadWriteThenRead() throws Exception {
+        Random random = new Random(0);
+        List<byte[]> expected = new ArrayList<>();
+
+        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(2)
+                .name("opendal-offloader-test")
+                .build();
+        OffsetsCache offsetsCache = new OffsetsCache();
+        OperatorCache operatorCache = new OperatorCache();
+        try {
+            HashMap<String, String> props = new HashMap<>();
+            props.put(OpenDALTieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, "transient");
+            props.put(OpenDALTieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            props.put("managedLedgerOffloadMinBlockSizeInBytes", "1024");
+
+            OpenDALTieredStorageConfiguration config = OpenDALTieredStorageConfiguration.create(props);
+            OpenDALOperatorProvider operatorProvider = new OpenDALOperatorProvider(config, operatorCache);
+
+            OpenDALManagedLedgerOffloader offloader = OpenDALManagedLedgerOffloader.create(
+                    config,
+                    new HashMap<>(),
+                    scheduler,
+                    scheduler,
+                    LedgerOffloaderStatsDisable.INSTANCE,
+                    offsetsCache,
+                    operatorProvider);
+
+            ManagedLedger managedLedger = createMockManagedLedger();
+            UUID uuid = UUID.randomUUID();
+            long ledgerId = 0;
+
+            OffloadHandle offloadHandle = offloader.streamingOffload(
+                    managedLedger, uuid, ledgerId, 0, config.getOffloadDriverMetadata())
+                    .get(5, TimeUnit.SECONDS);
+
+            for (int i = 0; i < 10; i++) {
+                byte[] data = new byte[100];
+                random.nextBytes(data);
+                expected.add(data.clone());
+                EntryImpl entry = EntryImpl.create(ledgerId, i, data);
+                try {
+                    assertEquals(offloadHandle.offerEntry(entry), OffloadHandle.OfferEntryResult.SUCCESS);
+                } finally {
+                    entry.release();
+                }
+            }
+
+            offloadHandle.close();
+            LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync()
+                    .get(30, TimeUnit.SECONDS);
+            assertEquals(offloadResult.endLedger, ledgerId);
+            assertEquals(offloadResult.endEntry, 9);
+
+            OffloadContext offloadContext = OffloadContext.newBuilder()
+                    .addOffloadSegment(OffloadSegment.newBuilder()
+                            .setUidMsb(uuid.getMostSignificantBits())
+                            .setUidLsb(uuid.getLeastSignificantBits())
+                            .setComplete(true)
+                            .setEndEntryId(9)
+                            .build())
+                    .build();
+
+            ReadHandle readHandle = offloader.readOffloaded(ledgerId, offloadContext, config.getOffloadDriverMetadata())
+                    .get(5, TimeUnit.SECONDS);
+            try {
+                LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get(10, TimeUnit.SECONDS);
+                int readCount = 0;
+                try {
+                    for (LedgerEntry ledgerEntry : ledgerEntries) {
+                        assertEquals(ledgerEntry.getEntryBytes(), expected.get((int) ledgerEntry.getEntryId()));
+                        readCount++;
+                    }
+                } finally {
+                    ledgerEntries.close();
+                }
+                // Without this check an empty result would pass the loop above vacuously.
+                assertEquals(readCount, expected.size());
+            } finally {
+                readHandle.close();
+            }
+        } finally {
+            operatorCache.close();
+            offsetsCache.close();
+            scheduler.shutdownNow();
+        }
+    }
+}
diff --git a/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/storage/OpenDALStorageTest.java b/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/storage/OpenDALStorageTest.java
new file mode 100644
index 0000000000000..60db1f136c5ab
--- /dev/null
+++ b/tiered-storage/opendal/src/test/java/org/apache/bookkeeper/mledger/offload/opendal/storage/OpenDALStorageTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.opendal.storage;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALOperatorProvider;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OpenDALTieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.offload.opendal.provider.OperatorCache;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises the basic object operations of {@link OpenDALStorage} against the {@code transient} driver.
+ */
+public class OpenDALStorageTest {
+
+    private OperatorCache operatorCache;
+
+    /** Closes the operator cache created by the test, if any. */
+    @AfterMethod(alwaysRun = true)
+    public void teardown() {
+        if (operatorCache != null) {
+            operatorCache.close();
+        }
+    }
+
+    /**
+     * Writes an object, then checks stat, ranged read and listing, and finally verifies that the object can no
+     * longer be stat-ed after it has been deleted.
+     */
+    @Test
+    public void testWriteReadStatListDeleteOnMemoryBackend() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("managedLedgerOffloadDriver", "transient");
+
+        OpenDALTieredStorageConfiguration config = OpenDALTieredStorageConfiguration.create(props);
+        operatorCache = new OperatorCache();
+        OpenDALOperatorProvider provider = new OpenDALOperatorProvider(config, operatorCache);
+
+        OpenDALStorage storage = new OpenDALStorage(provider, Collections.emptyMap());
+
+        String key = "k1";
+        byte[] payload = "hello-opendal".getBytes(UTF_8);
+        storage.writeBytes(key, payload, Map.of("role", "test"));
+
+        OpenDALStorage.ObjectMetadata metadata = storage.stat(key);
+        assertEquals(metadata.getSize(), payload.length);
+
+        try (InputStream in = storage.readRange(key, 0, payload.length - 1)) {
+            assertEquals(new String(in.readAllBytes(), UTF_8), "hello-opendal");
+        }
+
+        OpenDALStorage.ListResult list = storage.list("", null, 100);
+        assertTrue(list.getItems().stream().anyMatch(i -> key.equals(i.getPath())));
+
+        storage.delete(key);
+        // The test must fail if stat() still succeeds once the object is deleted.
+        assertThrows(FileNotFoundException.class, () -> storage.stat(key));
+    }
+}
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
index ce845693e0970..439f3d42a8301 100644
--- a/tiered-storage/pom.xml
+++ b/tiered-storage/pom.xml
@@ -33,11 +33,13 @@
${project.version}
+ 0.48.2
jcloud
file-system
+ opendal