diff --git a/pkg/azurestore/azureservice.go b/pkg/azurestore/azureservice.go
index 0fa4f197a..bc5ee29a6 100644
--- a/pkg/azurestore/azureservice.go
+++ b/pkg/azurestore/azureservice.go
@@ -15,12 +15,14 @@
 package azurestore
 
 import (
+	"bytes"
 	"context"
 	"encoding/base64"
 	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"sort"
 	"strings"
 
@@ -40,8 +42,17 @@ const (
 	InfoBlobSuffix        string = ".info"
 	MaxBlockBlobSize      int64  = blockblob.MaxBlocks * blockblob.MaxStageBlockBytes
 	MaxBlockBlobChunkSize int64  = blockblob.MaxStageBlockBytes
+
+	// sentinelBlockIndex is a reserved block index for the marker block that is
+	// staged when an upload is created (see StageSentinelBlock). It sits far above
+	// the maximum number of blocks Azure allows per blob (blockblob.MaxBlocks), so
+	// it can never collide with a data block index.
+	sentinelBlockIndex int = math.MaxInt32
 )
 
+// sentinelBlockID is the base64 block ID of the marker block. See StageSentinelBlock.
+var sentinelBlockID = blockIDIntToBase64(sentinelBlockIndex)
+
 type azService struct {
 	ContainerClient *container.Client
 	ContainerName   string
@@ -66,6 +77,9 @@ type AzBlob interface {
 	Delete(ctx context.Context) error
 	// Upload the blob
 	Upload(ctx context.Context, body io.ReadSeeker) error
+	// StageSentinelBlock stages a marker block so that a freshly created upload
+	// always has at least one uncommitted block (see GetOffset).
+	StageSentinelBlock(ctx context.Context) error
 	// Download returns a readcloser to download the contents of the blob
 	Download(ctx context.Context) (io.ReadCloser, error)
 	// Get the offset of the blob and its indexes
@@ -190,6 +204,23 @@ func (blockBlob *BlockBlob) Upload(ctx context.Context, body io.ReadSeeker) erro
 	return err
 }
 
+// StageSentinelBlock stages a marker block so that a freshly created upload always
+// has at least one uncommitted block. This lets GetOffset reliably tell a finished
+// upload (no uncommitted blocks) apart from a newly created one, even when blob
+// versioning leaves committed blocks behind on the blob (see #1349).
+//
+// The marker is identified by a dedicated, reserved block ID, so it is never added
+// to the committed block list (Commit only commits blockBlob.Indexes) and is dropped
+// by Azure once the real blocks are committed. GetOffset skips it explicitly.
+//
+// Azure rejects staging a zero-length block (it returns InvalidHeaderValue for
+// Content-Length: 0, see #1358), so the marker carries a single byte.
+func (blockBlob *BlockBlob) StageSentinelBlock(ctx context.Context) error {
+	body := readSeekCloser{bytes.NewReader([]byte{0})}
+	_, err := blockBlob.BlobClient.StageBlock(ctx, sentinelBlockID, body, nil)
+	return err
+}
+
 // Download the blockBlob from Azure Blob Storage
 func (blockBlob *BlockBlob) Download(ctx context.Context) (io.ReadCloser, error) {
 	resp, err := blockBlob.BlobClient.DownloadStream(ctx, nil)
@@ -220,6 +251,11 @@ func (blockBlob *BlockBlob) GetOffset(ctx context.Context) (int64, error) {
 	var indexes []int
 
 	for _, block := range resp.UncommittedBlocks {
+		// Skip the marker block staged in NewUpload (see StageSentinelBlock); it is
+		// not real data and must not contribute to the offset or the block indexes.
+		if block.Name != nil && *block.Name == sentinelBlockID {
+			continue
+		}
 		offset += *block.Size
 		indexes = append(indexes, blockIDBase64ToInt(block.Name))
 	}
@@ -259,6 +295,11 @@ func (infoBlob *InfoBlob) Upload(ctx context.Context, body io.ReadSeeker) error
 	return err
 }
 
+// StageSentinelBlock is a no-op for InfoBlob, which does not use a sentinel block.
+func (infoBlob *InfoBlob) StageSentinelBlock(ctx context.Context) error {
+	return nil
+}
+
 // Download the infoBlob from Azure Blob Storage
 func (infoBlob *InfoBlob) Download(ctx context.Context) (io.ReadCloser, error) {
 	resp, err := infoBlob.BlobClient.DownloadStream(ctx, nil)
diff --git a/pkg/azurestore/azurestore.go b/pkg/azurestore/azurestore.go
index 5b50583db..afbbd5939 100644
--- a/pkg/azurestore/azurestore.go
+++ b/pkg/azurestore/azurestore.go
@@ -89,10 +89,10 @@ func (store AzureStore) NewUpload(ctx context.Context, info handler.FileInfo) (h
 		return nil, fmt.Errorf("azurestore: unable to create InfoHandler file:\n%s", err)
 	}
 
-	// Stage an empty sentinel block so that "no uncommitted blocks" reliably means upload complete.
+	// Stage a sentinel block so that "no uncommitted blocks" reliably means upload complete.
 	// Without it we cannot distinguish completed uploads from a new upload that has not written any blocks yet.
 	// Committed blocks exist if versioning is enabled and a blob is overwritten.
-	if err := azUpload.BlockBlob.Upload(ctx, bytes.NewReader([]byte{})); err != nil {
+	if err := azUpload.BlockBlob.StageSentinelBlock(ctx); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/azurestore/azurestore_mock_test.go b/pkg/azurestore/azurestore_mock_test.go
index 48000a9ca..824cb1559 100644
--- a/pkg/azurestore/azurestore_mock_test.go
+++ b/pkg/azurestore/azurestore_mock_test.go
@@ -132,6 +132,20 @@ func (mr *MockAzBlobMockRecorder) GetOffset(arg0 interface{}) *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffset", reflect.TypeOf((*MockAzBlob)(nil).GetOffset), arg0)
 }
 
+// StageSentinelBlock mocks base method.
+func (m *MockAzBlob) StageSentinelBlock(arg0 context.Context) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "StageSentinelBlock", arg0)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// StageSentinelBlock indicates an expected call of StageSentinelBlock.
+func (mr *MockAzBlobMockRecorder) StageSentinelBlock(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StageSentinelBlock", reflect.TypeOf((*MockAzBlob)(nil).StageSentinelBlock), arg0)
+}
+
 // Upload mocks base method.
 func (m *MockAzBlob) Upload(arg0 context.Context, arg1 io.ReadSeeker) error {
 	m.ctrl.T.Helper()
diff --git a/pkg/azurestore/azurestore_test.go b/pkg/azurestore/azurestore_test.go
index 11031b135..0e125fadd 100644
--- a/pkg/azurestore/azurestore_test.go
+++ b/pkg/azurestore/azurestore_test.go
@@ -63,7 +63,7 @@ func TestNewUpload(t *testing.T) {
 		service.EXPECT().NewBlob(ctx, mockID).Return(blockBlob, nil).Times(1),
 		service.EXPECT().NewBlob(ctx, mockID+".info").Return(infoBlob, nil).Times(1),
 		infoBlob.EXPECT().Upload(ctx, r).Return(nil).Times(1),
-		blockBlob.EXPECT().Upload(ctx, gomock.Any()).Return(nil).Times(1),
+		blockBlob.EXPECT().StageSentinelBlock(ctx).Return(nil).Times(1),
 	)
 
 	upload, err := store.NewUpload(context.Background(), mockTusdInfo)
@@ -104,7 +104,7 @@ func TestNewUploadWithPrefix(t *testing.T) {
 		service.EXPECT().NewBlob(ctx, objectPrefix+mockID).Return(blockBlob, nil).Times(1),
 		service.EXPECT().NewBlob(ctx, objectPrefix+mockID+".info").Return(infoBlob, nil).Times(1),
 		infoBlob.EXPECT().Upload(ctx, r).Return(nil).Times(1),
-		blockBlob.EXPECT().Upload(ctx, gomock.Any()).Return(nil).Times(1),
+		blockBlob.EXPECT().StageSentinelBlock(ctx).Return(nil).Times(1),
 	)
 
 	upload, err := store.NewUpload(context.Background(), mockTusdInfo)