Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 7 additions & 227 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package destination
import (
"fmt"
"net/netip"
"reflect"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
Expand Down Expand Up @@ -37,24 +36,19 @@ type (
controllerNS string
identityTrustDomain string
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}

forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,

extEndpointZoneWeights bool

meshedHTTP2ClientParams *pb.Http2ClientParams

availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
stream pb.Destination_GetServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter
stream pb.Destination_GetServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter

updates chan interface{}
stop chan struct{}
Expand All @@ -67,10 +61,6 @@ type (
removeUpdate struct {
set watcher.AddressSet
}

noEndpointsUpdate struct {
exists bool
}
)

var updatesQueueOverflowCounter = promauto.NewCounterVec(
Expand All @@ -88,8 +78,6 @@ func newEndpointTranslator(
identityTrustDomain string,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights bool,
meshedHTTP2ClientParams *pb.Http2ClientParams,
service string,
Expand All @@ -110,9 +98,6 @@ func newEndpointTranslator(
if err != nil {
log.Errorf("Failed to get node topology zone for node %s: %s", srcNodeName, err)
}
availableEndpoints := newEmptyAddressSet()

filteredSnapshot := newEmptyAddressSet()

counter, err := updatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"service": service})
if err != nil {
Expand All @@ -123,17 +108,12 @@ func newEndpointTranslator(
controllerNS,
identityTrustDomain,
nodeTopologyZone,
srcNodeName,
defaultOpaquePorts,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights,
meshedHTTP2ClientParams,

availableEndpoints,
filteredSnapshot,
stream,
endStream,
log,
Expand All @@ -151,11 +131,7 @@ func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.enqueueUpdate(&removeUpdate{set})
}

// NoEndpoints wraps the exists flag in a noEndpointsUpdate and hands it to
// enqueueUpdate, so the update is queued rather than handled inline.
func (et *endpointTranslator) NoEndpoints(exists bool) {
	update := &noEndpointsUpdate{exists: exists}
	et.enqueueUpdate(update)
}

// Add, Remove, and NoEndpoints are called from a client-go informer callback
// Add and Remove are called from a client-go informer callback
// and therefore must not block. For each of these, we enqueue an update in
// a channel so that it can be processed asynchronously. To ensure that enqueuing
// does not block, we first check to see if there is capacity in the buffered
Expand Down Expand Up @@ -214,201 +190,12 @@ func (et *endpointTranslator) DrainAndStop() {
func (et *endpointTranslator) processUpdate(update interface{}) {
switch update := update.(type) {
case *addUpdate:
et.add(update.set)
et.sendClientAdd(update.set)
case *removeUpdate:
et.remove(update.set)
case *noEndpointsUpdate:
et.noEndpoints(update.exists)
}
}

// add merges every address in set into the available endpoints (overwriting
// entries that share an ID), replaces the tracked Labels and
// LocalTrafficPolicy with the values carried by set, and then sends a
// filtered update.
func (et *endpointTranslator) add(set watcher.AddressSet) {
	available := &et.availableEndpoints

	for id, addr := range set.Addresses {
		available.Addresses[id] = addr
	}
	available.Labels, available.LocalTrafficPolicy = set.Labels, set.LocalTrafficPolicy

	et.sendFilteredUpdate()
}

// remove deletes every address ID present in set from the available
// endpoints and then sends a filtered update.
func (et *endpointTranslator) remove(set watcher.AddressSet) {
	available := et.availableEndpoints.Addresses
	for staleID := range set.Addresses {
		delete(available, staleID)
	}

	et.sendFilteredUpdate()
}

// noEndpoints replaces the available address map with a fresh empty one and
// then sends a filtered update. The exists flag is only logged.
func (et *endpointTranslator) noEndpoints(exists bool) {
	et.log.Debugf("NoEndpoints(%+v)", exists)

	et.availableEndpoints.Addresses = make(map[watcher.ID]watcher.Address)

	et.sendFilteredUpdate()
}

// sendFilteredUpdate recomputes the filtered endpoint set (filterAddresses
// followed by selectAddressFamily), diffs it against the previous snapshot,
// sends any non-empty add and remove sets to the client (adds first), and
// finally stores the new filtered set as the snapshot.
func (et *endpointTranslator) sendFilteredUpdate() {
	current := et.selectAddressFamily(et.filterAddresses())
	toAdd, toRemove := et.diffEndpoints(current)

	if len(toAdd.Addresses) != 0 {
		et.sendClientAdd(toAdd)
	}
	if len(toRemove.Addresses) != 0 {
		et.sendClientRemove(toRemove)
	}

	et.filteredSnapshot = current
}

func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
filtered := make(map[watcher.ID]watcher.Address)
for id, addr := range addresses.Addresses {
if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
continue
}

if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
// Only consider IPv4 address for which there's not already an IPv6
// alternative
altID := id
altID.IPFamily = corev1.IPv6Protocol
if _, ok := addresses.Addresses[altID]; ok {
continue
}
}

filtered[id] = addr
}

return watcher.AddressSet{
Addresses: filtered,
Labels: addresses.Labels,
LocalTrafficPolicy: addresses.LocalTrafficPolicy,
et.sendClientRemove(update.set)
}
}

// filterAddresses chooses which of the available endpoints should be handed
// to the client, based on the node's topology zone. An endpoint's consumption
// zone is taken from its ForZones hints and can differ from its actual
// topology zone.
//
// The rules are applied in this order:
//  1. Endpoint filtering disabled: every available address is returned.
//  2. service.spec.internalTrafficPolicy is local: only addresses backed by a
//     pod running on this node are returned; hints are not used.
//  3. Any address lacks a hint: all hints are ignored and every address is
//     returned.
//  4. Otherwise only addresses hinted for this node's zone are returned,
//     falling back to every address when none match.
//
// The returned set always holds a freshly allocated address map.
func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
	available := &et.availableEndpoints

	// allAddresses returns a fresh copy of every available address.
	allAddresses := func() map[watcher.ID]watcher.Address {
		all := make(map[watcher.ID]watcher.Address)
		for id, addr := range available.Addresses {
			all[id] = addr
		}
		return all
	}

	// result pairs addrs with the available labels and traffic policy.
	result := func(addrs map[watcher.ID]watcher.Address) watcher.AddressSet {
		return watcher.AddressSet{
			Addresses:          addrs,
			Labels:             available.Labels,
			LocalTrafficPolicy: available.LocalTrafficPolicy,
		}
	}

	// Filtering disabled: hand back everything. Note that this branch leaves
	// LocalTrafficPolicy unset on the returned set.
	if !et.enableEndpointFiltering {
		return watcher.AddressSet{
			Addresses: allAddresses(),
			Labels:    available.Labels,
		}
	}

	// With a local internal traffic policy, keep only the addresses whose pod
	// is scheduled on this node.
	if available.LocalTrafficPolicy {
		et.log.Debugf("Filtering through addresses that should be consumed by node %s", et.nodeName)
		local := make(map[watcher.ID]watcher.Address)
		for id, addr := range available.Addresses {
			if addr.Pod == nil || addr.Pod.Spec.NodeName != et.nodeName {
				continue
			}
			local[id] = addr
		}
		et.log.Debugf("Filtered from %d to %d addresses", len(available.Addresses), len(local))
		return result(local)
	}

	// A single address without a hint disables zone filtering altogether.
	// This replicates kube-proxy behavior documented in the KEP:
	// https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/2433-topology-aware-hints/README.md#kube-proxy
	for _, addr := range available.Addresses {
		if len(addr.ForZones) != 0 {
			continue
		}
		all := allAddresses()
		et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
		return result(all)
	}

	// Collect the addresses that carry a hint for this node's zone.
	et.log.Debugf("Filtering through addresses that should be consumed by zone %s", et.nodeTopologyZone)
	inZone := make(map[watcher.ID]watcher.Address)
	for id, addr := range available.Addresses {
		for _, zone := range addr.ForZones {
			if zone.Name == et.nodeTopologyZone {
				inZone[id] = addr
				break
			}
		}
	}

	// Nothing matched the zone: fall back to endpoints from all zones.
	if len(inZone) == 0 {
		return result(allAddresses())
	}

	et.log.Debugf("Filtered from %d to %d addresses", len(available.Addresses), len(inZone))
	return result(inZone)
}

// diffEndpoints compares the freshly filtered endpoint set against the
// snapshot of previously filtered endpoints and returns two sets, in order:
//
//   - the addresses to add: those that are new, or whose value is no longer
//     deeply equal to the snapshot entry with the same ID;
//   - the addresses to remove: those in the snapshot that are absent from
//     filtered.
//
// Both returned sets carry filtered.Labels. The snapshot itself is not
// modified here.
func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watcher.AddressSet, watcher.AddressSet) {
	previous := et.filteredSnapshot.Addresses
	toAdd := make(map[watcher.ID]watcher.Address)
	toRemove := make(map[watcher.ID]watcher.Address)

	for id, current := range filtered.Addresses {
		// Unchanged entries need no update on the client side.
		if prior, seen := previous[id]; seen && reflect.DeepEqual(prior, current) {
			continue
		}
		toAdd[id] = current
	}

	for id, stale := range previous {
		if _, kept := filtered.Addresses[id]; kept {
			continue
		}
		toRemove[id] = stale
	}

	added := watcher.AddressSet{
		Addresses: toAdd,
		Labels:    filtered.Labels,
	}
	removed := watcher.AddressSet{
		Addresses: toRemove,
		Labels:    filtered.Labels,
	}
	return added, removed
}

func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
addrs := []*pb.WeightedAddr{}
for _, address := range set.Addresses {
Expand Down Expand Up @@ -724,13 +511,6 @@ func getNodeTopologyZone(k8sAPI *k8s.MetadataAPI, srcNode string) (string, error
return "", nil
}

// newEmptyAddressSet returns an AddressSet whose Addresses and Labels maps
// are allocated but empty, so they can be written to immediately.
func newEmptyAddressSet() watcher.AddressSet {
	var set watcher.AddressSet
	set.Addresses = map[watcher.ID]watcher.Address{}
	set.Labels = map[string]string{}
	return set
}

// getInboundPort gets the inbound port from the proxy container's environment
// variable.
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
Expand Down
Loading
Loading