Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
tpubsub "github.com/troian/pubsub"
"k8s.io/apimachinery/pkg/api/resource"

"cosmossdk.io/log"

Expand Down Expand Up @@ -407,15 +408,39 @@ type inventoryServiceState struct {
reservations []*reservation
}

func countPendingIPs(state *inventoryServiceState) uint {
pending := uint(0)
func countReservedIPs(state *inventoryServiceState) uint {
reserved := uint(0)
for _, entry := range state.reservations {
if !entry.ipsConfirmed {
pending += entry.endpointQuantity
reserved += entry.endpointQuantity
}
}

return pending
return reserved
}

func availableLeasedIPs(total, inUse, reserved uint) uint {
if inUse >= total {
return 0
}

available := total - inUse
if reserved >= available {
return 0
}

return available - reserved
}

func leasedIPStatus(state *inventoryServiceState) inventoryV1.ResourcePair {
reserved := countReservedIPs(state)
allocated := state.ipAddrUsage.InUse + reserved

return inventoryV1.NewResourcePair(
int64(state.ipAddrUsage.Available), // nolint: gosec
int64(state.ipAddrUsage.Available), // nolint: gosec
int64(allocated), // nolint: gosec
resource.DecimalSI)
}

func (is *inventoryService) handleRequest(req inventoryRequest, state *inventoryServiceState) {
Expand All @@ -434,15 +459,15 @@ func (is *inventoryService) handleRequest(req inventoryRequest, state *inventory
req.ch <- inventoryResponse{err: errNoLeasedIPsAvailable}
return
}
numIPUnused := state.ipAddrUsage.Available - state.ipAddrUsage.InUse
pending := countPendingIPs(state)
if reservation.endpointQuantity > (numIPUnused - pending) {
reserved := countReservedIPs(state)
available := availableLeasedIPs(state.ipAddrUsage.Available, state.ipAddrUsage.InUse, reserved)
if reservation.endpointQuantity > available {
is.log.Info("insufficient number of IP addresses available", "order", req.order)
req.ch <- inventoryResponse{err: fmt.Errorf("%w: unable to reserve %d", errInsufficientIPs, reservation.endpointQuantity)}
return
}

is.log.Info("reservation used leased IPs", "used", reservation.endpointQuantity, "available", state.ipAddrUsage.Available, "in-use", state.ipAddrUsage.InUse, "pending", pending)
is.log.Info("reservation used leased IPs", "used", reservation.endpointQuantity, "available", state.ipAddrUsage.Available, "in-use", state.ipAddrUsage.InUse, "reserved", reserved, "remaining", available)
} else {
reservation.ipsConfirmed = true // No IPs, just mark it as confirmed implicitly
}
Expand Down Expand Up @@ -816,7 +841,8 @@ func (is *inventoryService) getStatusV1(state *inventoryServiceState) (*provider
}

status := &provider.Inventory{
Cluster: state.inventory.Snapshot(),
Cluster: state.inventory.Snapshot(),
LeasedIP: leasedIPStatus(state),
Reservations: provider.Reservations{
Pending: provider.ReservationsMetric{
Count: 0,
Expand Down
61 changes: 61 additions & 0 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,67 @@ func makeGroupForInventoryTest(sharedHTTP, nodePort, leasedIP bool) manifest.Gro
return group
}

func TestInventory_StatusV1ReportsLeasedIPResourcePair(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

inv := <-cinventory.NewNull(ctx, "nodeA").ResultChan()
reservationWithIPs := func(quantity uint, confirmed bool) *reservation {
return &reservation{
resources: &dvbeta.GroupSpec{},
endpointQuantity: quantity,
ipsConfirmed: confirmed,
}
}

status, err := (&inventoryService{}).getStatusV1(&inventoryServiceState{
inventory: inv,
ipAddrUsage: cip.AddressUsage{
Available: 10,
InUse: 4,
},
reservations: []*reservation{
reservationWithIPs(2, false),
reservationWithIPs(3, true),
reservationWithIPs(1, false),
},
})
require.NoError(t, err)

leasedIP := status.GetLeasedIP()
require.Equal(t, int64(10), leasedIP.GetCapacity().Value())
require.Equal(t, int64(10), leasedIP.GetAllocatable().Value())
require.Equal(t, int64(7), leasedIP.GetAllocated().Value())
require.Equal(t, int64(3), leasedIP.Available().Value())
}

func TestInventory_StatusV1SaturatesLeasedIPResourcePairAvailable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

inv := <-cinventory.NewNull(ctx, "nodeA").ResultChan()
status, err := (&inventoryService{}).getStatusV1(&inventoryServiceState{
inventory: inv,
ipAddrUsage: cip.AddressUsage{
Available: 3,
InUse: 2,
},
reservations: []*reservation{
{
resources: &dvbeta.GroupSpec{},
endpointQuantity: 4,
},
},
})
require.NoError(t, err)

leasedIP := status.GetLeasedIP()
require.Equal(t, int64(3), leasedIP.GetCapacity().Value())
require.Equal(t, int64(3), leasedIP.GetAllocatable().Value())
require.Equal(t, int64(6), leasedIP.GetAllocated().Value())
require.Equal(t, int64(0), leasedIP.Available().Value())
}

func TestInventory_ReserveIPNoIPOperator(t *testing.T) {
config := Config{
InventoryResourcePollPeriod: 5 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
k8s.io/client-go v0.34.1
k8s.io/code-generator v0.34.1
k8s.io/kubectl v0.33.3
pkg.akt.dev/go v0.2.10
pkg.akt.dev/go v0.2.12
pkg.akt.dev/go/cli v0.2.2
pkg.akt.dev/go/sdl v0.2.0
pkg.akt.dev/node v1.2.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3878,8 +3878,8 @@ nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
pgregory.net/rapid v0.5.5 h1:jkgx1TjbQPD/feRoK+S/mXw9e1uj6WilpHrXJowi6oA=
pgregory.net/rapid v0.5.5/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
pkg.akt.dev/go v0.2.10 h1:oRrzQIaolZiaL3+U9fgpfod9E7NT2OzHMs+7fPEB+Dc=
pkg.akt.dev/go v0.2.10/go.mod h1:6yV8oyP8xFm4ocyuf+9nx8S3T4mJz3e64Cl7ln/BczM=
pkg.akt.dev/go v0.2.12 h1:1iG7AWEhwZZrqxVuAw/IBTD8HHM2sPx1w9KgwBpBWIQ=
pkg.akt.dev/go v0.2.12/go.mod h1:6yV8oyP8xFm4ocyuf+9nx8S3T4mJz3e64Cl7ln/BczM=
pkg.akt.dev/go/cli v0.2.2 h1:PWDAAeHkVtcZ9qE76yh4IhJ2J/42ekhwSyrGWLPGi/g=
pkg.akt.dev/go/cli v0.2.2/go.mod h1:MHm9lU8hb+xQ8BX3b9c9S1pMyZKUob5tVjHXQ4T1uwU=
pkg.akt.dev/go/sdl v0.2.0 h1:hY74GjN4itV92REf8HqGt1rQDtXsruzE/iIzd/FpUB8=
Expand Down
Loading