Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions internal/commands/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type jobsRunner struct {
createUploadURL *spec.Operation
getDirectory *spec.Operation
getIntegration *spec.Operation
getMe *spec.Operation
getOrganization *spec.Operation
getRun *spec.Operation
getRunLogs *spec.Operation
Expand Down Expand Up @@ -96,6 +97,7 @@ func newJobsRunner(cfg config.Config, runtimeSpec *spec.RuntimeSpec, client *htt
createUploadURL: findOperation(runtimeSpec, "POST", "/files/upload-url"),
getDirectory: findOperation(runtimeSpec, "GET", "/files/dir"),
getIntegration: findOperation(runtimeSpec, "GET", "/files/integration-dir"),
getMe: findOperation(runtimeSpec, "GET", "/users/me"),
getOrganization: findOperation(runtimeSpec, "GET", "/organization"),
getRun: findOperation(runtimeSpec, "GET", "/runs/{run_id}"),
getRunLogs: findOperation(runtimeSpec, "GET", "/runs/{run_id}/logs"),
Expand Down Expand Up @@ -392,7 +394,8 @@ func (r *jobsRunner) prepareScript(ctx context.Context, script, name string, noU
}

key := fmt.Sprintf("%s/%s/%s", strings.Trim(prefix, "/"), name, filepath.Base(script))
respBody, err := r.execWithRetry(ctx, r.createUploadURL, nil, []executor.QueryPair{{Key: "key", Value: key}}, "")
uploadKey := fmt.Sprintf("%s/%s", bucket, key)
respBody, err := r.execWithRetry(ctx, r.createUploadURL, nil, []executor.QueryPair{{Key: "key", Value: uploadKey}}, "")
if err != nil {
return "", err
}
Expand Down Expand Up @@ -485,16 +488,38 @@ func (r *jobsRunner) resolveManagedUploadTarget(ctx context.Context, uploadPathO
}
}

rootDir := fmt.Sprintf("s3://%s/", bucket)
path := ""
dirBody, err := r.execWithRetry(ctx, r.getDirectory, nil, []executor.QueryPair{{Key: "dir", Value: rootDir}}, "")
orgID := strings.TrimSpace(gjson.GetBytes(orgBody, "id").String())

// Try to find the user's personal writable directory using the current user's ID.
if orgID != "" && r.getMe != nil {
if meBody, meErr := r.execWithRetry(ctx, r.getMe, nil, nil, ""); meErr == nil {
userID := strings.TrimSpace(gjson.GetBytes(meBody, "id").String())
if userID != "" {
userDir := fmt.Sprintf("%s/%s/data/customer-%s", bucket, orgID, userID)
if dirBody, dirErr := r.execWithRetry(ctx, r.getDirectory, nil, []executor.QueryPair{{Key: "dir", Value: userDir}}, ""); dirErr == nil {
path := strings.TrimSpace(gjson.GetBytes(dirBody, "path").String())
if !strings.HasPrefix(strings.ToLower(path), "s3://") {
path = "s3://" + path
}
if parsedBucket, prefix, ok := splitS3Path(path); ok && parsedBucket != "" {
return parsedBucket, prefix, nil
}
}
}
}
}

dirBody, err := r.execWithRetry(ctx, r.getDirectory, nil, []executor.QueryPair{{Key: "dir", Value: bucket}}, "")
if err != nil {
return "", "", fmt.Errorf("unable to resolve managed storage directory via API: %w", err)
}
path = strings.TrimSpace(gjson.GetBytes(dirBody, "path").String())
path := strings.TrimSpace(gjson.GetBytes(dirBody, "path").String())
if path == "" {
return "", "", fmt.Errorf("managed storage directory response missing path")
}
if !strings.HasPrefix(strings.ToLower(path), "s3://") {
path = "s3://" + path
}

bucket, prefix, ok := splitS3Path(path)
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions internal/commands/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,9 @@ func TestJobsRunAutoUploadHandlesBucketNameAsS3URI(t *testing.T) {
if !sawUpload || !sawCreateRun {
t.Fatalf("expected upload and create-run calls; upload=%v create=%v", sawUpload, sawCreateRun)
}
// The dir query param passed to /files/dir should be s3://managed-bucket/ (not s3://s3://managed-bucket/)
if dirQueryParam != "s3://managed-bucket/" {
t.Fatalf("expected dir param s3://managed-bucket/, got %q", dirQueryParam)
// The dir query param passed to /files/dir should be the bare bucket name (not s3://managed-bucket/)
if dirQueryParam != "managed-bucket" {
t.Fatalf("expected dir param managed-bucket, got %q", dirQueryParam)
}
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func TestJobsRunUsesUploadPathFlagOverride(t *testing.T) {
sawDirLookup = true
_, _ = io.WriteString(w, `{"name":"root","path":"s3://managed-bucket/customer/root"}`)
case r.Method == http.MethodPost && r.URL.Path == "/files/upload-url":
if got := r.URL.Query().Get("key"); !strings.HasPrefix(got, "flag-prefix/") {
if got := r.URL.Query().Get("key"); !strings.HasPrefix(got, "flag-bucket/flag-prefix/") {
t.Fatalf("expected key from upload-path flag, got %q", got)
}
_, _ = io.WriteString(w, fmt.Sprintf(`{"uploadUrl":%q}`, serverURLWithPath(serverURLFromRequest(r), "/upload")))
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestJobsRunUsesUploadPathEnvOverride(t *testing.T) {
sawDirLookup = true
_, _ = io.WriteString(w, `{"name":"root","path":"s3://managed-bucket/customer/root"}`)
case r.Method == http.MethodPost && r.URL.Path == "/files/upload-url":
if got := r.URL.Query().Get("key"); !strings.HasPrefix(got, "env-prefix/") {
if got := r.URL.Query().Get("key"); !strings.HasPrefix(got, "env-bucket/env-prefix/") {
t.Fatalf("expected key from upload-path env, got %q", got)
}
_, _ = io.WriteString(w, fmt.Sprintf(`{"uploadUrl":%q}`, serverURLWithPath(serverURLFromRequest(r), "/upload")))
Expand Down
2 changes: 0 additions & 2 deletions internal/executor/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func UploadFileToPresignedURL(ctx context.Context, client *http.Client, uploadUR
if err != nil {
return fmt.Errorf("build upload request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("upload request failed: %w", err)
Expand Down