Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"

"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/langgenius/dify-plugin-daemon/internal/server"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
Expand All @@ -11,7 +12,7 @@ import (

func main() {
var config app.Config

_ = godotenv.Load()
err := envconfig.Process("", &config)
if err != nil {
log.Panic("error processing environment variables", "error", err)
Expand Down
10 changes: 6 additions & 4 deletions internal/core/debugging_runtime/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {

func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
// new plugin connected
log.Info("new debugging connection", "remote", c.RemoteAddr().String())
c.SetContext(&codec{})
runtime := &RemotePluginRuntime{
MediaTransport: basic_runtime.NewMediaTransport(
Expand Down Expand Up @@ -104,10 +105,11 @@ func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
// close plugin
plugin.cleanupResources()

// trigger runtime disconnected event
s.WalkNotifiers(func(notifier PluginRuntimeNotifier) {
notifier.OnRuntimeDisconnected(plugin)
})
if plugin.initialized {
s.WalkNotifiers(func(notifier PluginRuntimeNotifier) {
notifier.OnRuntimeDisconnected(plugin)
})
}

// decrease current connection
atomic.AddInt32(&s.currentConn, -1)
Expand Down
38 changes: 38 additions & 0 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package plugin_manager

import (
"context"
"errors"
"fmt"
"strings"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/langgenius/dify-cloud-kit/oss"
Expand Down Expand Up @@ -214,6 +217,41 @@ func (c *PluginManager) NeedRedirecting(
return true, nil
}

// TryLaunchLocalPlugin attempts to launch a local plugin on-demand.
// This is used as a fallback when the runtime is not found during dispatch,
// instead of waiting for the WatchDog's 30s periodic scan.
// Returns true if the plugin was successfully launched and is ready.
func (p *PluginManager) TryLaunchLocalPlugin(
identity plugin_entities.PluginUniqueIdentifier,
) bool {
if p.config.Platform != app.PLATFORM_LOCAL {
return false
}

exists, err := p.installedBucket.Exists(identity)
if err != nil || !exists {
return false
}

_, ch, err := p.controlPanel.LaunchLocalPlugin(context.Background(), identity)
if err != nil {
if errors.Is(err, controlpanel.ErrorPluginAlreadyLaunched) {
// another goroutine launched it between our check and now
return true
}
log.Warn("on-demand launch failed", "plugin", identity.String(), "error", err)
return false
}

select {
case err := <-ch:
return err == nil
case <-time.After(2 * time.Minute):
log.Warn("on-demand launch timed out", "plugin", identity.String())
return false
}
}

func pluginAssetCacheKey(
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
path string,
Expand Down
121 changes: 121 additions & 0 deletions internal/server/fallback_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package server

import (
"github.com/langgenius/dify-cloud-kit/oss"
"github.com/langgenius/dify-plugin-daemon/pkg/utils/log"
)

type FallbackOSS struct {
primary oss.OSS
fallback oss.OSS
}

func NewFallbackOSS(primary oss.OSS, fallback oss.OSS) *FallbackOSS {
return &FallbackOSS{
primary: primary,
fallback: fallback,
}
}

func (f *FallbackOSS) Save(key string, data []byte) error {
primaryErr := f.primary.Save(key, data)
if primaryErr != nil {
// primary failed, save to fallback synchronously to ensure data is available
log.Warn("fallback: primary Save failed, saving to local fallback", "key", key, "error", primaryErr)
if fallbackErr := f.fallback.Save(key, data); fallbackErr != nil {
log.Error("fallback: both primary and local Save failed", "key", key, "primaryError", primaryErr, "fallbackError", fallbackErr)
return primaryErr
}
return nil
}
return nil
}

func (f *FallbackOSS) Load(key string) ([]byte, error) {
data, err := f.primary.Load(key)
if err == nil {
return data, nil
}

fallbackData, fallbackErr := f.fallback.Load(key)
if fallbackErr != nil {
return nil, err
}

log.Warn("fallback: primary Load failed, using local cache", "key", key, "error", err)
return fallbackData, nil
}

func (f *FallbackOSS) Exists(key string) (bool, error) {
exists, err := f.primary.Exists(key)
if err == nil && exists {
return true, nil
}

fallbackExists, fallbackErr := f.fallback.Exists(key)
if fallbackErr == nil && fallbackExists {
log.Warn("fallback: primary Exists failed or returned false, found in local cache", "key", key, "error", err)
return true, nil
}

if err != nil {
return false, err
}
return false, nil
}

func (f *FallbackOSS) State(key string) (oss.OSSState, error) {
state, err := f.primary.State(key)
if err == nil {
return state, nil
}

fallbackState, fallbackErr := f.fallback.State(key)
if fallbackErr != nil {
return oss.OSSState{}, err
}

log.Warn("fallback: primary State failed, using local cache", "key", key, "error", err)
return fallbackState, nil
}

func (f *FallbackOSS) List(prefix string) ([]oss.OSSPath, error) {
allPaths := make([]oss.OSSPath, 0)
duplicatePaths := make(map[string]bool)

paths, err := f.primary.List(prefix)
if err != nil {
return nil, err
}

fallbackPaths, fallbackErr := f.fallback.List(prefix)
if fallbackErr != nil {
return nil, err
}

for _, path := range paths {
if _, ok := duplicatePaths[path.Path]; !ok {
allPaths = append(allPaths, path)
duplicatePaths[path.Path] = true
}
}

for _, path := range fallbackPaths {
if _, ok := duplicatePaths[path.Path]; !ok {
allPaths = append(allPaths, path)
duplicatePaths[path.Path] = true
}
}

log.Warn("fallback: primary List failed, using local cache", "prefix", prefix, "error", err)
return allPaths, nil
}

func (f *FallbackOSS) Delete(key string) error {
_ = f.fallback.Delete(key)
return f.primary.Delete(key)
}

func (f *FallbackOSS) Type() string {
return f.primary.Type()
}
Loading
Loading