Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions server/pkg/controller/filedata/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type _fetchConfig struct {
// _defaultFetchConfig is the package-level default _fetchConfig: RetryCount 3,
// InitialTimeout 10s and MaxTimeout 30s.
// NOTE(review): how the fetch path moves from InitialTimeout to MaxTimeout
// across retries is not visible here — confirm against the fetch helper.
var _defaultFetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 10 * gTime.Second, MaxTimeout: 30 * gTime.Second}
// globalFileFetchSemaphore is a buffered channel with capacity 400 used as a
// semaphore by the parallel S3 metadata fetch: sending a value acquires a
// slot and receiving one releases it. Being a package-level variable, the
// limit is shared by every caller in the process rather than per request.
var globalFileFetchSemaphore = make(chan struct{}, 400)

// bulkFileDataFetchTimeout is the deadline GetFilesData places on the context
// it derives from the incoming request. That context is passed to both the
// repository lookup and the parallel S3 metadata fetch.
const bulkFileDataFetchTimeout = 45 * gTime.Second

type bulkS3MetaFetchResult struct {
s3MetaObject fileData.S3FileMetadata
dbEntry fileData.Row
Expand Down Expand Up @@ -129,7 +131,7 @@ func (c *Controller) InsertOrUpdateMetadata(ctx *gin.Context, req *fileData.PutF
dbInsertErr := c.Repo.InsertOrUpdate(context.Background(), row)
if dbInsertErr != nil {
logger.WithError(dbInsertErr).Error("insert or update failed")
return uploadErr
return dbInsertErr
}
//}()
return nil
Expand Down Expand Up @@ -180,7 +182,11 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
return nil, stacktrace.Propagate(err, "")
}

doRows, err := c.Repo.GetFilesData(ctx, req.Type, req.FileIDs)
reqCtx, cancel := context.WithTimeout(ctx.Request.Context(), bulkFileDataFetchTimeout)
defer cancel()
reqID := requestid.Get(ctx)

doRows, err := c.Repo.GetFilesData(reqCtx, req.Type, req.FileIDs)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
Expand All @@ -196,7 +202,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
}
pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds)
// Fetch missing doRows in parallel
s3MetaFetchResults, err := c.getS3FileMetadataParallel(ctx, activeRows)
s3MetaFetchResults, err := c.getS3FileMetadataParallel(reqCtx, reqID, activeRows)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
Expand All @@ -217,32 +223,49 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
}
}

if len(errFileIds) > 10 {
return nil, stacktrace.Propagate(
Comment on lines +226 to +227
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Don't abort GetFilesData when more than 10 item fetches fail

This new guard turns a partially successful bulk fetch into a hard 500 once more than 10 objects fail, even though the API already models per-file failures via ErrFileIDs. In practice, transient S3 issues or timeout cancellations can easily push failures above 10 for large requests, causing clients to lose all successful results and retry the whole batch repeatedly instead of making progress on good items.

Useful? React with 👍 / 👎.

ente.NewInternalError(
fmt.Sprintf("bulk file data fetch failed for %d fileIDs", len(errFileIds)),
),
"",
)
}

return &fileData.GetFilesDataResponse{
Data: fetchedEmbeddings,
PendingIndexFileIDs: pendingIndexFileIds,
ErrFileIDs: errFileIds,
}, nil
}

func (c *Controller) getS3FileMetadataParallel(ctx *gin.Context, dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
func (c *Controller) getS3FileMetadataParallel(ctx context.Context, reqID string, dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
var wg sync.WaitGroup
embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows))
for i := range dbRows {
dbRow := dbRows[i]
select {
case globalFileFetchSemaphore <- struct{}{}:
case <-ctx.Done():
embeddingObjects[i] = bulkS3MetaFetchResult{
err: ctx.Err(),
dbEntry: dbRow,
}
continue
}
wg.Add(1)
globalFileFetchSemaphore <- struct{}{} // Acquire from global semaphore
go func(i int, row fileData.Row) {
defer wg.Done()
defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore

ctxLogger := log.WithFields(log.Fields{
"objectKey": row.S3FileMetadataObjectKey(),
"req_id": requestid.Get(ctx),
"req_id": reqID,
"latest_bucket": row.LatestBucket,
"file_id": row.FileID,
})

s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, ctxLogger)
s3FileMetadata, err := c.fetchS3FileMetadata(ctx, row, ctxLogger)
if err != nil {
ctxLogger.
Error("error fetching object: "+row.S3FileMetadataObjectKey(), err)
Expand Down
5 changes: 3 additions & 2 deletions server/pkg/controller/filedata/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package filedata

import (
"context"
"strings"

"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/utils/auth"
"github.com/ente-io/museum/pkg/utils/network"
"github.com/ente-io/stacktrace"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"strings"
)

func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPreviewRequest) error {
Expand Down Expand Up @@ -45,7 +46,7 @@ func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPrevi
size, uploadErr := c.uploadObject(obj, objectKey, bucketID)
if uploadErr != nil {
logger.WithError(uploadErr).Error("upload failed")
return nil
return uploadErr
}
row := filedata.Row{
FileID: req.FileID,
Expand Down
Loading