diff --git a/modules/firehose/driver_test.go b/modules/firehose/driver_test.go index ba718ea7..b1d758bd 100644 --- a/modules/firehose/driver_test.go +++ b/modules/firehose/driver_test.go @@ -564,6 +564,175 @@ func TestFirehoseDriver(t *testing.T) { }, wantErr: nil, }, + { + title: "KAFKA_Sink", + res: resource.Resource{ + URN: "orn:entropy:firehose:project-1:resource-4-firehose", + Kind: "firehose", + Name: "resource-4", + Project: "project-1", + Labels: map[string]string{ + "team": "team-4", + }, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + UpdatedBy: "john.doe4@goto.com", + CreatedBy: "john.doe4@goto.com", + Spec: resource.Spec{ + Configs: []byte(`{ + "env_variables": { + "SINK_TYPE": "KAFKA", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar-4", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz-4", + "SOURCE_KAFKA_BROKERS": "localhost:9095", + "SOURCE_KAFKA_TOPIC": "foo-log-4" + }, + "replicas": 1 + }`), + Dependencies: map[string]string{}, + }, + State: resource.State{ + Status: resource.StatusPending, + Output: nil, + }, + }, + kubeOutput: kubernetes.Output{ + Tolerations: map[string][]pod.Toleration{ + "firehose_KAFKA": { + { + Key: "key4", + Operator: "Equal", + Value: "value4", + Effect: "NoSchedule", + }, + }, + }, + }, + want: &helm.ReleaseConfig{ + Name: "project-1-resource-4-firehose", + Repository: "https://goto.github.io/charts/", + Chart: "firehose", + Version: "0.1.13", + Namespace: "namespace-1", + Timeout: 60, + Wait: true, + ForceUpdate: true, + Values: map[string]any{ + "firehose": map[string]any{ + "config": map[string]any{ + "DEFAULT_KEY_IN_FIREHOSE_MODULE_1": "default-key-in-firehose-module-value_1", + "DEFAULT_KEY_IN_FIREHOSE_MODULE_2": "default-key-in-firehose-module-value_2", + "DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/dlq_gcs_auth.json", + "INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar-4", + "SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/bigquery_auth.json", + "SINK_BIGTABLE_CREDENTIAL_PATH": "/etc/secret/gcs_auth.json", + "SINK_BLOB_GCS_CREDENTIAL_PATH": "/etc/secret/gcs_auth.json", + "SINK_TYPE": "KAFKA", + "SOURCE_KAFKA_BROKERS": "localhost:9095", + "SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz-4", + "SOURCE_KAFKA_TOPIC": "foo-log-4", + }, + "image": map[string]any{ + "pullPolicy": "IfNotPresent", + "repository": "gotocompany/firehose", + "tag": "0.8.1", + }, + "resources": map[string]any{ + "limits": map[string]any{ + "cpu": "6000m", + "memory": "6000Mi", + }, + "requests": map[string]any{ + "cpu": "600m", + "memory": "2500Mi", + }, + }, + }, + "init-firehose": map[string]any{ + "enabled": true, + "image": map[string]any{ + "repository": "busybox", + "pullPolicy": "IfNotPresent", + "tag": "latest", + }, + "command": []string{"cmd1", "--a"}, + "args": []string{"arg1", "arg2"}, + }, + "labels": map[string]string{ + "deployment": "project-1-resource-4-firehose", + "team": "team-4", + "orchestrator": "entropy", + }, + "mountSecrets": []map[string]string{ + { + "key": "gcs_credential", + "path": "gcs_auth.json", + "value": "gcs-credential", + }, + { + "key": "dlq_gcs_credential", + "path": "dlq_gcs_auth.json", + "value": "dlq-gcs-credential", + }, + { + "key": "bigquery_credential", + "path": "bigquery_auth.json", + "value": "big-query-credential", + }, + }, + "nodeAffinityMatchExpressions": map[string]any{ + "preferredDuringSchedulingIgnoredDuringExecution": []pod.WeightedPreference{ + { + Weight: 1, + Preference: []pod.Preference{ + { + Key: "another-node-label-key", + Operator: "In", + Values: []string{"another-node-label-value"}, + }, + }, + }, + }, + "requiredDuringSchedulingIgnoredDuringExecution": []pod.Preference{ + { + Key: "topology.kubernetes.io/zone", + Operator: "In", + Values: []string{"antarctica-east1", "antarctica-west1"}, + }, + }, + }, + "replicaCount": 1, + "telegraf": map[string]any{ + "enabled": true, + "image": map[string]string{ + "pullPolicy": "IfNotPresent", + "repository": "telegraf", + "tag": "1.18.0-alpine", + }, + "config": map[string]any{ + "output": map[string]any{ + "prometheus_remote_write": map[string]any{ + "enabled": true, + "url": "http://goto.namespace-1.com", + }, + }, + "additional_global_tags": map[string]string{ + "app": "orn:entropy:firehose:project-1:resource-4-firehose", + }, + }, + }, + "tolerations": []map[string]any{ + { + "key": "key4", + "operator": "Equal", + "value": "value4", + "effect": "NoSchedule", + }, + }, + }, + }, + wantErr: nil, + }, } for _, tt := range table { diff --git a/modules/firehose/schema/config.json b/modules/firehose/schema/config.json index 2d844fc5..e6def93e 100644 --- a/modules/firehose/schema/config.json +++ b/modules/firehose/schema/config.json @@ -54,7 +54,8 @@ "REDIS", "BIGQUERY", "BIGTABLE", - "MAXCOMPUTE" + "MAXCOMPUTE", + "KAFKA" ] }, "KAFKA_RECORD_PARSER_MODE": {