Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"

"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/langgenius/dify-plugin-daemon/internal/server"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
Expand All @@ -11,7 +12,7 @@ import (

func main() {
var config app.Config

_ = godotenv.Load()
err := envconfig.Process("", &config)
if err != nil {
log.Panic("error processing environment variables", "error", err)
Expand Down
10 changes: 6 additions & 4 deletions internal/core/debugging_runtime/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {

func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
// new plugin connected
log.Info("new debugging connection", "remote", c.RemoteAddr().String())
c.SetContext(&codec{})
runtime := &RemotePluginRuntime{
MediaTransport: basic_runtime.NewMediaTransport(
Expand Down Expand Up @@ -104,10 +105,11 @@ func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
// close plugin
plugin.cleanupResources()

// trigger runtime disconnected event
s.WalkNotifiers(func(notifier PluginRuntimeNotifier) {
notifier.OnRuntimeDisconnected(plugin)
})
if plugin.initialized {
s.WalkNotifiers(func(notifier PluginRuntimeNotifier) {
notifier.OnRuntimeDisconnected(plugin)
})
}

// decrease current connection
atomic.AddInt32(&s.currentConn, -1)
Expand Down
38 changes: 38 additions & 0 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package plugin_manager

import (
"context"
"errors"
"fmt"
"strings"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/langgenius/dify-cloud-kit/oss"
Expand Down Expand Up @@ -214,6 +217,41 @@ func (c *PluginManager) NeedRedirecting(
return true, nil
}

// TryLaunchLocalPlugin attempts to launch a local plugin on-demand.
// This is used as a fallback when the runtime is not found during dispatch,
// instead of waiting for the WatchDog's 30s periodic scan.
// Returns true if the plugin was successfully launched and is ready.
func (p *PluginManager) TryLaunchLocalPlugin(
identity plugin_entities.PluginUniqueIdentifier,
) bool {
if p.config.Platform != app.PLATFORM_LOCAL {
return false
}

exists, err := p.installedBucket.Exists(identity)
if err != nil || !exists {
return false
}

_, ch, err := p.controlPanel.LaunchLocalPlugin(context.Background(), identity)
if err != nil {
if errors.Is(err, controlpanel.ErrorPluginAlreadyLaunched) {
// another goroutine launched it between our check and now
return true
}
log.Warn("on-demand launch failed", "plugin", identity.String(), "error", err)
return false
}

select {
case err := <-ch:
return err == nil
case <-time.After(2 * time.Minute):
log.Warn("on-demand launch timed out", "plugin", identity.String())
return false
}
}

func pluginAssetCacheKey(
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
path string,
Expand Down
100 changes: 100 additions & 0 deletions internal/server/fallback_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package server

import (
"github.com/langgenius/dify-cloud-kit/oss"
"github.com/langgenius/dify-plugin-daemon/pkg/utils/log"
)

type FallbackOSS struct {
primary oss.OSS
fallback oss.OSS
}

func NewFallbackOSS(primary oss.OSS, fallback oss.OSS) *FallbackOSS {
return &FallbackOSS{
primary: primary,
fallback: fallback,
}
}

func (f *FallbackOSS) Save(key string, data []byte) error {
primaryErr := f.primary.Save(key, data)
if primaryErr != nil {
// primary failed, save to fallback synchronously to ensure data is available
log.Warn("fallback: primary Save failed, saving to local fallback", "key", key, "error", primaryErr)
if fallbackErr := f.fallback.Save(key, data); fallbackErr != nil {
log.Error("fallback: both primary and local Save failed", "key", key, "primaryError", primaryErr, "fallbackError", fallbackErr)
return primaryErr
}
return nil
}
return nil
}

func (f *FallbackOSS) Load(key string) ([]byte, error) {
data, err := f.primary.Load(key)
if err == nil {
return data, nil
}

fallbackData, fallbackErr := f.fallback.Load(key)
if fallbackErr != nil {
return nil, err
}

log.Warn("fallback: primary Load failed, using local cache", "key", key, "error", err)
return fallbackData, nil
}

func (f *FallbackOSS) Exists(key string) (bool, error) {
exists, err := f.primary.Exists(key)
if err == nil {
return exists, nil
}

fallbackExists, fallbackErr := f.fallback.Exists(key)
if fallbackErr != nil {
return false, err
}

log.Warn("fallback: primary Exists failed, using local cache", "key", key, "error", err)
return fallbackExists, nil
}

func (f *FallbackOSS) State(key string) (oss.OSSState, error) {
state, err := f.primary.State(key)
if err == nil {
return state, nil
}

fallbackState, fallbackErr := f.fallback.State(key)
if fallbackErr != nil {
return oss.OSSState{}, err
}

log.Warn("fallback: primary State failed, using local cache", "key", key, "error", err)
return fallbackState, nil
}

func (f *FallbackOSS) List(prefix string) ([]oss.OSSPath, error) {
paths, err := f.primary.List(prefix)
if err == nil {
return paths, nil
}

fallbackPaths, fallbackErr := f.fallback.List(prefix)
if fallbackErr != nil {
return nil, err
}

log.Warn("fallback: primary List failed, using local cache", "prefix", prefix, "error", err)
return fallbackPaths, nil
}

func (f *FallbackOSS) Delete(key string) error {
return f.primary.Delete(key)
}

func (f *FallbackOSS) Type() string {
return f.primary.Type()
}
5 changes: 5 additions & 0 deletions internal/server/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc {

// check if plugin in current node
if needRedirecting, originalError := app.pluginManager.NeedRedirecting(identity); needRedirecting {
if app.pluginManager.TryLaunchLocalPlugin(identity) {
log.Info("on-demand launch succeeded, serving locally", "plugin", identity.String())
ctx.Next()
return
}
app.redirectPluginInvokeByPluginIdentifier(ctx, identity, originalError)
ctx.Abort()
} else {
Expand Down
14 changes: 14 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ func initOSS(config *app.Config) oss.OSS {
log.Panic("failed to create storage", "error", err)
}

if config.PluginStorageType != oss.OSS_TYPE_LOCAL && config.PluginStorageType != "local_file" {
fallbackStorage, fallbackErr := factory.Load("local", oss.OSSArgs{
Local: &oss.Local{
Path: config.PluginStorageLocalRoot,
},
})
if fallbackErr != nil {
log.Warn("failed to create local fallback storage, running without fallback", "error", fallbackErr)
return storage
}
log.Info("storage: wrapping primary with local fallback", "primary_type", storage.Type())
return NewFallbackOSS(storage, fallbackStorage)
}

return storage
}

Expand Down
Loading