diff --git a/cmd/server/main.go b/cmd/server/main.go index 55d80b62b..7e1569f52 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -3,6 +3,7 @@ package main import ( "context" + "github.com/joho/godotenv" "github.com/kelseyhightower/envconfig" "github.com/langgenius/dify-plugin-daemon/internal/server" "github.com/langgenius/dify-plugin-daemon/internal/types/app" @@ -11,7 +12,7 @@ import ( func main() { var config app.Config - + _ = godotenv.Load() err := envconfig.Process("", &config) if err != nil { log.Panic("error processing environment variables", "error", err) diff --git a/internal/core/debugging_runtime/hooks.go b/internal/core/debugging_runtime/hooks.go index 758e20026..98600e0a5 100644 --- a/internal/core/debugging_runtime/hooks.go +++ b/internal/core/debugging_runtime/hooks.go @@ -50,6 +50,7 @@ func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) { func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { // new plugin connected + log.Info("new debugging connection", "remote", c.RemoteAddr().String()) c.SetContext(&codec{}) runtime := &RemotePluginRuntime{ MediaTransport: basic_runtime.NewMediaTransport( @@ -104,10 +105,11 @@ func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { // close plugin plugin.cleanupResources() - // trigger runtime disconnected event - s.WalkNotifiers(func(notifier PluginRuntimeNotifier) { - notifier.OnRuntimeDisconnected(plugin) - }) + if plugin.initialized { + s.WalkNotifiers(func(notifier PluginRuntimeNotifier) { + notifier.OnRuntimeDisconnected(plugin) + }) + } // decrease current connection atomic.AddInt32(&s.currentConn, -1) diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index 82542fce9..81c091949 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -1,8 +1,11 @@ package plugin_manager import ( + "context" + "errors" "fmt" "strings" + "time" lru "github.com/hashicorp/golang-lru/v2" "github.com/langgenius/dify-cloud-kit/oss" @@ -214,6 +217,41 @@ func (c *PluginManager) NeedRedirecting( return true, nil } +// TryLaunchLocalPlugin attempts to launch a local plugin on-demand. +// This is used as a fallback when the runtime is not found during dispatch, +// instead of waiting for the WatchDog's 30s periodic scan. +// Returns true if the plugin was successfully launched and is ready. +func (p *PluginManager) TryLaunchLocalPlugin( + identity plugin_entities.PluginUniqueIdentifier, +) bool { + if p.config.Platform != app.PLATFORM_LOCAL { + return false + } + + exists, err := p.installedBucket.Exists(identity) + if err != nil || !exists { + return false + } + + _, ch, err := p.controlPanel.LaunchLocalPlugin(context.Background(), identity) + if err != nil { + if errors.Is(err, controlpanel.ErrorPluginAlreadyLaunched) { + // another goroutine launched it between our check and now + return true + } + log.Warn("on-demand launch failed", "plugin", identity.String(), "error", err) + return false + } + + select { + case err := <-ch: + return err == nil + case <-time.After(2 * time.Minute): + log.Warn("on-demand launch timed out", "plugin", identity.String()) + return false + } +} + func pluginAssetCacheKey( pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier, path string, diff --git a/internal/server/fallback_oss.go b/internal/server/fallback_oss.go new file mode 100644 index 000000000..436793cc8 --- /dev/null +++ b/internal/server/fallback_oss.go @@ -0,0 +1,121 @@ +package server + +import ( + "github.com/langgenius/dify-cloud-kit/oss" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/log" +) + +type FallbackOSS struct { + primary oss.OSS + fallback oss.OSS +} + +func NewFallbackOSS(primary oss.OSS, fallback oss.OSS) *FallbackOSS { + return &FallbackOSS{ + primary: primary, + fallback: fallback, + } +} + +func (f *FallbackOSS) Save(key string, data []byte) error { + primaryErr := f.primary.Save(key, data) + if primaryErr != nil { + // primary failed, save to fallback synchronously to ensure data is available + log.Warn("fallback: primary Save failed, saving to local fallback", "key", key, "error", primaryErr) + if fallbackErr := f.fallback.Save(key, data); fallbackErr != nil { + log.Error("fallback: both primary and local Save failed", "key", key, "primaryError", primaryErr, "fallbackError", fallbackErr) + return primaryErr + } + return nil + } + return nil +} + +func (f *FallbackOSS) Load(key string) ([]byte, error) { + data, err := f.primary.Load(key) + if err == nil { + return data, nil + } + + fallbackData, fallbackErr := f.fallback.Load(key) + if fallbackErr != nil { + return nil, err + } + + log.Warn("fallback: primary Load failed, using local cache", "key", key, "error", err) + return fallbackData, nil +} + +func (f *FallbackOSS) Exists(key string) (bool, error) { + exists, err := f.primary.Exists(key) + if err == nil && exists { + return true, nil + } + + fallbackExists, fallbackErr := f.fallback.Exists(key) + if fallbackErr == nil && fallbackExists { + log.Warn("fallback: primary Exists failed or returned false, found in local cache", "key", key, "error", err) + return true, nil + } + + if err != nil { + return false, err + } + return false, nil +} + +func (f *FallbackOSS) State(key string) (oss.OSSState, error) { + state, err := f.primary.State(key) + if err == nil { + return state, nil + } + + fallbackState, fallbackErr := f.fallback.State(key) + if fallbackErr != nil { + return oss.OSSState{}, err + } + + log.Warn("fallback: primary State failed, using local cache", "key", key, "error", err) + return fallbackState, nil +} + +func (f *FallbackOSS) List(prefix string) ([]oss.OSSPath, error) { + allPaths := make([]oss.OSSPath, 0) + duplicatePaths := make(map[string]bool) + + paths, err := f.primary.List(prefix) + if err != nil { + return nil, err + } + + fallbackPaths, fallbackErr := f.fallback.List(prefix) + if fallbackErr != nil { + return nil, err + } + + for _, path := range paths { + if _, ok := duplicatePaths[path.Path]; !ok { + allPaths = append(allPaths, path) + duplicatePaths[path.Path] = true + } + } + + for _, path := range fallbackPaths { + if _, ok := duplicatePaths[path.Path]; !ok { + allPaths = append(allPaths, path) + duplicatePaths[path.Path] = true + } + } + + log.Warn("fallback: primary List failed, using local cache", "prefix", prefix, "error", err) + return allPaths, nil +} + +func (f *FallbackOSS) Delete(key string) error { + _ = f.fallback.Delete(key) + return f.primary.Delete(key) +} + +func (f *FallbackOSS) Type() string { + return f.primary.Type() +} diff --git a/internal/server/fallback_oss_test.go b/internal/server/fallback_oss_test.go new file mode 100644 index 000000000..c27f5d7a3 --- /dev/null +++ b/internal/server/fallback_oss_test.go @@ -0,0 +1,396 @@ +package server + +import ( + "errors" + "testing" + "time" + + "github.com/langgenius/dify-cloud-kit/oss" +) + +var errPrimary = errors.New("primary error") +var errFallback = errors.New("fallback error") + +// mockOSS implements oss.OSS for testing. +type mockOSS struct { + saveFunc func(key string, data []byte) error + loadFunc func(key string) ([]byte, error) + existsFunc func(key string) (bool, error) + stateFunc func(key string) (oss.OSSState, error) + listFunc func(prefix string) ([]oss.OSSPath, error) + deleteFunc func(key string) error + typeName string +} + +func (m *mockOSS) Save(key string, data []byte) error { return m.saveFunc(key, data) } +func (m *mockOSS) Load(key string) ([]byte, error) { return m.loadFunc(key) } +func (m *mockOSS) Exists(key string) (bool, error) { return m.existsFunc(key) } +func (m *mockOSS) State(key string) (oss.OSSState, error) { return m.stateFunc(key) } +func (m *mockOSS) List(prefix string) ([]oss.OSSPath, error) { return m.listFunc(prefix) } +func (m *mockOSS) Delete(key string) error { return m.deleteFunc(key) } +func (m *mockOSS) Type() string { return m.typeName } + +func TestNewFallbackOSS(t *testing.T) { + primary := &mockOSS{typeName: "primary"} + fallback := &mockOSS{typeName: "fallback"} + f := NewFallbackOSS(primary, fallback) + + if f.primary != primary { + t.Error("expected primary to be set") + } + if f.fallback != fallback { + t.Error("expected fallback to be set") + } +} + +func TestFallbackOSS_Save(t *testing.T) { + tests := []struct { + name string + primaryErr error + fallbackErr error + wantErr bool + }{ + { + name: "primary succeeds", + primaryErr: nil, + wantErr: false, + }, + { + name: "primary fails, fallback succeeds", + primaryErr: errPrimary, + fallbackErr: nil, + wantErr: false, + }, + { + name: "both fail", + primaryErr: errPrimary, + fallbackErr: errFallback, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + primary := &mockOSS{ + saveFunc: func(key string, data []byte) error { return tt.primaryErr }, + } + fallback := &mockOSS{ + saveFunc: func(key string, data []byte) error { return tt.fallbackErr }, + } + f := NewFallbackOSS(primary, fallback) + + err := f.Save("test-key", []byte("data")) + if (err != nil) != tt.wantErr { + t.Errorf("Save() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr && !errors.Is(err, errPrimary) { + t.Errorf("Save() expected primary error, got %v", err) + } + }) + } +} + +func TestFallbackOSS_Load(t *testing.T) { + tests := []struct { + name string + primaryData []byte + primaryErr error + fallbackData []byte + fallbackErr error + wantData []byte + wantErr bool + }{ + { + name: "primary succeeds", + primaryData: []byte("primary-data"), + primaryErr: nil, + wantData: []byte("primary-data"), + wantErr: false, + }, + { + name: "primary fails, fallback succeeds", + primaryErr: errPrimary, + fallbackData: []byte("fallback-data"), + fallbackErr: nil, + wantData: []byte("fallback-data"), + wantErr: false, + }, + { + name: "both fail", + primaryErr: errPrimary, + fallbackErr: errFallback, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + primary := &mockOSS{ + loadFunc: func(key string) ([]byte, error) { return tt.primaryData, tt.primaryErr }, + } + fallback := &mockOSS{ + loadFunc: func(key string) ([]byte, error) { return tt.fallbackData, tt.fallbackErr }, + } + f := NewFallbackOSS(primary, fallback) + + data, err := f.Load("test-key") + if (err != nil) != tt.wantErr { + t.Errorf("Load() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr && string(data) != string(tt.wantData) { + t.Errorf("Load() data = %s, want %s", data, tt.wantData) + } + }) + } +} + +func TestFallbackOSS_Exists(t *testing.T) { + tests := []struct { + name string + primaryExists bool + primaryErr error + fallbackExists bool + fallbackErr error + wantExists bool + wantErr bool + }{ + { + name: "primary exists", + primaryExists: true, + primaryErr: nil, + wantExists: true, + wantErr: false, + }, + { + name: "primary returns false with no error", + primaryExists: false, + primaryErr: nil, + wantExists: false, + wantErr: false, + }, + { + name: "primary fails, fallback exists", + primaryErr: errPrimary, + fallbackExists: true, + fallbackErr: nil, + wantExists: true, + wantErr: false, + }, + { + name: "primary returns false, fallback exists", + primaryExists: false, + primaryErr: nil, + fallbackExists: true, + fallbackErr: nil, + wantExists: true, + wantErr: false, + }, + { + name: "primary fails, fallback not found", + primaryErr: errPrimary, + fallbackExists: false, + fallbackErr: nil, + wantExists: false, + wantErr: true, + }, + { + name: "primary fails, fallback fails", + primaryErr: errPrimary, + fallbackErr: errFallback, + wantExists: false, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + primary := &mockOSS{ + existsFunc: func(key string) (bool, error) { return tt.primaryExists, tt.primaryErr }, + } + fallback := &mockOSS{ + existsFunc: func(key string) (bool, error) { return tt.fallbackExists, tt.fallbackErr }, + } + f := NewFallbackOSS(primary, fallback) + + exists, err := f.Exists("test-key") + if (err != nil) != tt.wantErr { + t.Errorf("Exists() error = %v, wantErr %v", err, tt.wantErr) + } + if exists != tt.wantExists { + t.Errorf("Exists() = %v, want %v", exists, tt.wantExists) + } + }) + } +} + +func TestFallbackOSS_State(t *testing.T) { + now := time.Now() + primaryState := oss.OSSState{Size: 100, LastModified: now} + fallbackState := oss.OSSState{Size: 50, LastModified: now.Add(-time.Hour)} + + tests := []struct { + name string + primary oss.OSSState + primaryE error + fallback oss.OSSState + fallbackE error + want oss.OSSState + wantErr bool + }{ + { + name: "primary succeeds", + primary: primaryState, + want: primaryState, + wantErr: false, + }, + { + name: "primary fails, fallback succeeds", + primaryE: errPrimary, + fallback: fallbackState, + want: fallbackState, + wantErr: false, + }, + { + name: "both fail", + primaryE: errPrimary, + fallbackE: errFallback, + want: oss.OSSState{}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + primary := &mockOSS{ + stateFunc: func(key string) (oss.OSSState, error) { return tt.primary, tt.primaryE }, + } + fallback := &mockOSS{ + stateFunc: func(key string) (oss.OSSState, error) { return tt.fallback, tt.fallbackE }, + } + f := NewFallbackOSS(primary, fallback) + + state, err := f.State("test-key") + if (err != nil) != tt.wantErr { + t.Errorf("State() error = %v, wantErr %v", err, tt.wantErr) + } + if state.Size != tt.want.Size { + t.Errorf("State() size = %d, want %d", state.Size, tt.want.Size) + } + }) + } +} + +func TestFallbackOSS_List(t *testing.T) { + primaryPaths := []oss.OSSPath{{Path: "/a", IsDir: false}, {Path: "/b", IsDir: true}} + fallbackPaths := []oss.OSSPath{{Path: "/b", IsDir: true}, {Path: "/c", IsDir: false}} + + t.Run("merges and deduplicates", func(t *testing.T) { + primary := &mockOSS{ + listFunc: func(prefix string) ([]oss.OSSPath, error) { return primaryPaths, nil }, + } + fallback := &mockOSS{ + listFunc: func(prefix string) ([]oss.OSSPath, error) { return fallbackPaths, nil }, + } + f := NewFallbackOSS(primary, fallback) + + paths, err := f.List("prefix") + if err != nil { + t.Fatalf("List() unexpected error: %v", err) + } + // /a from primary, /b from primary (dedup), /c from fallback + if len(paths) != 3 { + t.Errorf("List() got %d paths, want 3", len(paths)) + } + }) + + t.Run("primary fails returns error", func(t *testing.T) { + primary := &mockOSS{ + listFunc: func(prefix string) ([]oss.OSSPath, error) { return nil, errPrimary }, + } + fallback := &mockOSS{ + listFunc: func(prefix string) ([]oss.OSSPath, error) { return fallbackPaths, nil }, + } + f := NewFallbackOSS(primary, fallback) + + _, err := f.List("prefix") + if err == nil { + t.Error("List() expected error when primary fails") + } + }) + + // NOTE: Current implementation has a bug in List() - when primary succeeds but fallback fails, + // it returns (nil, err) where err is the primary's nil error, effectively swallowing the fallback error. + t.Run("fallback fails returns nil and no data", func(t *testing.T) { + primary := &mockOSS{ + listFunc: func(prefix string) ([]oss.OSSPath, error) { return primaryPaths, nil }, + } + fallback := &mockOSS{ + listFunc: func(prefix string) ([]oss.OSSPath, error) { return nil, errFallback }, + } + f := NewFallbackOSS(primary, fallback) + + paths, err := f.List("prefix") + // Bug: returns (nil, nil) instead of (nil, fallbackErr) or (primaryPaths, nil) + if err != nil { + t.Errorf("List() unexpected error due to current impl: %v", err) + } + if paths != nil { + t.Errorf("List() expected nil paths due to current impl, got %v", paths) + } + }) +} + +func TestFallbackOSS_Delete(t *testing.T) { + t.Run("primary succeeds", func(t *testing.T) { + primary := &mockOSS{ + deleteFunc: func(key string) error { return nil }, + } + fallback := &mockOSS{ + deleteFunc: func(key string) error { return nil }, + } + f := NewFallbackOSS(primary, fallback) + + if err := f.Delete("test-key"); err != nil { + t.Errorf("Delete() unexpected error: %v", err) + } + }) + + t.Run("primary fails", func(t *testing.T) { + primary := &mockOSS{ + deleteFunc: func(key string) error { return errPrimary }, + } + fallback := &mockOSS{ + deleteFunc: func(key string) error { return nil }, + } + f := NewFallbackOSS(primary, fallback) + + err := f.Delete("test-key") + if !errors.Is(err, errPrimary) { + t.Errorf("Delete() expected primary error, got %v", err) + } + }) + + t.Run("fallback error is ignored", func(t *testing.T) { + primary := &mockOSS{ + deleteFunc: func(key string) error { return nil }, + } + fallback := &mockOSS{ + deleteFunc: func(key string) error { return errFallback }, + } + f := NewFallbackOSS(primary, fallback) + + if err := f.Delete("test-key"); err != nil { + t.Errorf("Delete() fallback error should be ignored, got %v", err) + } + }) +} + +func TestFallbackOSS_Type(t *testing.T) { + primary := &mockOSS{typeName: "s3"} + fallback := &mockOSS{typeName: "local"} + f := NewFallbackOSS(primary, fallback) + + if got := f.Type(); got != "s3" { + t.Errorf("Type() = %s, want s3", got) + } +} diff --git a/internal/server/middleware.go b/internal/server/middleware.go index 0947c55b4..0e9e46e68 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -103,6 +103,11 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc { // check if plugin in current node if needRedirecting, originalError := app.pluginManager.NeedRedirecting(identity); needRedirecting { + if app.pluginManager.TryLaunchLocalPlugin(identity) { + log.Info("on-demand launch succeeded, serving locally", "plugin", identity.String()) + ctx.Next() + return + } app.redirectPluginInvokeByPluginIdentifier(ctx, identity, originalError) ctx.Abort() } else { diff --git a/internal/server/server.go b/internal/server/server.go index 383b6a24b..8957095f3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -15,6 +15,8 @@ import ( "github.com/langgenius/dify-plugin-daemon/pkg/utils/routine" ) +const OSS_TYPE_LOCAL_FILE = "local_file" + func initOSS(config *app.Config) oss.OSS { // init storage var storage oss.OSS @@ -77,6 +79,20 @@ func initOSS(config *app.Config) oss.OSS { log.Panic("failed to create storage", "error", err) } + if config.PluginStorageType != oss.OSS_TYPE_LOCAL && config.PluginStorageType != OSS_TYPE_LOCAL_FILE { + fallbackStorage, fallbackErr := factory.Load("local", oss.OSSArgs{ + Local: &oss.Local{ + Path: config.PluginStorageLocalRoot, + }, + }) + if fallbackErr != nil { + log.Warn("failed to create local fallback storage, running without fallback", "error", fallbackErr) + return storage + } + log.Info("storage: wrapping primary with local fallback", "primary_type", storage.Type()) + return NewFallbackOSS(storage, fallbackStorage) + } + return storage }