From 4c6dd5fb65447dbc1ebe3b22aaedbb7d911f5b43 Mon Sep 17 00:00:00 2001 From: Muhammad Abduh Date: Mon, 2 Feb 2026 15:55:43 +0700 Subject: [PATCH] feat: add tolerations and affinity job --- modules/dagger/driver.go | 7 +- modules/firehose/driver.go | 9 +- modules/firehose/driver_plan_update_test.go | 7 +- modules/firehose/driver_test.go | 33 +-- modules/job/driver/driver_test.go | 213 ++++++++++++++++++++ modules/job/driver/plan.go | 1 + modules/job/driver/sync.go | 29 ++- modules/kubernetes/driver.go | 9 +- modules/kubernetes/output.go | 40 +--- pkg/kube/job/job.go | 3 +- pkg/kube/pod/pod.go | 108 ++++++++++ 11 files changed, 394 insertions(+), 65 deletions(-) create mode 100644 modules/job/driver/driver_test.go diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go index 3c962356..9eef8afc 100644 --- a/modules/dagger/driver.go +++ b/modules/dagger/driver.go @@ -18,6 +18,7 @@ import ( "github.com/goto/entropy/pkg/errors" "github.com/goto/entropy/pkg/helm" "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/kube/pod" ) const ( @@ -107,7 +108,7 @@ type driverConf struct { // timeout value for a kube deployment run KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"` - NodeAffinityMatchExpressions kubernetes.NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"` + NodeAffinityMatchExpressions pod.NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"` } type Output struct { @@ -234,8 +235,8 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, }) } - requiredDuringSchedulingIgnoredDuringExecution := []kubernetes.Preference{} - preferredDuringSchedulingIgnoredDuringExecution := []kubernetes.WeightedPreference{} + requiredDuringSchedulingIgnoredDuringExecution := []pod.Preference{} + preferredDuringSchedulingIgnoredDuringExecution := []pod.WeightedPreference{} affinityKey := daggerTaintKey if affinity, ok := kubeOut.Affinities[affinityKey]; ok { diff --git a/modules/firehose/driver.go b/modules/firehose/driver.go index de28b98f..4fc4fc28 100644 --- a/modules/firehose/driver.go +++ b/modules/firehose/driver.go @@ -16,6 +16,7 @@ import ( "github.com/goto/entropy/pkg/errors" "github.com/goto/entropy/pkg/helm" "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/kube/pod" ) const ( @@ -113,7 +114,7 @@ type driverConf struct { // Tolerations represents the tolerations to be set for the deployment. // The key in the map is the sink-type in upper case. - Tolerations map[string]kubernetes.Toleration `json:"tolerations"` + Tolerations map[string]pod.Toleration `json:"tolerations"` EnvVariables map[string]string `json:"env_variables,omitempty"` @@ -137,7 +138,7 @@ type driverConf struct { RequestsAndLimits map[string]RequestsAndLimits `json:"requests_and_limits" validate:"required"` // NodeAffinityMatchExpressions can be used to set node-affinity for the deployment. - NodeAffinityMatchExpressions kubernetes.NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"` + NodeAffinityMatchExpressions pod.NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"` // delay between stopping a firehose and making an offset reset request OffsetResetDelaySeconds int `json:"offset_reset_delay_seconds"` @@ -272,8 +273,8 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, mountSecrets := []map[string]any{} - requiredDuringSchedulingIgnoredDuringExecution := []kubernetes.Preference{} - preferredDuringSchedulingIgnoredDuringExecution := []kubernetes.WeightedPreference{} + requiredDuringSchedulingIgnoredDuringExecution := []pod.Preference{} + preferredDuringSchedulingIgnoredDuringExecution := []pod.WeightedPreference{} var affinityKey = "" affinityMode := kubeOut.AffinityMode[resourceName] diff --git a/modules/firehose/driver_plan_update_test.go b/modules/firehose/driver_plan_update_test.go index 1c4ab8e4..2cc1b255 100644 --- a/modules/firehose/driver_plan_update_test.go +++ b/modules/firehose/driver_plan_update_test.go @@ -14,6 +14,7 @@ import ( "github.com/goto/entropy/modules/kubernetes" "github.com/goto/entropy/pkg/errors" "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/kube/pod" ) func TestFirehoseDriver_Plan_Update(t *testing.T) { @@ -65,7 +66,7 @@ func TestFirehoseDriver_Plan_Update(t *testing.T) { "kube_cluster": { Kind: "kubernetes", Output: modules.MustJSON(kubernetes.Output{ - Tolerations: map[string][]kubernetes.Toleration{}, + Tolerations: map[string][]pod.Toleration{}, }), }, }, @@ -175,7 +176,7 @@ func TestFirehoseDriver_Plan_Update(t *testing.T) { Configs: kube.Config{ Namespace: "overriden-namespace", }, - Tolerations: map[string][]kubernetes.Toleration{}, + Tolerations: map[string][]pod.Toleration{}, }), }, }, @@ -281,7 +282,7 @@ func TestFirehoseDriver_Plan_Update(t *testing.T) { "kube_cluster": { Kind: "kubernetes", Output: modules.MustJSON(kubernetes.Output{ - Tolerations: map[string][]kubernetes.Toleration{}, + Tolerations: map[string][]pod.Toleration{}, }), }, }, diff --git a/modules/firehose/driver_test.go b/modules/firehose/driver_test.go index 0f067234..ba718ea7 100644 --- a/modules/firehose/driver_test.go +++ b/modules/firehose/driver_test.go @@ -12,6 +12,7 @@ import ( "github.com/goto/entropy/modules/kubernetes" "github.com/goto/entropy/pkg/errors" "github.com/goto/entropy/pkg/helm" + "github.com/goto/entropy/pkg/kube/pod" ) func TestFirehoseDriver(t *testing.T) { @@ -57,7 +58,7 @@ func TestFirehoseDriver(t *testing.T) { }, }, kubeOutput: kubernetes.Output{ - Tolerations: map[string][]kubernetes.Toleration{ + Tolerations: map[string][]pod.Toleration{ "firehose_LOG": { { Key: "key1", @@ -149,10 +150,10 @@ func TestFirehoseDriver(t *testing.T) { }, }, "nodeAffinityMatchExpressions": map[string]any{ - "preferredDuringSchedulingIgnoredDuringExecution": []kubernetes.WeightedPreference{ + "preferredDuringSchedulingIgnoredDuringExecution": []pod.WeightedPreference{ { Weight: 1, - Preference: []kubernetes.Preference{ + Preference: []pod.Preference{ { Key: "another-node-label-key", Operator: "In", @@ -161,7 +162,7 @@ func TestFirehoseDriver(t *testing.T) { }, }, }, - "requiredDuringSchedulingIgnoredDuringExecution": []kubernetes.Preference{ + "requiredDuringSchedulingIgnoredDuringExecution": []pod.Preference{ { Key: "topology.kubernetes.io/zone", Operator: "In", @@ -234,7 +235,7 @@ func TestFirehoseDriver(t *testing.T) { }, }, kubeOutput: kubernetes.Output{ - Tolerations: map[string][]kubernetes.Toleration{ + Tolerations: map[string][]pod.Toleration{ "firehose_LOG": { { Key: "key1", @@ -326,10 +327,10 @@ func TestFirehoseDriver(t *testing.T) { }, }, "nodeAffinityMatchExpressions": map[string]any{ - "preferredDuringSchedulingIgnoredDuringExecution": []kubernetes.WeightedPreference{ + "preferredDuringSchedulingIgnoredDuringExecution": []pod.WeightedPreference{ { Weight: 1, - Preference: []kubernetes.Preference{ + Preference: []pod.Preference{ { Key: "another-node-label-key", Operator: "In", @@ -338,7 +339,7 @@ func TestFirehoseDriver(t *testing.T) { }, }, }, - "requiredDuringSchedulingIgnoredDuringExecution": []kubernetes.Preference{ + "requiredDuringSchedulingIgnoredDuringExecution": []pod.Preference{ { Key: "topology.kubernetes.io/zone", Operator: "In", @@ -411,7 +412,7 @@ func TestFirehoseDriver(t *testing.T) { }, }, kubeOutput: kubernetes.Output{ - Tolerations: map[string][]kubernetes.Toleration{ + Tolerations: map[string][]pod.Toleration{ "firehose_LOG": { { Key: "key1", @@ -511,10 +512,10 @@ func TestFirehoseDriver(t *testing.T) { }, }, "nodeAffinityMatchExpressions": map[string]any{ - "preferredDuringSchedulingIgnoredDuringExecution": []kubernetes.WeightedPreference{ + "preferredDuringSchedulingIgnoredDuringExecution": []pod.WeightedPreference{ { Weight: 1, - Preference: []kubernetes.Preference{ + Preference: []pod.Preference{ { Key: "another-node-label-key", Operator: "In", @@ -523,7 +524,7 @@ func TestFirehoseDriver(t *testing.T) { }, }, }, - "requiredDuringSchedulingIgnoredDuringExecution": []kubernetes.Preference{ + "requiredDuringSchedulingIgnoredDuringExecution": []pod.Preference{ { Key: "topology.kubernetes.io/zone", Operator: "In", @@ -597,18 +598,18 @@ func TestFirehoseDriver(t *testing.T) { func firehoseDriverConf() driverConf { return driverConf{ KubeDeployTimeout: 60, - NodeAffinityMatchExpressions: kubernetes.NodeAffinityMatchExpressions{ - RequiredDuringSchedulingIgnoredDuringExecution: []kubernetes.Preference{ + NodeAffinityMatchExpressions: pod.NodeAffinityMatchExpressions{ + RequiredDuringSchedulingIgnoredDuringExecution: []pod.Preference{ { Key: "topology.kubernetes.io/zone", Operator: "In", Values: []string{"antarctica-east1", "antarctica-west1"}, }, }, - PreferredDuringSchedulingIgnoredDuringExecution: []kubernetes.WeightedPreference{ + PreferredDuringSchedulingIgnoredDuringExecution: []pod.WeightedPreference{ { Weight: 1, - Preference: []kubernetes.Preference{ + Preference: []pod.Preference{ { Key: "another-node-label-key", Operator: "In", diff --git a/modules/job/driver/driver_test.go b/modules/job/driver/driver_test.go new file mode 100644 index 00000000..c8797482 --- /dev/null +++ b/modules/job/driver/driver_test.go @@ -0,0 +1,213 @@ +package driver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/modules" + "github.com/goto/entropy/modules/job/config" + "github.com/goto/entropy/modules/kubernetes" + kubejob "github.com/goto/entropy/pkg/kube/job" + "github.com/goto/entropy/pkg/kube/pod" +) + +func TestDriver(t *testing.T) { + t.Parallel() + + table := []struct { + title string + res resource.Resource + kubeOutput kubernetes.Output + want *kubejob.Job + wantErr error + }{ + { + title: "default flow", + res: resource.Resource{ + URN: "orn:entropy:job:test-1", + Kind: "job", + Name: "test-1", + Project: "project-1", + Labels: map[string]string{ + "team": "team-1", + }, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + UpdatedBy: "john.doe@goto.com", + CreatedBy: "john.doe@goto.com", + Spec: resource.Spec{ + Configs: []byte(`{ + "env_variables": { + "SINK_TYPE": "LOG", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log" + }, + "replicas": 1, + "namespace": "namespace-1" + }`), + Dependencies: map[string]string{}, + }, + State: resource.State{ + Status: resource.StatusPending, + Output: nil, + }, + }, + kubeOutput: kubernetes.Output{}, + want: &kubejob.Job{ + Name: "project-1-test-1-job", + Namespace: "namespace-1", + Labels: map[string]string{ + "name": "test-1", + "orchestrator": "entropy", + }, + Pod: &pod.Pod{ + Name: "project-1-test-1-job", + Labels: map[string]string{ + "app": "project-1-test-1-job", + }, + }, + Parallelism: func() *int32 { v := int32(1); return &v }(), + BackOffList: func() *int32 { v := int32(0); return &v }(), + TTLSeconds: func() *int32 { v := int32(172800); return &v }(), + }, + wantErr: nil, + }, + { + title: "with toleration and affinity", + res: resource.Resource{ + URN: "orn:entropy:job:test-1", + Kind: "job", + Name: "test-1", + Project: "project-1", + Labels: map[string]string{ + "team": "team-1", + }, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + UpdatedBy: "john.doe@goto.com", + CreatedBy: "john.doe@goto.com", + Spec: resource.Spec{ + Configs: []byte(`{ + "env_variables": { + "SINK_TYPE": "LOG", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz", + "SOURCE_KAFKA_BROKERS": "localhost:9092", + "SOURCE_KAFKA_TOPIC": "foo-log" + }, + "replicas": 1, + "namespace": "namespace-1" + }`), + Dependencies: map[string]string{}, + }, + State: resource.State{ + Status: resource.StatusPending, + Output: nil, + }, + }, + kubeOutput: kubernetes.Output{ + Affinities: map[string]pod.NodeAffinityMatchExpressions{ + "job": { + RequiredDuringSchedulingIgnoredDuringExecution: []pod.Preference{ + { + Key: "name", + Operator: "In", + Values: []string{"nodepool-1"}, + }, + }, + }, + }, + Tolerations: map[string][]pod.Toleration{ + "job": { + { + Key: "key1", + Operator: "Equal", + Value: "value1", + Effect: "NoSchedule", + }, + }, + "firehose_BIGQUERY": { + { + Key: "key2", + Operator: "Equal", + Value: "value2", + Effect: "NoSchedule", + }, + }, + "firehose_BLOB": { + { + Key: "key3", + Operator: "Equal", + Value: "value3", + Effect: "NoSchedule", + }, + }, + }, + }, + want: &kubejob.Job{ + Name: "project-1-test-1-job", + Namespace: "namespace-1", + Labels: map[string]string{ + "name": "test-1", + "orchestrator": "entropy", + }, + Pod: &pod.Pod{ + Name: "project-1-test-1-job", + Labels: map[string]string{ + "app": "project-1-test-1-job", + }, + Tolerations: []pod.Toleration{ + { + Key: "key1", + Operator: "Equal", + Value: "value1", + Effect: "NoSchedule", + }, + }, + NodeAffinityMatchExpressions: &pod.NodeAffinityMatchExpressions{ + RequiredDuringSchedulingIgnoredDuringExecution: []pod.Preference{ + { + Key: "name", + Operator: "In", + Values: []string{"nodepool-1"}, + }, + }, + }, + }, + Parallelism: func() *int32 { v := int32(1); return &v }(), + BackOffList: func() *int32 { v := int32(0); return &v }(), + TTLSeconds: func() *int32 { v := int32(172800); return &v }(), + }, + wantErr: nil, + }, + } + + for _, tt := range table { + t.Run(tt.title, func(t *testing.T) { + drv := &Driver{ + Conf: driverConf(), + } + + conf, err := config.ReadConfig(tt.res, tt.res.Spec.Configs, drv.Conf) + require.NoError(t, err) + + job := drv.getJob(tt.res, conf, tt.kubeOutput) + + require.NotNil(t, job) + + wantJSON := string(modules.MustJSON(tt.want)) + gotJSON := string(modules.MustJSON(job)) + assert.JSONEq(t, wantJSON, gotJSON) + }) + } +} + +func driverConf() config.DriverConf { + return config.DriverConf{} +} diff --git a/modules/job/driver/plan.go b/modules/job/driver/plan.go index c3699d52..9f347e59 100644 --- a/modules/job/driver/plan.go +++ b/modules/job/driver/plan.go @@ -40,6 +40,7 @@ func (driver *Driver) planCreate(exr module.ExpandedResource, act module.ActionR func planPendingWithConf(conf *config.Config, exr module.ExpandedResource, steps []PendingStep) (*resource.Resource, error) { immediately := time.Now() + exr.Resource.Spec.Configs = modules.MustJSON(conf) exr.Resource.State = resource.State{ Status: resource.StatusPending, diff --git a/modules/job/driver/sync.go b/modules/job/driver/sync.go index 23b0b57a..69eb9ea0 100644 --- a/modules/job/driver/sync.go +++ b/modules/job/driver/sync.go @@ -20,12 +20,15 @@ const ( labelOrchestrator = "orchestrator" labelName = "name" orchestratorLabelValue = "entropy" + + resourceName = "job" + // Num retries before failing. backoffLimit int32 = 0 ) func (driver *Driver) create(ctx context.Context, r resource.Resource, config *config.Config, out kubernetes.Output) error { - j := getJob(r, config) + j := driver.getJob(r, config, out) if err := driver.CreateJob(ctx, out.Configs, j); err != nil { return errors.ErrInternal.WithCausef("%s", err.Error()) } @@ -56,7 +59,7 @@ func (driver *Driver) start(ctx context.Context, config *config.Config, out kube return nil } -func getJob(res resource.Resource, conf *config.Config) *job.Job { +func (driver *Driver) getJob(res resource.Resource, conf *config.Config, kubeOut kubernetes.Output) *job.Job { constantLabels := map[string]string{ labelOrchestrator: orchestratorLabelValue, labelName: res.Name, @@ -111,6 +114,28 @@ func getJob(res resource.Resource, conf *config.Config) *job.Job { // This label is to support `app` filter on pod for getting the logs until we find better solution Labels: map[string]string{"app": conf.Name}, } + + var tolerations []pod.Toleration + for _, t := range kubeOut.Tolerations[resourceName] { + tolerations = append(tolerations, pod.Toleration{ + Key: t.Key, + Value: t.Value, + Effect: t.Effect, + Operator: t.Operator, + }) + } + + if len(tolerations) > 0 { + p.Tolerations = tolerations + } + + if aff, ok := kubeOut.Affinities[resourceName]; ok { + p.NodeAffinityMatchExpressions = &pod.NodeAffinityMatchExpressions{ + RequiredDuringSchedulingIgnoredDuringExecution: aff.RequiredDuringSchedulingIgnoredDuringExecution, + PreferredDuringSchedulingIgnoredDuringExecution: aff.PreferredDuringSchedulingIgnoredDuringExecution, + } + } + limit := backoffLimit j := &job.Job{ Pod: p, diff --git a/modules/kubernetes/driver.go b/modules/kubernetes/driver.go index 4af33668..cf8a8e9e 100644 --- a/modules/kubernetes/driver.go +++ b/modules/kubernetes/driver.go @@ -10,13 +10,14 @@ import ( "github.com/goto/entropy/core/resource" "github.com/goto/entropy/pkg/errors" "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/kube/pod" ) type kubeDriver struct { - TolerationMode map[string]string `json:"toleration_mode"` - Tolerations map[string][]Toleration `json:"tolerations"` - AffinityMode map[string]string `json:"affinity_mode"` - Affinities map[string]NodeAffinityMatchExpressions `json:"affinities"` + TolerationMode map[string]string `json:"toleration_mode"` + Tolerations map[string][]pod.Toleration `json:"tolerations"` + AffinityMode map[string]string `json:"affinity_mode"` + Affinities map[string]pod.NodeAffinityMatchExpressions `json:"affinities"` } func (m *kubeDriver) Plan(ctx context.Context, res module.ExpandedResource, diff --git a/modules/kubernetes/output.go b/modules/kubernetes/output.go index 7c43e42e..dc03ba8d 100644 --- a/modules/kubernetes/output.go +++ b/modules/kubernetes/output.go @@ -7,39 +7,17 @@ import ( "k8s.io/apimachinery/pkg/version" "github.com/goto/entropy/pkg/kube" + "github.com/goto/entropy/pkg/kube/pod" "github.com/mitchellh/mapstructure" ) type Output struct { - Configs kube.Config `json:"configs"` - ServerInfo version.Info `json:"server_info"` - TolerationMode map[string]string `json:"toleration_mode"` - Tolerations map[string][]Toleration `json:"tolerations"` - AffinityMode map[string]string `json:"affinity_mode"` - Affinities map[string]NodeAffinityMatchExpressions `json:"affinities"` -} - -type Toleration struct { - Key string `json:"key"` - Value string `json:"value"` - Effect string `json:"effect"` - Operator string `json:"operator"` -} - -type NodeAffinityMatchExpressions struct { - RequiredDuringSchedulingIgnoredDuringExecution []Preference `json:"requiredDuringSchedulingIgnoredDuringExecution,omitempty"` - PreferredDuringSchedulingIgnoredDuringExecution []WeightedPreference `json:"preferredDuringSchedulingIgnoredDuringExecution,omitempty"` -} - -type WeightedPreference struct { - Weight int `json:"weight" validate:"required"` - Preference []Preference `json:"preference" validate:"required"` -} - -type Preference struct { - Key string `json:"key" validate:"required"` - Operator string `json:"operator" validate:"required"` - Values []string `json:"values"` + Configs kube.Config `json:"configs"` + ServerInfo version.Info `json:"server_info"` + TolerationMode map[string]string `json:"toleration_mode"` + Tolerations map[string][]pod.Toleration `json:"tolerations"` + AffinityMode map[string]string `json:"affinity_mode"` + Affinities map[string]pod.NodeAffinityMatchExpressions `json:"affinities"` } func (out Output) JSON() []byte { @@ -50,7 +28,7 @@ func (out Output) JSON() []byte { return b } -func PreferenceSliceToInterfaceSlice(prefs []Preference) []map[string]interface{} { +func PreferenceSliceToInterfaceSlice(prefs []pod.Preference) []map[string]interface{} { result := make([]map[string]interface{}, len(prefs)) for i, pref := range prefs { @@ -69,7 +47,7 @@ func PreferenceSliceToInterfaceSlice(prefs []Preference) []map[string]interface{ return result } -func WeightedPreferencesToInterfaceSlice(weightedPrefs []WeightedPreference) []map[string]interface{} { +func WeightedPreferencesToInterfaceSlice(weightedPrefs []pod.WeightedPreference) []map[string]interface{} { result := make([]map[string]interface{}, len(weightedPrefs)) for i, wp := range weightedPrefs { diff --git a/pkg/kube/job/job.go b/pkg/kube/job/job.go index 2028f54f..f2268884 100644 --- a/pkg/kube/job/job.go +++ b/pkg/kube/job/job.go @@ -3,10 +3,9 @@ package job import ( "strings" + "github.com/goto/entropy/pkg/kube/pod" v1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/goto/entropy/pkg/kube/pod" ) const WatchTimeout int64 = 60 diff --git a/pkg/kube/pod/pod.go b/pkg/kube/pod/pod.go index a998057e..060731c0 100644 --- a/pkg/kube/pod/pod.go +++ b/pkg/kube/pod/pod.go @@ -8,14 +8,120 @@ import ( "github.com/goto/entropy/pkg/kube/volume" ) +type Toleration struct { + Key string `json:"key"` + Value string `json:"value"` + Effect string `json:"effect"` + Operator string `json:"operator"` +} + +func (t *Toleration) ToCoreV1() *corev1.Toleration { + if t == nil { + return nil + } + return &corev1.Toleration{ + Key: t.Key, + Value: t.Value, + Effect: corev1.TaintEffect(t.Effect), + Operator: corev1.TolerationOperator(t.Operator), + } +} + +type NodeAffinityMatchExpressions struct { + RequiredDuringSchedulingIgnoredDuringExecution []Preference `json:"requiredDuringSchedulingIgnoredDuringExecution,omitempty"` + PreferredDuringSchedulingIgnoredDuringExecution []WeightedPreference `json:"preferredDuringSchedulingIgnoredDuringExecution,omitempty"` +} + +type WeightedPreference struct { + Weight int `json:"weight" validate:"required"` + Preference []Preference `json:"preference" validate:"required"` +} + +type Preference struct { + Key string `json:"key" validate:"required"` + Operator string `json:"operator" validate:"required"` + Values []string `json:"values"` +} + +func (na *NodeAffinityMatchExpressions) ToCoreV1() *corev1.NodeAffinity { + if na == nil { + return nil + } + + nodeSelectorsTerm := []corev1.NodeSelectorTerm{} + if len(na.RequiredDuringSchedulingIgnoredDuringExecution) > 0 { + nodeSelectorsTerm = append(nodeSelectorsTerm, corev1.NodeSelectorTerm{ + MatchExpressions: func() []corev1.NodeSelectorRequirement { + var reqs []corev1.NodeSelectorRequirement + for _, expr := range na.RequiredDuringSchedulingIgnoredDuringExecution { + reqs = append(reqs, corev1.NodeSelectorRequirement{ + Key: expr.Key, + Operator: corev1.NodeSelectorOperator(expr.Operator), + Values: expr.Values, + }) + } + return reqs + }(), + }) + } + + preferredSchedulingTerm := []corev1.PreferredSchedulingTerm{} + if len(na.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + for _, wp := range na.PreferredDuringSchedulingIgnoredDuringExecution { + preferredSchedulingTerm = append(preferredSchedulingTerm, corev1.PreferredSchedulingTerm{ + Weight: int32(wp.Weight), + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: func() []corev1.NodeSelectorRequirement { + var reqs []corev1.NodeSelectorRequirement + for _, expr := range wp.Preference { + reqs = append(reqs, corev1.NodeSelectorRequirement{ + Key: expr.Key, + Operator: corev1.NodeSelectorOperator(expr.Operator), + Values: expr.Values, + }) + } + return reqs + }(), + }, + }) + } + } + + return &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: nodeSelectorsTerm, + }, + PreferredDuringSchedulingIgnoredDuringExecution: preferredSchedulingTerm, + } +} + type Pod struct { Name string Containers []container.Container Volumes []volume.Volume Labels map[string]string + // Tolerations represents the tolerations to be set for the deployment. + // The key in the map is the sink-type in upper case. + Tolerations []Toleration + // NodeAffinityMatchExpressions can be used to set node-affinity for the deployment. + NodeAffinityMatchExpressions *NodeAffinityMatchExpressions } func (p Pod) Template() corev1.PodTemplateSpec { + var tolerations []corev1.Toleration + + if len(p.Tolerations) > 0 { + for _, t := range p.Tolerations { + tolerations = append(tolerations, *t.ToCoreV1()) + } + } + + var affinity *corev1.Affinity + + if p.NodeAffinityMatchExpressions != nil { + affinity = &corev1.Affinity{NodeAffinity: p.NodeAffinityMatchExpressions.ToCoreV1()} + } + var containers []corev1.Container for _, c := range p.Containers { containers = append(containers, c.Template()) @@ -34,6 +140,8 @@ func (p Pod) Template() corev1.PodTemplateSpec { Labels: p.Labels, }, Spec: corev1.PodSpec{ + Tolerations: tolerations, + Affinity: affinity, Containers: containers, Volumes: volumes, RestartPolicy: corev1.RestartPolicyNever,