diff --git a/flower/api/tasks.py b/flower/api/tasks.py index 730c290e4..332f2e22b 100644 --- a/flower/api/tasks.py +++ b/flower/api/tasks.py @@ -14,6 +14,7 @@ from ..utils import tasks from ..utils.broker import Broker +from ..utils.tasks import parse_args, parse_kwargs, make_json_serializable from . import BaseApiHandler logger = logging.getLogger(__name__) @@ -636,3 +637,50 @@ def get(self, taskid): response['worker'] = task.worker.hostname self.write(response) + +class TaskReapply(BaseTaskHandler): + @web.authenticated + async def post(self, taskid): + """ + Get task info and reapply the task with the same arguments. + + :param taskid: ID of the task to reapply. + """ + # Get original task info + task = tasks.get_task_by_id(self.application.events, taskid) + if not task: + raise HTTPError(404, f"Unknown task '{taskid}'") + + # Get task name + taskname = task.name + if not taskname: + raise HTTPError(400, "Cannot reapply task with no name") + + try: + # Get the task object from registered tasks + task_obj = self.capp.tasks[taskname] + except KeyError as exc: + raise HTTPError(404, f"Unknown task '{taskname}'") from exc + + # Parse args and kwargs from the original task + try: + args = parse_args(task.args) + kwargs = parse_kwargs(task.kwargs) + except (ValueError, json.JSONDecodeError, SyntaxError, TypeError) as exc: + logger.error("Error parsing task arguments: %s", exc) + raise HTTPError(400, f"Invalid task arguments: {str(exc)}") from exc + + # Apply the task with original arguments + try: + # Ensure args and kwargs are JSON serializable + args = make_json_serializable(args) + kwargs = make_json_serializable(kwargs) + + result = task_obj.apply_async(args=args, kwargs=kwargs) + response = {'task-id': result.task_id} + if self.backend_configured(result): + response.update(state=result.state) + self.write(response) + except Exception as exc: + logger.error("Error reapplying task with args=%s, kwargs=%s: %s", args, kwargs, str(exc)) + raise HTTPError(500, f"Error reapplying task: {str(exc)}") from exc diff --git a/flower/static/js/flower.js b/flower/static/js/flower.js index 6edde42bf..b1370673a 100644 --- a/flower/static/js/flower.js +++ b/flower/static/js/flower.js @@ -690,4 +690,37 @@ var flower = (function () { }); + $('#task-retry').click(function () { + const $button = $(this); + const $spinner = $button.find('.spinner-border'); + const taskId = $('#taskid').text(); + + if (!taskId) { + show_alert('Task ID is missing. Cannot proceed.', 'danger'); + return; + } + + // Show loading state + $button.prop('disabled', true); + $spinner.removeClass('d-none'); + + // Reapply the task using the reapply endpoint + $.ajax({ + type: 'POST', + url: url_prefix() + '/api/task/reapply/' + taskId, + success: function (response) { + show_alert(`Task ${taskId} has been retried (new task ID: ${response['task-id']})`, 'success'); + // Optionally reload the page after success + setTimeout(() => location.reload(), 1500); + }, + error: function (response) { + show_alert(response.responseText || 'Failed to retry task', 'danger'); + // Reset button state on error + $button.prop('disabled', false); + $spinner.addClass('d-none'); + } + }); + }); + + }(jQuery)); diff --git a/flower/templates/task.html b/flower/templates/task.html index 8d66bb08b..5a8c3732b 100644 --- a/flower/templates/task.html +++ b/flower/templates/task.html @@ -16,6 +16,11 @@

{{ getattr(task, 'name', None) }} {% elif task.state == "RECEIVED" or task.state == "RETRY" %} + {% elif task.state == "FAILURE" %} + {% end %}

@@ -89,4 +94,4 @@

{{ getattr(task, 'name', None) }} -{% end %} +{% end %} \ No newline at end of file diff --git a/flower/urls.py b/flower/urls.py index d3ea5df85..1de9d8c99 100644 --- a/flower/urls.py +++ b/flower/urls.py @@ -51,6 +51,7 @@ (r"/api/task/timeout/(.+)", control.TaskTimout), (r"/api/task/rate-limit/(.+)", control.TaskRateLimit), (r"/api/task/revoke/(.+)", control.TaskRevoke), + (r"/api/task/reapply/(.+)", tasks.TaskReapply), # Metrics (r"/metrics", monitor.Metrics), (r"/healthcheck", monitor.Healthcheck), diff --git a/flower/utils/tasks.py b/flower/utils/tasks.py index 4abcd82f6..a846cabb8 100644 --- a/flower/utils/tasks.py +++ b/flower/utils/tasks.py @@ -1,5 +1,7 @@ import datetime import time +import json +import ast from .search import parse_search_terms, satisfies_search_terms @@ -68,3 +70,53 @@ def get_task_by_id(events, task_id): def as_dict(task): return task.as_dict() + +def parse_args(args): + """ + Parse and process the `args` of the task. + """ + if not args: + return [] + try: + # Attempt to parse JSON + parsed_args = json.loads(args) + if isinstance(parsed_args, str) and parsed_args.startswith('(') and parsed_args.endswith(')'): + return ast.literal_eval(parsed_args) # Handle stringified tuples safely + return parsed_args + except (json.JSONDecodeError, SyntaxError): + # Fallback for stringified tuples or ellipsis + if args == '...': + return [...] + if args.startswith('(') and args.endswith(')'): + return ast.literal_eval(args) + return [args] + +def parse_kwargs(kwargs): + """ + Parse and process the `kwargs` of the task. + """ + if not kwargs: + return {} + try: + # Attempt to parse JSON + return json.loads(kwargs) + except json.JSONDecodeError as json_err: + # Fallback for stringified dictionaries + if kwargs.startswith('{') and kwargs.endswith('}'): + try: + return ast.literal_eval(kwargs) + except (ValueError, SyntaxError) as literal_err: + raise ValueError(f"Could not parse kwargs: {kwargs!r}") from literal_err + raise ValueError(f"Could not parse kwargs: {kwargs!r}") from json_err + +def make_json_serializable(obj): + """ + Recursively replace non-serializable types with JSON-serializable alternatives. + """ + if isinstance(obj, list): + return [make_json_serializable(item) for item in obj] + elif isinstance(obj, dict): + return {key: make_json_serializable(value) for key, value in obj.items()} + elif obj is Ellipsis: + return None # Replace `...` with `null` + return obj # Return the object if it's already serializable \ No newline at end of file diff --git a/tests/unit/api/test_tasks.py b/tests/unit/api/test_tasks.py index 551957d7e..96d3c6618 100644 --- a/tests/unit/api/test_tasks.py +++ b/tests/unit/api/test_tasks.py @@ -6,6 +6,7 @@ import celery.states as states from celery.events import Event +from celery.events.state import Task from celery.result import AsyncResult from flower.events import EventsState @@ -96,6 +97,205 @@ def get_task_by_id(events, task_id): return Task() +class TaskReapplyTests(BaseApiTestCase): + def test_reapply_success(self): + """Test successfully reapplying a task""" + mock_task = Task() + mock_task.name = 'tasks.add' + mock_task.args = '[1, 2]' + mock_task.kwargs = '{"multiply": 2}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.add'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + body = json.loads(r.body.decode('utf-8')) + self.assertIn('task-id', body) + task.apply_async.assert_called_once_with( + args=[1, 2], kwargs={"multiply": 2} + ) + + def test_reapply_task_not_found(self): + """Test reapplying a non-existent task returns 404""" + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=None): + r = self.post('/api/task/reapply/nonexistent', body='') + + self.assertEqual(404, r.code) + + def test_reapply_task_no_name(self): + """Test reapplying a task with no name returns 400""" + mock_task = Task() + mock_task.name = None + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(400, r.code) + + def test_reapply_unknown_task_name(self): + """Test reapplying a task that is not registered returns 404""" + mock_task = Task() + mock_task.name = 'unknown.task' + mock_task.args = '[]' + mock_task.kwargs = '{}' + + if 'unknown.task' in self._app.capp.tasks: + del self._app.capp.tasks['unknown.task'] + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(404, r.code) + + def test_reapply_invalid_args(self): + """Test reapplying a task with invalid args returns 400""" + mock_task = Task() + mock_task.name = 'tasks.add' + mock_task.args = 'invalid json' + mock_task.kwargs = '{}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + self._app.capp.tasks['tasks.add'] = Mock() + with patch('flower.api.tasks.parse_args', side_effect=ValueError("Invalid args")): + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(400, r.code) + + def test_reapply_apply_async_error(self): + """Test handling error during apply_async returns 500""" + mock_task = Task() + mock_task.name = 'tasks.add' + mock_task.args = '[1, 2]' + mock_task.kwargs = '{}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.add'] = Mock() + task.apply_async = Mock(side_effect=Exception("Connection error")) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(500, r.code) + + def test_reapply_with_empty_args(self): + """Test reapplying a task with empty args""" + mock_task = Task() + mock_task.name = 'tasks.simple' + mock_task.args = '' + mock_task.kwargs = '' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.simple'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + task.apply_async.assert_called_once_with(args=[], kwargs={}) + + def test_reapply_with_ellipsis_args(self): + """Test reapplying a task with ellipsis in args""" + mock_task = Task() + mock_task.name = 'tasks.test' + mock_task.args = '...' + mock_task.kwargs = '{}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.test'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + task.apply_async.assert_called_once_with(args=[None], kwargs={}) + + def test_reapply_with_nested_json_args(self): + """Test reapplying task with nested JSON structures in args""" + mock_task = Task() + mock_task.name = 'tasks.process' + mock_task.args = '[{"user_id": 123, "items": [1, 2, 3]}, "action"]' + mock_task.kwargs = '{}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.process'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + task.apply_async.assert_called_once_with( + args=[{"user_id": 123, "items": [1, 2, 3]}, "action"], + kwargs={} + ) + + def test_reapply_with_complex_kwargs(self): + """Test reapplying task with complex JSON in kwargs""" + mock_task = Task() + mock_task.name = 'tasks.configure' + mock_task.args = '[]' + mock_task.kwargs = '{"retry": true, "timeout": 30, "options": {"key": "value"}}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.configure'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + task.apply_async.assert_called_once_with( + args=[], + kwargs={"retry": True, "timeout": 30, "options": {"key": "value"}} + ) + + def test_reapply_with_python_tuple_args(self): + """Test reapplying task with Python tuple string in args""" + mock_task = Task() + mock_task.name = 'tasks.tuple_task' + mock_task.args = '(1, 2, 3)' + mock_task.kwargs = '{}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.tuple_task'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + task.apply_async.assert_called_once_with(args=(1, 2, 3), kwargs={}) + + def test_reapply_with_python_dict_kwargs(self): + """Test reapplying task with Python dict string in kwargs""" + mock_task = Task() + mock_task.name = 'tasks.dict_task' + mock_task.args = '[]' + mock_task.kwargs = "{'count': 5, 'enabled': True}" + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.dict_task'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('new-task-id')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + task.apply_async.assert_called_once_with( + args=[], + kwargs={'count': 5, 'enabled': True} + ) + + def test_reapply_json_serialization_in_response(self): + """Test that response is properly JSON serialized""" + mock_task = Task() + mock_task.name = 'tasks.add' + mock_task.args = '[1, 2]' + mock_task.kwargs = '{}' + + with patch('flower.api.tasks.tasks.get_task_by_id', return_value=mock_task): + task = self._app.capp.tasks['tasks.add'] = Mock() + task.apply_async = Mock(return_value=AsyncResult('test-task-123')) + r = self.post('/api/task/reapply/123', body='') + + self.assertEqual(200, r.code) + body = json.loads(r.body.decode('utf-8')) + self.assertIn('task-id', body) + self.assertEqual(body['task-id'], 'test-task-123') + + self.assertIsInstance(body, dict) + + class TaskTests(BaseApiTestCase): def setUp(self): self.app = super().get_app()