Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/azurestore/azureservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package azurestore

import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"sort"
"strings"

Expand All @@ -40,8 +42,17 @@ const (
InfoBlobSuffix string = ".info"
MaxBlockBlobSize int64 = blockblob.MaxBlocks * blockblob.MaxStageBlockBytes
MaxBlockBlobChunkSize int64 = blockblob.MaxStageBlockBytes

// sentinelBlockIndex is a reserved block index for the marker block that is
// staged when an upload is created (see StageSentinelBlock). It sits far above
// the maximum number of blocks Azure allows per blob (blockblob.MaxBlocks), so
// it can never collide with a data block index.
sentinelBlockIndex int = math.MaxInt32
)

// sentinelBlockID is the base64 block ID of the marker block, derived once at
// package initialization from sentinelBlockIndex via blockIDIntToBase64. It is
// the ID under which StageSentinelBlock stages the marker, and the value
// GetOffset compares uncommitted block names against in order to skip it.
var sentinelBlockID = blockIDIntToBase64(sentinelBlockIndex)

type azService struct {
ContainerClient *container.Client
ContainerName string
Expand All @@ -66,6 +77,9 @@ type AzBlob interface {
Delete(ctx context.Context) error
// Upload the blob
Upload(ctx context.Context, body io.ReadSeeker) error
// StageSentinelBlock stages a marker block so that a freshly created upload
// always has at least one uncommitted block (see GetOffset).
StageSentinelBlock(ctx context.Context) error
// Download returns a readcloser to download the contents of the blob
Download(ctx context.Context) (io.ReadCloser, error)
// Get the offset of the blob and its indexes
Expand Down Expand Up @@ -190,6 +204,23 @@ func (blockBlob *BlockBlob) Upload(ctx context.Context, body io.ReadSeeker) erro
return err
}

// StageSentinelBlock uploads a one-byte placeholder block under the reserved
// sentinelBlockID, guaranteeing that an upload which has just been created owns
// at least one uncommitted block. GetOffset relies on this to distinguish a
// brand-new upload from a completed one (which has no uncommitted blocks), even
// if blob versioning has left committed blocks on the blob (see #1349).
//
// The placeholder never becomes part of the blob's content: its ID is reserved,
// Commit only commits blockBlob.Indexes, Azure discards it once the real blocks
// are committed, and GetOffset skips it explicitly.
//
// The payload is a single byte rather than empty because Azure refuses to stage
// a zero-length block (InvalidHeaderValue for Content-Length: 0, see #1358).
//
// It returns the error reported by StageBlock unchanged, or nil on success.
func (blockBlob *BlockBlob) StageSentinelBlock(ctx context.Context) error {
	// Single zero byte; it only needs to be non-empty.
	marker := []byte{0}
	payload := readSeekCloser{bytes.NewReader(marker)}

	if _, err := blockBlob.BlobClient.StageBlock(ctx, sentinelBlockID, payload, nil); err != nil {
		return err
	}
	return nil
}

// Download the blockBlob from Azure Blob Storage
func (blockBlob *BlockBlob) Download(ctx context.Context) (io.ReadCloser, error) {
resp, err := blockBlob.BlobClient.DownloadStream(ctx, nil)
Expand Down Expand Up @@ -220,6 +251,11 @@ func (blockBlob *BlockBlob) GetOffset(ctx context.Context) (int64, error) {

var indexes []int
for _, block := range resp.UncommittedBlocks {
// Skip the marker block staged in NewUpload (see StageSentinelBlock); it is
// not real data and must not contribute to the offset or the block indexes.
if block.Name != nil && *block.Name == sentinelBlockID {
continue
}
offset += *block.Size
indexes = append(indexes, blockIDBase64ToInt(block.Name))
}
Expand Down Expand Up @@ -259,6 +295,11 @@ func (infoBlob *InfoBlob) Upload(ctx context.Context, body io.ReadSeeker) error
return err
}

// StageSentinelBlock is a no-op for InfoBlob: the info blob does not use a
// sentinel block, so nothing is staged and nil is always returned. The ctx
// argument is unused.
// NOTE(review): presumably this method exists only so that InfoBlob keeps
// satisfying the AzBlob interface — confirm against the interface definition.
func (infoBlob *InfoBlob) StageSentinelBlock(ctx context.Context) error {
return nil
}

// Download the infoBlob from Azure Blob Storage
func (infoBlob *InfoBlob) Download(ctx context.Context) (io.ReadCloser, error) {
resp, err := infoBlob.BlobClient.DownloadStream(ctx, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/azurestore/azurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func (store AzureStore) NewUpload(ctx context.Context, info handler.FileInfo) (h
return nil, fmt.Errorf("azurestore: unable to create InfoHandler file:\n%s", err)
}

// Stage an empty sentinel block so that "no uncommitted blocks" reliably means upload complete.
// Stage a sentinel block so that "no uncommitted blocks" reliably means upload complete.
// Without it we cannot distinguish completed uploads from a new upload that has not written any blocks yet.
// Committed blocks exist if versioning is enabled and a blob is overwritten.
if err := azUpload.BlockBlob.Upload(ctx, bytes.NewReader([]byte{})); err != nil {
if err := azUpload.BlockBlob.StageSentinelBlock(ctx); err != nil {
return nil, err
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/azurestore/azurestore_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/azurestore/azurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestNewUpload(t *testing.T) {
service.EXPECT().NewBlob(ctx, mockID).Return(blockBlob, nil).Times(1),
service.EXPECT().NewBlob(ctx, mockID+".info").Return(infoBlob, nil).Times(1),
infoBlob.EXPECT().Upload(ctx, r).Return(nil).Times(1),
blockBlob.EXPECT().Upload(ctx, gomock.Any()).Return(nil).Times(1),
blockBlob.EXPECT().StageSentinelBlock(ctx).Return(nil).Times(1),
)

upload, err := store.NewUpload(context.Background(), mockTusdInfo)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestNewUploadWithPrefix(t *testing.T) {
service.EXPECT().NewBlob(ctx, objectPrefix+mockID).Return(blockBlob, nil).Times(1),
service.EXPECT().NewBlob(ctx, objectPrefix+mockID+".info").Return(infoBlob, nil).Times(1),
infoBlob.EXPECT().Upload(ctx, r).Return(nil).Times(1),
blockBlob.EXPECT().Upload(ctx, gomock.Any()).Return(nil).Times(1),
blockBlob.EXPECT().StageSentinelBlock(ctx).Return(nil).Times(1),
)

upload, err := store.NewUpload(context.Background(), mockTusdInfo)
Expand Down
Loading