Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
795 changes: 794 additions & 1 deletion src/platform/assets/services/assetService.test.ts

Large diffs are not rendered by default.

235 changes: 229 additions & 6 deletions src/platform/assets/services/assetService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { fromZodError } from 'zod-validation-error'
import { z } from 'zod'

import { st } from '@/i18n'

Expand Down Expand Up @@ -28,9 +29,14 @@ export interface PaginationOptions {
offset?: number
}

interface AssetPaginationOptions extends PaginationOptions {
signal?: AbortSignal
}

interface AssetRequestOptions extends PaginationOptions {
includeTags: string[]
includePublic?: boolean
signal?: AbortSignal
}

interface AssetExportOptions {
Expand Down Expand Up @@ -169,10 +175,61 @@ const ASSETS_DOWNLOAD_ENDPOINT = '/assets/download'
const ASSETS_EXPORT_ENDPOINT = '/assets/export'
const EXPERIMENTAL_WARNING = `EXPERIMENTAL: If you are seeing this please make sure "Comfy.Assets.UseAssetAPI" is set to "false" in your ComfyUI Settings.\n`
const DEFAULT_LIMIT = 500
const INPUT_ASSETS_WITH_PUBLIC_LIMIT = 500

export const MODELS_TAG = 'models'
/** Asset tag used by the backend for placeholder records that are not installed. */
export const MISSING_TAG = 'missing'

/** Result of a HEAD lookup against an exact asset hash. */
export type AssetHashStatus = 'exists' | 'missing' | 'invalid'

const BLAKE3_ASSET_HASH_PATTERN = /^blake3:[0-9a-f]{64}$/i
const BLAKE3_HEX_PATTERN = /^[0-9a-f]{64}$/i
const uploadedAssetResponseSchema = assetItemSchema.extend({
created_new: z.boolean()
})

/** Returns true for a prefixed BLAKE3 asset hash: `blake3:<64 hex>`. */
export function isBlake3AssetHash(value: string): boolean {
return BLAKE3_ASSET_HASH_PATTERN.test(value)
}

/** Converts a raw 64-character BLAKE3 hex digest into an asset hash. */
export function toBlake3AssetHash(hash: string | undefined): string | null {
if (!hash || !BLAKE3_HEX_PATTERN.test(hash)) return null
return `blake3:${hash}`
}

function createAbortError(): DOMException {
return new DOMException('Aborted', 'AbortError')
}

function throwIfAborted(signal?: AbortSignal): void {
if (signal?.aborted) throw createAbortError()
}

async function withCallerAbort<T>(
promise: Promise<T>,
signal?: AbortSignal
): Promise<T> {
throwIfAborted(signal)
if (!signal) return await promise

let removeAbortListener = () => {}
const abortPromise = new Promise<never>((_, reject) => {
const onAbort = () => reject(createAbortError())
signal.addEventListener('abort', onAbort, { once: true })
removeAbortListener = () => signal.removeEventListener('abort', onAbort)
})

try {
return await Promise.race([promise, abortPromise])
} finally {
removeAbortListener()
}
}

/**
* Validates asset response data using Zod schema
*/
Expand All @@ -186,11 +243,43 @@ function validateAssetResponse(data: unknown): AssetResponse {
)
}

function validateUploadedAssetResponse(
data: unknown
): AssetItem & { created_new: boolean } {
const result = uploadedAssetResponseSchema.safeParse(data)
if (result.success) {
return result.data
}

console.error('Invalid asset upload response:', fromZodError(result.error))
throw new Error(
st(
'assetBrowser.errorUploadFailed',
'Failed to upload asset. Please try again.'
)
)
}

/**
* Private service for asset-related network requests
* Not exposed globally - used internally by ComfyApi
*/
function createAssetService() {
let inputAssetsIncludingPublic: AssetItem[] | null = null
let inputAssetsIncludingPublicRequestId = 0
let pendingInputAssetsIncludingPublic: Promise<AssetItem[]> | null = null

/** Invalidates the cached public-inclusive input assets without aborting in-flight readers. */
function invalidateInputAssetsIncludingPublic(): void {
inputAssetsIncludingPublicRequestId++
pendingInputAssetsIncludingPublic = null
inputAssetsIncludingPublic = null
}

function invalidateInputAssetsCacheIfNeeded(tags?: string[]): void {
if (tags?.includes('input')) invalidateInputAssetsIncludingPublic()
}

/**
* Handles API response with consistent error handling and Zod validation
*/
Expand All @@ -202,7 +291,8 @@ function createAssetService() {
includeTags,
limit = DEFAULT_LIMIT,
offset,
includePublic
includePublic,
signal
} = options
const queryParams = new URLSearchParams({
include_tags: includeTags.join(','),
Expand All @@ -216,7 +306,9 @@ function createAssetService() {
}

const url = `${ASSETS_ENDPOINT}?${queryParams.toString()}`
const res = await api.fetchApi(url)
const res = signal
? await api.fetchApi(url, { signal })
: await api.fetchApi(url)
if (!res.ok) {
throw new Error(
`${EXPERIMENTAL_WARNING}Unable to load ${context}: Server returned ${res.status}. Please try again.`
Expand Down Expand Up @@ -402,15 +494,16 @@ function createAssetService() {
* @param options - Pagination options
* @param options.limit - Maximum number of assets to return (default: 500)
* @param options.offset - Number of assets to skip (default: 0)
* @param options.signal - Optional abort signal for cancelling the request
* @returns Promise<AssetItem[]> - Full asset objects filtered by tag, excluding missing assets
*/
async function getAssetsByTag(
tag: string,
includePublic: boolean = true,
{ limit = DEFAULT_LIMIT, offset = 0 }: PaginationOptions = {}
{ limit = DEFAULT_LIMIT, offset = 0, signal }: AssetPaginationOptions = {}
): Promise<AssetItem[]> {
const data = await handleAssetRequest(
{ includeTags: [tag], limit, offset, includePublic },
{ includeTags: [tag], limit, offset, includePublic, signal },
`assets for tag ${tag}`
)

Expand All @@ -419,6 +512,116 @@ function createAssetService() {
)
}

/**
* Gets every asset for a tag by walking paginated asset API responses.
*
* @param tag - The tag to filter by (e.g., 'models', 'input')
* @param includePublic - Whether to include public assets (default: true)
* @param options - Pagination options
* @param options.limit - Page size for each request (default: 500)
* @param options.signal - Optional abort signal for cancelling requests
* @returns Promise<AssetItem[]> - Full asset objects filtered by tag
*/
async function getAllAssetsByTag(
tag: string,
includePublic: boolean = true,
{ limit = DEFAULT_LIMIT, signal }: AssetPaginationOptions = {}
): Promise<AssetItem[]> {
const assets: AssetItem[] = []
const pageSize = limit > 0 ? limit : DEFAULT_LIMIT
let offset = 0

while (true) {
if (signal?.aborted) throw createAbortError()

const data = await handleAssetRequest(
{
includeTags: [tag],
limit: pageSize,
offset,
includePublic,
signal
},
`assets for tag ${tag}`
)
const batch = data.assets ?? []
assets.push(...batch.filter((asset) => !asset.tags.includes(MISSING_TAG)))

const noMoreFromServer = data.has_more === false
const inferredLastPage =
data.has_more === undefined && batch.length < pageSize
if (batch.length === 0 || noMoreFromServer || inferredLastPage) {
return assets
}

offset += batch.length
}
}

function startInputAssetsIncludingPublicRequest(): Promise<AssetItem[]> {
const requestId = ++inputAssetsIncludingPublicRequestId

pendingInputAssetsIncludingPublic = getAllAssetsByTag('input', true, {
limit: INPUT_ASSETS_WITH_PUBLIC_LIMIT
})
.then((assets) => {
if (requestId === inputAssetsIncludingPublicRequestId) {
inputAssetsIncludingPublic = assets
}
return assets
})
.finally(() => {
if (requestId === inputAssetsIncludingPublicRequestId) {
pendingInputAssetsIncludingPublic = null
}
})

void pendingInputAssetsIncludingPublic.catch(() => {})
return pendingInputAssetsIncludingPublic
}

/**
* Gets cached input assets including public assets for missing media checks.
* Caller aborts cancel only that caller; shared fetches are invalidated
* through invalidateInputAssetsIncludingPublic().
*/
async function getInputAssetsIncludingPublic(
signal?: AbortSignal
): Promise<AssetItem[]> {
throwIfAborted(signal)
if (inputAssetsIncludingPublic) return inputAssetsIncludingPublic

const request =
pendingInputAssetsIncludingPublic ??
startInputAssetsIncludingPublicRequest()
return await withCallerAbort(request, signal)
}

/**
* Checks whether an asset exists for an exact asset hash.
*
* Uses the HEAD /assets/hash/{hash} endpoint and maps status-only responses:
* 200 -> exists, 404 -> missing, and 400 -> invalid hash format.
*/
async function checkAssetHash(
assetHash: string,
signal?: AbortSignal
): Promise<AssetHashStatus> {
const response = await api.fetchApi(
`${ASSETS_ENDPOINT}/hash/${encodeURIComponent(assetHash)}`,
{
method: 'HEAD',
signal
}
)

if (response.status === 200) return 'exists'
if (response.status === 404) return 'missing'
if (response.status === 400) return 'invalid'

throw new Error(`Unexpected asset hash check status: ${response.status}`)
}

/**
* Deletes an asset by ID
* Only available in cloud environment
Expand All @@ -437,6 +640,8 @@ function createAssetService() {
`Unable to delete asset ${id}: Server returned ${res.status}`
)
}

invalidateInputAssetsIncludingPublic()
}

/**
Expand Down Expand Up @@ -544,7 +749,9 @@ function createAssetService() {
)
}

return await res.json()
const asset = validateUploadedAssetResponse(await res.json())
invalidateInputAssetsCacheIfNeeded(params.tags)
return asset
}

/**
Expand Down Expand Up @@ -597,7 +804,9 @@ function createAssetService() {
)
}

return await res.json()
const asset = validateUploadedAssetResponse(await res.json())
invalidateInputAssetsCacheIfNeeded(params.tags)
return asset
}

/**
Expand Down Expand Up @@ -627,6 +836,7 @@ function createAssetService() {
if (!parseResult.success) {
throw fromZodError(parseResult.error)
}
invalidateInputAssetsIncludingPublic()
return parseResult.data
}

Expand Down Expand Up @@ -657,6 +867,7 @@ function createAssetService() {
if (!parseResult.success) {
throw fromZodError(parseResult.error)
}
invalidateInputAssetsIncludingPublic()
return parseResult.data
}

Expand Down Expand Up @@ -708,6 +919,13 @@ function createAssetService() {
)
)
}
if (
params.tags?.includes('input') &&
result.data.type === 'async' &&
result.data.task.status === 'completed'
) {
invalidateInputAssetsIncludingPublic()
}
return result.data
}

Expand All @@ -723,6 +941,7 @@ function createAssetService() {
)
)
}
invalidateInputAssetsCacheIfNeeded(params.tags)
return result.data
}

Expand Down Expand Up @@ -763,6 +982,10 @@ function createAssetService() {
getAssetsForNodeType,
getAssetDetails,
getAssetsByTag,
getAllAssetsByTag,
getInputAssetsIncludingPublic,
invalidateInputAssetsIncludingPublic,
checkAssetHash,
deleteAsset,
updateAsset,
addAssetTags,
Expand Down
Loading
Loading