Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
golang.org/x/tools v0.38.0
gorm.io/driver/mysql v1.5.7
gorm.io/gorm v1.30.0
Expand Down Expand Up @@ -146,7 +147,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/oauth2 v0.32.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions internal/core/control_panel/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controlpanel

import (
"sync"
"sync/atomic"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime"
Expand Down Expand Up @@ -69,11 +70,14 @@ type ControlPanel struct {
plugin_entities.PluginUniqueIdentifier,
*debugging_runtime.RemotePluginRuntime,
]

localReadinessSnapshot atomic.Pointer[LocalReadinessSnapshot]
}

type LocalPluginFailsRecord struct {
RetryCount int32
LastTriedAt time.Time
LastError string
}

// create a new control panel as the engine of the local plugin daemon
Expand Down
212 changes: 212 additions & 0 deletions internal/core/control_panel/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package controlpanel

import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
)

type LocalReadinessSnapshot struct {
// ⭐ 核心:readiness 只基于初始插件状态
// Pod 一旦 ready,永远不会因为运行时新增插件而变为 not ready
Ready bool

// 初始插件状态(Pod启动时锁定,之后永不改变)
InitialPluginsReady bool
InitialExpected int
InitialRunning int
InitialMissing []string
InitialFailed []string

// 运行时新增插件状态(与readiness无关,仅供监控)
RuntimePluginsLoading int
RuntimeMissing []string

// 全量统计(包含初始+运行时)
Expected int
Running int
Missing []string
Failed []string
UpdatedAt time.Time
Platform string
Installed int
Ignored int
MaxRetries int32
}

type initialPluginSet struct {
lock sync.RWMutex
ids map[string]bool // plugin id → true
ready bool // 是否已锁定
}

var initialPlugins = &initialPluginSet{
ids: make(map[string]bool),
}

func (c *ControlPanel) LocalReadiness() (LocalReadinessSnapshot, bool) {
ptr := c.localReadinessSnapshot.Load()
if ptr == nil {
return LocalReadinessSnapshot{}, false
}
return *ptr, true
}

func (c *ControlPanel) updateLocalReadinessSnapshot(
installed []plugin_entities.PluginUniqueIdentifier,
) {
now := time.Now()

expected := make([]plugin_entities.PluginUniqueIdentifier, 0, len(installed))
ignored := 0
for _, id := range installed {
if _, ok := c.localPluginWatchIgnoreList.Load(id); ok {
ignored++
continue
}
expected = append(expected, id)
}

// 计算全量插件状态
missing := make([]string, 0)
failed := make([]string, 0)
running := 0
for _, id := range expected {
if c.localPluginRuntimes.Exists(id) {
running++
continue
}

if retry, ok := c.localPluginFailsRecord.Load(id); ok && retry.RetryCount >= c.config.PluginLocalMaxRetryCount {
failed = append(failed, id.String())
continue
}
missing = append(missing, id.String())
}

// 计算初始插件的状态
initialMissing := make([]string, 0)
initialFailed := make([]string, 0)
initialRunning := 0
initialExpected := 0

isInitialReady := c.isInitialPluginsReady(expected, &initialExpected, &initialRunning, &initialMissing, &initialFailed)

// 计算运行时新增插件
runtimeMissing := make([]string, 0)
runtimeLoading := 0

initialSet := c.getInitialPluginSet()
for _, id := range expected {
idStr := id.String()
if !initialSet[idStr] {
// 这是运行时新增的插件
if !c.localPluginRuntimes.Exists(id) {
if retry, ok := c.localPluginFailsRecord.Load(id); !ok || retry.RetryCount < c.config.PluginLocalMaxRetryCount {
runtimeMissing = append(runtimeMissing, idStr)
runtimeLoading++
}
}
}
}

// 🔑 关键:readiness ONLY depends on initial plugins
// Once ready, it will never become not ready due to runtime plugin additions
snapshot := &LocalReadinessSnapshot{
Ready: isInitialReady,
InitialPluginsReady: isInitialReady,
InitialExpected: initialExpected,
InitialRunning: initialRunning,
InitialMissing: initialMissing,
InitialFailed: initialFailed,
RuntimePluginsLoading: runtimeLoading,
RuntimeMissing: runtimeMissing,
Expected: len(expected),
Installed: len(installed),
Ignored: ignored,
Running: running,
Missing: missing,
Failed: failed,
UpdatedAt: now,
Platform: string(c.config.Platform),
MaxRetries: c.config.PluginLocalMaxRetryCount,
}
c.localReadinessSnapshot.Store(snapshot)
}

// isInitialPluginsReady 检查初始插件是否全部启动完成
func (c *ControlPanel) isInitialPluginsReady(
current []plugin_entities.PluginUniqueIdentifier,
initialExpected *int,
initialRunning *int,
initialMissing *[]string,
initialFailed *[]string,
) bool {
initialSet := c.getInitialPluginSet()
if len(initialSet) == 0 && len(current) > 0 {
// 首次启动,锁定初始插件集合
c.lockInitialPlugins(current)
initialSet = c.getInitialPluginSet()
}

missingList := make([]string, 0)
failedList := make([]string, 0)
running := 0
expected := 0

for _, id := range current {
idStr := id.String()
if !initialSet[idStr] {
continue
}

expected++
if c.localPluginRuntimes.Exists(id) {
running++
continue
}

if retry, ok := c.localPluginFailsRecord.Load(id); ok && retry.RetryCount >= c.config.PluginLocalMaxRetryCount {
failedList = append(failedList, idStr)
continue
}
missingList = append(missingList, idStr)
}

*initialExpected = expected
*initialRunning = running
*initialMissing = missingList
*initialFailed = failedList

return len(missingList) == 0
}

// lockInitialPlugins 锁定初始插件集合(仅在首次调用时)
func (c *ControlPanel) lockInitialPlugins(
plugins []plugin_entities.PluginUniqueIdentifier,
) {
initialPlugins.lock.Lock()
defer initialPlugins.lock.Unlock()

if initialPlugins.ready {
return
}

for _, id := range plugins {
initialPlugins.ids[id.String()] = true
}
initialPlugins.ready = true
}

// getInitialPluginSet 获取初始插件集合(只读)
func (c *ControlPanel) getInitialPluginSet() map[string]bool {
initialPlugins.lock.RLock()
defer initialPlugins.lock.RUnlock()

result := make(map[string]bool)
for k, v := range initialPlugins.ids {
result[k] = v
}
return result
}
20 changes: 12 additions & 8 deletions internal/core/control_panel/server_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
func (c *ControlPanel) startLocalMonitor() {
log.Info("start to handle new plugins", "path", c.config.PluginInstalledPath)
log.Info("launch plugins with max concurrency", "concurrency", c.config.PluginLocalLaunchingConcurrent)
log.Info("plugin max retry count", "max_retry_count", c.config.PluginLocalMaxRetryCount)

c.handleNewLocalPlugins()
// sync every 30 seconds
Expand Down Expand Up @@ -83,10 +84,11 @@ func (c *ControlPanel) handleNewLocalPlugins() {
retry = LocalPluginFailsRecord{
RetryCount: 0,
LastTriedAt: time.Now(),
LastError: err.Error(),
}
}

if retry.RetryCount >= MAX_RETRY_COUNT {
if retry.RetryCount >= c.config.PluginLocalMaxRetryCount {
continue
}

Expand Down Expand Up @@ -114,6 +116,7 @@ func (c *ControlPanel) handleNewLocalPlugins() {
c.localPluginFailsRecord.Store(uniquePluginIdentifier, LocalPluginFailsRecord{
RetryCount: retry.RetryCount + 1,
LastTriedAt: time.Now(),
LastError: err.Error(),
})
} else {
// reset the failure record
Expand All @@ -124,17 +127,18 @@ func (c *ControlPanel) handleNewLocalPlugins() {

// wait for all plugins to be launched
wg.Wait()

// update readiness snapshot
c.updateLocalReadinessSnapshot(plugins)
}

var (
MAX_RETRY_COUNT = int32(15)

RETRY_WAIT_INTERVAL_MAP = map[int32]time.Duration{
0: 0 * time.Second,
3: 30 * time.Second,
8: 60 * time.Second,
MAX_RETRY_COUNT: 240 * time.Second,
// stop
0: 0 * time.Second,
1: 15 * time.Second,
2: 30 * time.Second,
3: 60 * time.Second,
4: 120 * time.Second,
}
)

Expand Down
35 changes: 35 additions & 0 deletions internal/core/plugin_manager/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package plugin_manager

import (
controlpanel "github.com/langgenius/dify-plugin-daemon/internal/core/control_panel"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
)

type ReadinessReport struct {
Ready bool
Reason string
Plugins *controlpanel.LocalReadinessSnapshot
}

func (p *PluginManager) Readiness() ReadinessReport {
if p == nil || p.config == nil {
return ReadinessReport{Ready: false, Reason: "manager_not_initialized"}
}

if p.config.Platform != app.PLATFORM_LOCAL {
return ReadinessReport{Ready: true, Reason: "non_local_platform"}
}

snapshot, ok := p.controlPanel.LocalReadiness()
if !ok {
return ReadinessReport{Ready: false, Reason: "plugin_monitor_not_ready"}
}

if snapshot.Ready {
return ReadinessReport{Ready: true, Reason: "plugins_ready", Plugins: &snapshot}
}
if len(snapshot.Failed) > 0 {
return ReadinessReport{Ready: false, Reason: "plugins_failed", Plugins: &snapshot}
}
return ReadinessReport{Ready: false, Reason: "plugins_starting", Plugins: &snapshot}
}
32 changes: 32 additions & 0 deletions internal/server/controllers/ready_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package controllers

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
)

func ReadyCheck(appConfig *app.Config) gin.HandlerFunc {
return func(c *gin.Context) {
_ = appConfig
report := plugin_manager.Manager().Readiness()
if report.Ready {
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"ready": true,
"reason": report.Reason,
"detail": report.Plugins,
})
return
}

c.JSON(http.StatusServiceUnavailable, gin.H{
"status": "unready",
"ready": false,
"reason": report.Reason,
"detail": report.Plugins,
})
}
}
3 changes: 2 additions & 1 deletion internal/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ engine := gin.New()
engine.Use(log.LoggerMiddleware())
} else {
engine.Use(log.LoggerMiddlewareWithConfig(log.LoggerConfig{
SkipPaths: []string{"/health/check"},
SkipPaths: []string{"/health/check", "/ready/check"},
}))
}
engine.Use(controllers.CollectActiveRequests())
engine.NoRoute(func(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"code": "not_found", "message": "route not found"})
})
engine.GET("/health/check", controllers.HealthCheck(config))
engine.GET("/ready/check", controllers.ReadyCheck(config))

endpointGroup := engine.Group("/e")
serverlessTransactionGroup := engine.Group("/backwards-invocation")
Expand Down
Loading