Skip to content

lynn901/flowWorker

Repository files navigation

Flow Worker (云主机工作流任务执行器)

本项目是一个基于 Spring Boot 3.1.5Java 17 开发的 Camunda 外部任务执行器 (External Task Worker)。它专门负责处理 "云主机工作流 (Cloud Host Workflow)" 和 "漫游迁移工作流 (Roaming Migration Workflow)" 中的自动化任务逻辑。

🏗 核心架构

本项目采用 Camunda 的 外部任务模式 (External Task Pattern)。与传统的嵌入式执行不同,执行器作为一个独立的微服务运行,通过 REST API 主动从流程引擎中拉取任务。

  • 解耦设计:业务逻辑与流程编排完全分离,互不干扰
  • 高可用与扩展:可以根据任务量横向扩展(启动多个 Worker 实例),所有实例共享同一个 Topic
  • 长轮询机制:通过长轮询(Long Polling)降低引擎负载,同时保证任务响应的实时性
  • 错误重试机制:支持任务失败后自动重试,最多可重试 3 次

🛠 任务主题 (Topic Handlers)

基础云主机管理工作流 (cloud-host-workflow)

订阅 Topic 任务描述 输入变量 输出变量
create-host 创建云主机,生成唯一 ID hostName (可选) hostId (UUID), status ("CREATED")
start-host 启动主机 hostId status ("RUNNING")
stop-host 停止主机 hostId status ("STOPPED")
check-status 状态检查 hostId N/A

漫游迁移工作流 (cloud-host-roaming-workflow) - 支持错误重试

订阅 Topic 任务描述 输入变量 输出变量 重试机制
prepare-roaming 准备主机进行迁移 hostId preparationStatus, sourceHost 2 次重试,延迟 5 秒
migrate-roaming 迁移数据到目标主机 hostId, preparationStatus migrationStatus, targetHost, dataSize 3 次重试,延迟 5 秒
switch-roaming 切换网络和服务到新主机 hostId, targetHost networkStatus, serviceStatus 2 次重试,延迟 5 秒
verify-roaming 验证迁移是否成功 hostId, targetHost migrationSuccess, healthCheck, dataIntegrity, servicesRunning 2 次重试,延迟 5 秒
rollback-roaming 迁移失败后回滚到原主机 hostId, targetHost rollbackStatus, originalHostRestored 2 次重试,延迟 5 秒

⚙️ 配置说明

可以通过修改 src/main/resources/application.yml 或设置环境变量来配置执行器:

变量名 描述 默认值
CAMUNDA_BASE_URL Camunda 引擎的 REST API 地址 http://localhost:8080/engine-rest
WORKER_ID 执行器的唯一标识(用于任务锁定) cloud-host-worker
CAMUNDA_USERNAME 认证用户名 admin
CAMUNDA_PASSWORD 认证密码 admin

🚀 快速开始

前置条件

  • Java 17+
  • Maven 3.8+
  • Docker 和 Docker Compose(用于容器部署)
  • 已启动的 Camunda 7.20+ 流程引擎(本地部署或远程)

方式 1:本地编译与运行

编译打包

mvn clean install

直接启动

启动 Camunda 引擎后,运行:

java -jar target/flow-worker-1.0.0.jar

或通过 Maven 插件启动:

mvn spring-boot:run

方式 2:Docker 本地部署(推荐)

项目包含 Docker Compose 配置文件,可快速启动完整的工作环境(Camunda 引擎 + Flow Worker)。

前置条件

  • Docker 20.10+
  • Docker Compose 1.29+

启动服务

# 构建 Flow Worker 镜像
docker build -t flow-worker:1.0.0 .

# 启动完整环境(Camunda 引擎 + Flow Worker)
docker-compose up -d

验证服务状态

# 查看运行中的容器
docker-compose ps

# 查看 Flow Worker 日志
docker-compose logs -f flow-worker

# 查看 Camunda 引擎日志
docker-compose logs -f camunda

访问服务

停止服务

docker-compose down

# 也删除数据卷(清空数据库)
docker-compose down -v

方式 3:Docker 单独部署

如果已有 Camunda 引擎,可单独部署 Flow Worker 容器:

# 构建镜像
docker build -t flow-worker:1.0.0 .

# 启动容器(指向现有的 Camunda 引擎)
docker run -d \
  --name flow-worker \
  -e CAMUNDA_BASE_URL=http://your-camunda-host:8080/engine-rest \
  -e CAMUNDA_USERNAME=admin \
  -e CAMUNDA_PASSWORD=admin \
  flow-worker:1.0.0

📄 工作流部署

自动部署

使用提供的脚本自动部署两个工作流定义:

chmod +x deploy.sh
./deploy.sh

脚本会自动部署:

  1. cloud_host_workflow.bpmn - 基础云主机管理工作流
  2. cloud_host_roaming_workflow.bpmn - 漫游迁移工作流(支持错误重试)

启动工作流实例

启动基础工作流

curl -X POST \
  -H "Content-Type: application/json" \
  http://localhost:8080/engine-rest/process-definition/key/cloud-host-workflow/start

启动漫游迁移工作流

# 方式 1:使用 shell 脚本
./start_roaming_migration.sh my-host-01

# 方式 2:使用 curl
curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"variables": {"hostId": {"value": "my-host-01"}}}' \
  http://localhost:8080/engine-rest/process-definition/key/cloud-host-roaming-workflow/start

📊 项目结构

.
├── src/
│   └── main/
│       ├── java/com/example/worker/
│       │   ├── FlowWorkerApplication.java      # Spring Boot 启动类
│       │   └── handler/
│       │       └── CloudHostWorker.java        # 9 个 Topic Handler
│       └── resources/
│           └── application.yml                 # 配置文件
├── cloud_host_workflow.bpmn                   # 基础工作流定义
├── cloud_host_roaming_workflow.bpmn           # 漫游迁移工作流定义(支持重试)
├── docker-compose.yml                         # Docker Compose 配置
├── Dockerfile                                 # Docker 镜像构建文件
├── deploy.sh                                  # 工作流部署脚本
├── start_roaming_migration.sh                 # 启动漫游迁移脚本
├── pom.xml                                    # Maven 配置
└── README.md                                  # 本文件

🔍 日志与调试

查看日志级别

编辑 src/main/resources/application.yml 调整日志级别:

logging:
  level:
    root: INFO
    com.example.worker: DEBUG          # Flow Worker 代码
    org.camunda.bpm.client: WARN       # Camunda 客户端

实时监控日志

# 本地运行
tail -f logs/application.log

# Docker 容器
docker-compose logs -f flow-worker

🧪 测试工作流

场景 1:基础工作流测试

# 1. 启动基础工作流
curl -X POST http://localhost:8080/engine-rest/process-definition/key/cloud-host-workflow/start

# 2. 在 Cockpit 中观察执行流程
# 访问 http://localhost:8080/camunda/app/cockpit

场景 2:漫游迁移工作流测试

# 1. 启动漫游迁移工作流
./start_roaming_migration.sh test-host-001

# 2. 观察 Flow Worker 日志
docker-compose logs -f flow-worker

# 3. 在 Cockpit 中监控流程进度

预期行为

  • 成功流程:数据迁移 → 网络切换 → 验证成功 → 流程完成
  • 失败恢复:任务失败 → 自动重试(最多 3 次) → 成功或最终失败
  • 回滚场景:迁移失败 → 触发回滚 → 恢复原主机

🛠️ 开发指南

添加新的 Topic Handler

CloudHostWorker.java 中添加新的嵌套类:

@Configuration
@ExternalTaskSubscription("your-topic-name")
public class YourTopicHandler implements ExternalTaskHandler {
    @Override
    public void execute(ExternalTask task, ExternalTaskService service) {
        try {
            // 获取输入变量
            String param = task.getVariable("paramName");
            
            // 执行业务逻辑
            log.info(">>> [Task: Your Topic] Processing: {}", param);
            
            // 返回输出变量
            Map<String, Object> variables = new HashMap<>();
            variables.put("result", "success");
            service.complete(task, variables);
        } catch (Exception e) {
            log.error(">>> [Task: Your Topic] Error: {}", e.getMessage());
            // 重试机制:重试次数,延迟毫秒
            service.handleFailure(task, e, 2, 5000);
        }
    }
}

错误处理最佳实践

  1. 使用 handleFailure 进行重试
service.handleFailure(task, exception, retryCount, delayInMillis);
  1. 处理业务错误
service.handleBpmnError(task, errorCode, errorMessage);
  1. 日志记录
log.info(">>> [Task: TopicName] {}", message);  // 信息日志
log.warn(">>> [Task: TopicName] {}", message);  // 警告日志
log.error(">>> [Task: TopicName] {}", error);   // 错误日志

📝 常见问题

Q1:Flow Worker 无法连接 Camunda 引擎

检查清单

  • Camunda 引擎是否正在运行?
  • CAMUNDA_BASE_URL 是否配置正确?
  • 网络连接是否正常?
  • 认证信息是否正确?

解决方案

# 测试连接
curl -u admin:admin http://localhost:8080/engine-rest/engine

# 检查 Flow Worker 日志
docker-compose logs flow-worker | grep -i "connection\|error"

Q2:任务持续失败和重试

原因分析

  • 查看 Flow Worker 日志了解具体错误
  • 检查输入变量是否正确
  • 验证外部依赖(网络、存储、权限等)

查看日志

docker-compose logs flow-worker | grep -i "error\|exception"

Q3:如何扩展 Flow Worker 实例?

# 启动多个 Flow Worker 实例
docker-compose up -d --scale flow-worker=3

📚 相关资源

📄 许可证

MIT License

👨‍💻 贡献

欢迎提交 Issue 和 Pull Request!

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors