Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mozillazg/go-httpheader v0.2.1 // indirect
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
github.com/muesli/cancelreader v0.2.2 // indirect
Expand Down Expand Up @@ -159,6 +160,7 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/driver/clickhouse v0.7.0 // indirect
gorm.io/driver/sqlite v1.6.0 // indirect
)

require (
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6T
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand Down Expand Up @@ -544,6 +546,8 @@ gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314=
gorm.io/driver/postgres v1.5.11/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c=
gorm.io/driver/sqlite v1.5.0/go.mod h1:kDMDfntV9u/vuMmz8APHtHF0b4nyBB7sfCieC6G8k8I=
gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ=
gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs=
gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE=
Expand Down
44 changes: 33 additions & 11 deletions internal/service/install_task_service.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,66 @@
package service

import (
"fmt"

"github.com/langgenius/dify-plugin-daemon/internal/db"
"github.com/langgenius/dify-plugin-daemon/internal/types/exception"
"github.com/langgenius/dify-plugin-daemon/internal/types/models"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
"golang.org/x/sync/singleflight"
"gorm.io/gorm"
)

var (
installationTasksGroup singleflight.Group
installationTaskGroup singleflight.Group
)
Comment on lines +15 to +18
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using multiple singleflight.Group instances within the same service is redundant when the keys are already prefixed with distinct identifiers (e.g., tasks: and task:). A single singleflight.Group can handle deduplication for all keys in this service, which simplifies the code and reduces global state.

var (
	installationGroup singleflight.Group
)


func FetchPluginInstallationTasks(
tenant_id string,
page int,
page_size int,
) *entities.Response {
tasks, err := db.GetAll[models.InstallTask](
db.Equal("tenant_id", tenant_id),
db.OrderBy("created_at", true),
db.Page(page, page_size),
)
key := fmt.Sprintf("tasks:%s:%d:%d", tenant_id, page, page_size)
v, err, _ := installationTasksGroup.Do(key, func() (interface{}, error) {
tasks, err := db.GetAll[models.InstallTask](
db.Equal("tenant_id", tenant_id),
db.OrderBy("created_at", true),
db.Page(page, page_size),
)
if err != nil {
return nil, err
}
return tasks, nil
})
Comment on lines +26 to +36
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of singleflight with reference types (like slices or pointers) can lead to data races. singleflight.Do returns the exact same object to all concurrent callers. If any caller or subsequent logic (e.g., middleware or decorators) modifies the returned slice or its elements before serialization, it will affect all other concurrent requests and cause a data race. Consider returning a deep copy of the result or ensuring the returned objects are treated as strictly immutable.

if err != nil {
return exception.InternalServerError(err).ToResponse()
}

return entities.NewSuccessResponse(tasks)
return entities.NewSuccessResponse(v)
}

func FetchPluginInstallationTask(
tenant_id string,
task_id string,
) *entities.Response {
task, err := db.GetOne[models.InstallTask](
db.Equal("id", task_id),
db.Equal("tenant_id", tenant_id),
)
key := fmt.Sprintf("task:%s:%s", tenant_id, task_id)
v, err, _ := installationTaskGroup.Do(key, func() (interface{}, error) {
task, err := db.GetOne[models.InstallTask](
db.Equal("id", task_id),
db.Equal("tenant_id", tenant_id),
)
if err != nil {
return nil, err
}
return task, nil
})
Comment on lines +49 to +58
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the list fetch, returning a pointer (*models.InstallTask) via singleflight is risky. All concurrent callers receive the same pointer. If one caller modifies the task object (e.g., updating a status field or modifying the Plugins slice), the change is visible to all other callers, leading to potential data races and inconsistent state. It is safer to return a deep copy of the task to each caller.

if err != nil {
return exception.InternalServerError(err).ToResponse()
}

return entities.NewSuccessResponse(task)
return entities.NewSuccessResponse(v)
}

func DeletePluginInstallationTask(
Expand Down
159 changes: 159 additions & 0 deletions internal/service/install_task_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package service

import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/db"
"github.com/langgenius/dify-plugin-daemon/internal/types/models"
"golang.org/x/sync/singleflight"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

func setupTestDB(t *testing.T) *gorm.DB {
t.Helper()
testDB, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
if err != nil {
t.Fatalf("failed to open test db: %v", err)
}
sqlDB, err := testDB.DB()
if err != nil {
t.Fatalf("failed to get underlying sql.DB: %v", err)
}
sqlDB.SetMaxOpenConns(1)

if err := testDB.AutoMigrate(&models.InstallTask{}); err != nil {
t.Fatalf("failed to auto migrate: %v", err)
}
db.DifyPluginDB = testDB
t.Cleanup(func() {
sqlDB.Close()
})
return testDB
}

func TestFetchPluginInstallationTasks_Singleflight(t *testing.T) {
testDB := setupTestDB(t)
installationTasksGroup = singleflight.Group{}

var queryCount atomic.Int32
testDB.Callback().Query().Before("gorm:query").Register("test:count_tasks", func(tx *gorm.DB) {
queryCount.Add(1)
time.Sleep(100 * time.Millisecond)
})
defer testDB.Callback().Query().Remove("test:count_tasks")

const concurrency = 50
var wg sync.WaitGroup
wg.Add(concurrency)
start := make(chan struct{})
errs := make([]int, concurrency)

for i := 0; i < concurrency; i++ {
go func(idx int) {
defer wg.Done()
<-start
resp := FetchPluginInstallationTasks("tenant-1", 1, 10)
errs[idx] = resp.Code
}(i)
}

close(start)
wg.Wait()

for i, code := range errs {
if code != 0 {
t.Errorf("goroutine %d: expected code 0, got %d", i, code)
}
}

if count := queryCount.Load(); count != 1 {
t.Errorf("singleflight not working: expected 1 db query for same key, got %d", count)
}
}

func TestFetchPluginInstallationTask_Singleflight(t *testing.T) {
testDB := setupTestDB(t)
installationTaskGroup = singleflight.Group{}

// Insert a test record before registering the callback.
task := models.InstallTask{
TenantID: "tenant-1",
Status: models.InstallTaskStatusPending,
TotalPlugins: 1,
}
if err := testDB.Create(&task).Error; err != nil {
t.Fatalf("failed to create test task: %v", err)
}

var queryCount atomic.Int32
testDB.Callback().Query().Before("gorm:query").Register("test:count_task", func(tx *gorm.DB) {
queryCount.Add(1)
time.Sleep(100 * time.Millisecond)
})
defer testDB.Callback().Query().Remove("test:count_task")

const concurrency = 50
var wg sync.WaitGroup
wg.Add(concurrency)
start := make(chan struct{})
errs := make([]int, concurrency)

for i := 0; i < concurrency; i++ {
go func(idx int) {
defer wg.Done()
<-start
resp := FetchPluginInstallationTask("tenant-1", task.ID)
errs[idx] = resp.Code
}(i)
}

close(start)
wg.Wait()

for i, code := range errs {
if code != 0 {
t.Errorf("goroutine %d: expected code 0, got %d", i, code)
}
}

if count := queryCount.Load(); count != 1 {
t.Errorf("singleflight not working: expected 1 db query for same key, got %d", count)
}
}

func TestFetchPluginInstallationTasks_DifferentKeysNotDeduplicated(t *testing.T) {
testDB := setupTestDB(t)
installationTasksGroup = singleflight.Group{}

var queryCount atomic.Int32
testDB.Callback().Query().Before("gorm:query").Register("test:count_diff", func(tx *gorm.DB) {
queryCount.Add(1)
time.Sleep(50 * time.Millisecond)
})
defer testDB.Callback().Query().Remove("test:count_diff")

const numKeys = 3
var wg sync.WaitGroup
wg.Add(numKeys)
start := make(chan struct{})

for i := 0; i < numKeys; i++ {
go func(idx int) {
defer wg.Done()
<-start
FetchPluginInstallationTasks(fmt.Sprintf("tenant-%d", idx), 1, 10)
}(i)
}

close(start)
wg.Wait()

if count := queryCount.Load(); count != int32(numKeys) {
t.Errorf("different keys should not be deduplicated: expected %d db queries, got %d", numKeys, count)
}
}
Loading