Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions lib/strategy/kubernetes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ defmodule Cluster.Strategy.Kubernetes do
Available values:

+ `:endpoints` (default)
+ `:endpoint_slices`
+ `:pods`

#### :endpoints
Expand All @@ -61,6 +62,22 @@ defmodule Cluster.Strategy.Kubernetes do
Then, this strategy will fetch the addresses of all endpoints with that label and attempt to
connect.

> **Note**: The Endpoints API is deprecated in Kubernetes 1.33. For clusters running
> Kubernetes 1.33+, use `:endpoint_slices` instead.

#### :endpoint_slices

When setting this value, this strategy will lookup IP from endpoint slices using the
`discovery.k8s.io/v1` API. This is the recommended mode for Kubernetes 1.33+ clusters
where the legacy Endpoints API is deprecated.

In order for your endpoint slices to be found they should be returned when you run:

kubectl get endpointslices -l app=myapp

Then, this strategy will fetch the addresses of all endpoint slices with that label and
attempt to connect.

#### :pods

When setting this value, this strategy will lookup IP from pods directly.
Expand Down Expand Up @@ -400,6 +417,9 @@ defmodule Cluster.Strategy.Kubernetes do
:endpoints ->
"api/v1/namespaces/#{namespace}/endpoints?#{query_params}"

:endpoint_slices ->
"apis/discovery.k8s.io/v1/namespaces/#{namespace}/endpointslices?#{query_params}"

:pods ->
"api/v1/namespaces/#{namespace}/pods?#{query_params}"
end
Expand Down Expand Up @@ -490,6 +510,35 @@ defmodule Cluster.Strategy.Kubernetes do
end
end

defp parse_response(:endpoint_slices, resp) do
case resp do
%{"items" => items} when is_list(items) ->
Enum.reduce(items, [], fn
%{"endpoints" => endpoints} = _slice, acc when is_list(endpoints) ->
addrs =
Enum.flat_map(endpoints, fn
%{"addresses" => addresses, "targetRef" => %{"namespace" => namespace}} =
endpoint
when is_list(addresses) ->
Enum.map(addresses, fn ip ->
%{ip: ip, namespace: namespace, hostname: endpoint["hostname"]}
end)

_ ->
[]
end)

acc ++ addrs

_, acc ->
acc
end)

_ ->
[]
end
end

defp parse_response(:pods, resp) do
case resp do
%{"items" => items} when is_list(items) ->
Expand Down
66 changes: 66 additions & 0 deletions test/fixtures/vcr_cassettes/kubernetes_endpoint_slices.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
[
{
"request": {
"body": "",
"headers": {
"authorization": "***"
},
"method": "get",
"options": {
"httpc_options": [],
"http_options": {
"ssl": "[verify: :verify_none]"
}
},
"request_body": "",
"url": "https://cluster.localhost./apis/discovery.k8s.io/v1/namespaces/__libcluster_test/endpointslices?labelSelector=app=test_selector"
},
"response": {
"binary": false,
"body": "{\"kind\":\"EndpointSliceList\",\"apiVersion\":\"discovery.k8s.io/v1\",\"metadata\":{\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development-abc12\",\"namespace\":\"airatel-service-localization\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"kubernetes.io/service-name\":\"development-development\"}},\"addressType\":\"IPv4\",\"endpoints\":[{\"addresses\":[\"10.48.33.136\"],\"hostname\":\"my-hostname-0\",\"conditions\":{\"ready\":true,\"serving\":true,\"terminating\":false},\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"airatel-service-localization\",\"name\":\"development-4292695165-mgq9f\",\"uid\":\"eb0f3e80-0295-11e8-bcad-42010a9c01cc\"}}],\"ports\":[{\"name\":\"web\",\"port\":8443,\"protocol\":\"TCP\"}]}]}\n",
"headers": {
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
"content-length": "877",
"content-type": "application/json"
},
"status_code": [
"HTTP/1.1",
200,
"OK"
],
"type": "ok"
}
},
{
"request": {
"body": "",
"headers": {
"authorization": "***"
},
"method": "get",
"options": {
"httpc_options": [],
"http_options": {
"ssl": "[verify: :verify_none]"
}
},
"request_body": "",
"url": "https://cluster.localhost./apis/discovery.k8s.io/v1/namespaces/__libcluster_test/endpointslices?labelSelector=app=test_selector&resourceVersion=0"
},
"response": {
"binary": false,
"body": "{\"kind\":\"EndpointSliceList\",\"apiVersion\":\"discovery.k8s.io/v1\",\"metadata\":{\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development-abc12\",\"namespace\":\"airatel-service-localization\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"kubernetes.io/service-name\":\"development-development\"}},\"addressType\":\"IPv4\",\"endpoints\":[{\"addresses\":[\"10.48.33.136\"],\"hostname\":\"my-hostname-0\",\"conditions\":{\"ready\":true,\"serving\":true,\"terminating\":false},\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"airatel-service-localization\",\"name\":\"development-4292695165-mgq9f\",\"uid\":\"eb0f3e80-0295-11e8-bcad-42010a9c01cc\"}}],\"ports\":[{\"name\":\"web\",\"port\":8443,\"protocol\":\"TCP\"}]}]}\n",
"headers": {
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
"content-length": "877",
"content-type": "application/json"
},
"status_code": [
"HTTP/1.1",
200,
"OK"
],
"type": "ok"
}
}
]
118 changes: 118 additions & 0 deletions test/kubernetes_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,124 @@ defmodule Cluster.Strategy.KubernetesTest do
end
end

test "works with endpoint_slices" do
use_cassette "kubernetes_endpoint_slices", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
kubernetes_ip_lookup_mode: :endpoint_slices,
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, :"test_basename@10.48.33.136"}, 5_000
end)
end
end

test "works with endpoint_slices and cached resources" do
use_cassette "kubernetes_endpoint_slices", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
kubernetes_ip_lookup_mode: :endpoint_slices,
kubernetes_use_cached_resources: true,
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, :"test_basename@10.48.33.136"}, 5_000
end)
end
end

test "works with endpoint_slices and hostname mode" do
use_cassette "kubernetes_endpoint_slices", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_cluster_name: "my_cluster",
kubernetes_ip_lookup_mode: :endpoint_slices,
mode: :hostname,
kubernetes_selector: "app=test_selector",
kubernetes_service_name: "my_service",
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect,
:"test_basename@my-hostname-0.my_service.airatel-service-localization.svc.my_cluster.local"},
5_000
end)
end
end

test "works with endpoint_slices and dns mode" do
use_cassette "kubernetes_endpoint_slices", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_cluster_name: "my_cluster",
kubernetes_ip_lookup_mode: :endpoint_slices,
mode: :dns,
kubernetes_selector: "app=test_selector",
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect,
:"test_basename@10-48-33-136.airatel-service-localization.pod.my_cluster.local"},
5_000
end)
end
end

test "works with pods" do
use_cassette "kubernetes_pods", custom: true do
capture_log(fn ->
Expand Down