Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0e52815
feat: implement scheduling functionality for AgentflowV2
prd-hoang-doan Mar 14, 2026
529b35d
feat: implement schedule management for agentflows
prd-hoang-doan Mar 21, 2026
d99a990
feat(schedule): refactor schedule services and tests for schedule job
prd-hoang-doan Mar 21, 2026
d7ddbc2
feat(schedule): refactor schedule services and tests for schedule job
prd-hoang-doan Mar 22, 2026
ce573c6
feat(schedule): remove timestamp in schedule tabes to support sqlite
prd-hoang-doan Mar 24, 2026
35a5802
fix(schedule): add nullable to optional entity columns and entity int…
jchui-wd Mar 25, 2026
5118ce3
resolved conflicts and merged with main
jchui-wd Apr 6, 2026
6e5c8eb
fixed tests and hide schedule input from package/agentflow
jchui-wd Apr 6, 2026
9c2524c
feat: integrate IdentityManager into worker and queue management
prd-hoang-doan Apr 7, 2026
974f80f
feat: add support for MIN_SCHEDULE_INTERVAL_SECONDS in cron job confi…
prd-hoang-doan Apr 10, 2026
3969d05
Merge remote-tracking branch 'origin/main' into pr-5971
jchui-wd Apr 13, 2026
f4928e6
merged and resolved conflicts
jchui-wd Apr 14, 2026
bc14c64
resolved conflicts and merged with main
jchui-wd Apr 17, 2026
7815ee0
Merge remote-tracking branch 'origin' into pr-5971
jchui-wd Apr 20, 2026
3110203
fixed broken tests
jchui-wd Apr 20, 2026
1b3e4e5
Merge branch 'main' into feat/agentflow-cron-job
HenryHengZJ Apr 22, 2026
5b63167
UI UX fixes for scheduler in dark mode
HenryHengZJ Apr 23, 2026
5abe295
fix(schedule): remove unnecessary UUID generation in createOrUpdateSc…
prd-hoang-doan Apr 23, 2026
d150d2c
Merge branch 'main' into feat/agentflow-cron-job
HenryHengZJ Apr 23, 2026
7e7ac96
adds required scheduleInputMode column with text/form/none variants, …
HenryHengZJ Apr 24, 2026
1661d3b
add deleteScheduleTriggerLogs endpoint and UI integration for log del…
HenryHengZJ Apr 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 151 additions & 1 deletion packages/components/nodes/agentflow/Start/Start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Start_Agentflow implements INode {
constructor() {
this.label = 'Start'
this.name = 'startAgentflow'
this.version = 1.1
this.version = 1.2
this.type = 'Start'
this.category = 'Agent Flows'
this.description = 'Starting point of the agentflow'
Expand All @@ -40,6 +40,11 @@ class Start_Agentflow implements INode {
label: 'Form Input',
name: 'formInput',
description: 'Start the workflow with form inputs'
},
{
label: 'Schedule Input',

This comment was marked as resolved.

Copy link
Copy Markdown
Contributor

@jchui-wd jchui-wd Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed with Jocelyn that editing Start.ts affects both the legacy canvas and the agentflow UI. We'd probably delay this merge to avoid unintended impacts on downstream or merge it in a separate feature branch.

name: 'scheduleInput',
description: 'Start the workflow on a recurring schedule (cron)'
}
],
default: 'chatInput'
Expand Down Expand Up @@ -125,6 +130,144 @@ class Start_Agentflow implements INode {
}
]
},
{
label: 'Schedule Type',
name: 'scheduleType',
type: 'options',
options: [
{
label: 'Visual Picker',
name: 'visualPicker',
description: 'Use a visual picker to select schedule options'
},
{
label: 'Cron Expression',
name: 'cronExpression',
description: 'Use a cron expression to define the schedule'
}
],
show: {
startInputType: 'scheduleInput'
}
},
{
label: 'Cron Expression',
name: 'scheduleCronExpression',
type: 'string',
placeholder: '0 9 * * 1-5',
description:
'Standard 5-field cron expression (minute hour day month weekday). Example: "0 9 * * 1-5" runs at 09:00 every weekday.',
show: {
startInputType: 'scheduleInput',
scheduleType: 'cronExpression'
}
},
{
label: 'Frequency',
name: 'scheduleFrequency',
type: 'options',
options: [
{
label: 'Hourly',
name: 'hourly',
description: 'Run every hour at the specified time'
},
{
label: 'Daily',
name: 'daily',
description: 'Run every day at the specified time'
},
{
label: 'Weekly',
name: 'weekly',
description: 'Run every week on the specified day and time'
},
{
label: 'Monthly',
name: 'monthly',
description: 'Run every month on the specified date and time'
}
],
show: {
startInputType: 'scheduleInput',
scheduleType: 'visualPicker'
}
},
{
label: 'On Minute',
name: 'scheduleOnMinute',
type: 'number',
placeholder: '30',
description:
'Minute of the hour when the schedule should run (0-59). For example, "30" means the schedule will run at the 30th minute of the hour.',
show: {
startInputType: 'scheduleInput',
scheduleType: 'visualPicker',
scheduleFrequency: 'hourly'
}
},
{
label: 'On Time',
name: 'scheduleOnTime',
type: 'timePicker',
show: {
startInputType: 'scheduleInput',
scheduleType: 'visualPicker',
scheduleFrequency: ['daily', 'weekly', 'monthly']
}
},
{
label: 'On Day of Week',
name: 'scheduleOnDayOfWeek',
type: 'weekDaysPicker',
show: {
startInputType: 'scheduleInput',
scheduleType: 'visualPicker',
scheduleFrequency: 'weekly'
}
},
{
label: 'On Day of Month',
name: 'scheduleOnDayOfMonth',
type: 'monthDaysPicker',
show: {
startInputType: 'scheduleInput',
scheduleType: 'visualPicker',
scheduleFrequency: 'monthly'
}
},
{
label: 'End Date',
name: 'scheduleEndDate',
type: 'datePicker',
description: 'Optional date after which the schedule will stop firing.',
optional: true,
show: {
startInputType: 'scheduleInput'
}
},
{
label: 'Timezone',
name: 'scheduleTimezone',
type: 'string',
placeholder: 'UTC',
description: 'IANA timezone name, e.g. America/New_York. Defaults to UTC.',
optional: true,
show: {
startInputType: 'scheduleInput'
}
},
{
label: 'Default Input',
name: 'scheduleDefaultInput',
type: 'string',
placeholder: 'Run the daily report',
description: 'Default question/input passed to the flow when it is triggered by the scheduler.',
rows: 4,
show: {
startInputType: 'scheduleInput'
}
},
{
label: 'Ephemeral Memory',
name: 'startEphemeralMemory',
Expand Down Expand Up @@ -213,6 +356,13 @@ class Start_Agentflow implements INode {
outputData.form = form
}

if (startInputType === 'scheduleInput') {
const defaultInput = nodeData.inputs?.scheduleDefaultInput as string
const effectiveInput = (typeof input === 'string' && input) || defaultInput || ''
inputData.question = effectiveInput
outputData.question = effectiveInput
}

if (startEphemeralMemory) {
outputData.ephemeralMemory = true
}
Expand Down
22 changes: 22 additions & 0 deletions packages/server/__mocks__/typeorm.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Manual mock for 'typeorm'.
* All decorator factories are replaced with no-ops so TypeORM entity classes
* can be defined in tests without a real database connection.
* Used by all server-package test files via jest.mock('typeorm').
*/

const decorator = (): (() => void) => () => {}

module.exports = {
PrimaryGeneratedColumn: decorator,
PrimaryColumn: decorator,
CreateDateColumn: decorator,
UpdateDateColumn: decorator,
Index: decorator,
ManyToOne: decorator,
OneToMany: decorator,
OneToOne: decorator,
JoinColumn: decorator,
Unique: decorator,
DataSource: jest.fn()
}
13 changes: 13 additions & 0 deletions packages/server/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ module.exports = {
// File extensions to recognize in module resolution
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],

// uuid v10+ ships ESM-only; redirect to the CJS dist so Jest can require it.
// typeorm is not resolvable via pnpm symlinks in the test runner; redirect to
// the shared manual mock so all test files get the same decorator stubs without
// needing an inline jest.mock() factory.
moduleNameMapper: {
'^uuid$': '<rootDir>/node_modules/uuid/dist/index.js',
'^typeorm$': '<rootDir>/__mocks__/typeorm.ts'
},

// Include the package's own node_modules so that Jest can resolve
// symlinked pnpm dependencies when tests live inside src/
modulePaths: ['<rootDir>/node_modules'],

// Display individual test results with the test suite hierarchy.
verbose: true
}
1 change: 1 addition & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
"multer-s3": "^3.0.1",
"mysql2": "^3.11.3",
"nanoid": "3",
"node-cron": "^4.2.1",
"nodemailer": "^7.0.7",
"openai": "6.19.0",
"passport": "^0.7.0",
Expand Down
23 changes: 23 additions & 0 deletions packages/server/src/Interface.Schedule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { DataSource } from 'typeorm'
import { IComponentNodes } from './Interface'
import { Telemetry } from './utils/telemetry'
import { CachePool } from './CachePool'
import { UsageCacheManager } from './UsageCacheManager'

export interface IScheduleQueueAppServer {
appDataSource: DataSource
componentNodes: IComponentNodes
telemetry: Telemetry
cachePool: CachePool
usageCacheManager: UsageCacheManager
}

export interface IScheduleAgentflowJobData extends IScheduleQueueAppServer {
scheduleRecordId: string
targetId: string
cronExpression: string
timezone: string
defaultInput?: string
workspaceId: string
scheduledAt: string // ISO string
}
11 changes: 11 additions & 0 deletions packages/server/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ interface CustomListener extends QueueEventsListener {
export default class Worker extends BaseCommand {
predictionWorkerId: string
upsertionWorkerId: string
scheduleWorkerId: string

async run(): Promise<void> {
logger.info('Starting Flowise Worker...')
Expand Down Expand Up @@ -51,6 +52,12 @@ export default class Worker extends BaseCommand {
this.upsertionWorkerId = upsertionWorker.id
logger.info(`Upsertion Worker ${this.upsertionWorkerId} created`)

/** Schedule */
const scheduleQueue = queueManager.getQueue('schedule')
const scheduleWorker = scheduleQueue.createWorker()
this.scheduleWorkerId = scheduleWorker.id
logger.info(`Schedule Worker ${this.scheduleWorkerId} created`)

// Keep the process running
process.stdin.resume()
}
Expand Down Expand Up @@ -98,6 +105,10 @@ export default class Worker extends BaseCommand {
const upsertWorker = queueManager.getQueue('upsert').getWorker()
logger.info(`Shutting down Flowise Upsertion Worker ${this.upsertionWorkerId}...`)
await upsertWorker.close()

const scheduleWorker = queueManager.getQueue('schedule').getWorker()
logger.info(`Shutting down Flowise Schedule Worker...`)
await scheduleWorker.close()
} catch (error) {
logger.error('There was an error shutting down Flowise Worker...', error)
await this.failExit()
Expand Down
54 changes: 53 additions & 1 deletion packages/server/src/controllers/chatflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { WorkspaceUserErrorMessage, WorkspaceUserService } from '../../enterpris
import { QueryRunner } from 'typeorm'
import { GeneralErrorMessage } from '../../utils/constants'
import { sanitizeFlowDataForPublicEndpoint } from '../../utils/sanitizeFlowData'
import scheduleService from '../../services/schedule'
import { ScheduleBeat } from '../../queue/ScheduleBeat'

const checkIfChatflowIsValidForStreaming = async (req: Request, res: Response, next: NextFunction) => {
try {
Expand Down Expand Up @@ -273,6 +275,54 @@ const checkIfChatflowHasChanged = async (req: Request, res: Response, next: Next
}
}

const getScheduleStatus = async (req: Request, res: Response, next: NextFunction) => {
try {
if (!req.params?.id) {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
'Error: chatflowsController.getScheduleStatus - id not provided!'
)
}
const workspaceId = req.user?.activeWorkspaceId
if (!workspaceId) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, 'Error: chatflowsController.getScheduleStatus - workspace not found!')
}
const status = await scheduleService.getScheduleStatus(req.params.id, workspaceId)
return res.json({
enabled: status.record?.enabled ?? false,
canEnable: status.canEnable,
reason: status.reason,
record: status.record
})
} catch (error) {
next(error)
}
}

const toggleScheduleEnabled = async (req: Request, res: Response, next: NextFunction) => {
try {
if (!req.params?.id) {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
'Error: chatflowsController.toggleScheduleEnabled - id not provided!'
)
}
const workspaceId = req.user?.activeWorkspaceId
if (!workspaceId) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, 'Error: chatflowsController.toggleScheduleEnabled - workspace not found!')
}
const { enabled } = req.body
if (typeof enabled !== 'boolean') {
throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, '"enabled" must be a boolean')
}
const record = await scheduleService.toggleScheduleEnabled(req.params.id, workspaceId, enabled)
await ScheduleBeat.getInstance().onScheduleChanged(record.id, enabled ? 'upsert' : 'delete')
return res.json(record)
} catch (error) {
next(error)
}
}

export default {
checkIfChatflowIsValidForStreaming,
checkIfChatflowIsValidForUploads,
Expand All @@ -284,5 +334,7 @@ export default {
updateChatflow,
getSinglePublicChatflow,
getSinglePublicChatbotConfig,
checkIfChatflowHasChanged
checkIfChatflowHasChanged,
getScheduleStatus,
toggleScheduleEnabled
}
Loading