Skip to content

Commit

Permalink
Use dedicated leaspool for worker config component
Browse files Browse the repository at this point in the history
With this change the first controller on new version can apply the needed versioned resources as it will always be guaranteed to become the leader.

Signed-off-by: Jussi Nummelin <[email protected]>
(cherry picked from commit e5a271b)
(cherry picked from commit a9e79f6)
Signed-off-by: Tom Wieczorek <[email protected]>
(cherry picked from commit a1d7798)
  • Loading branch information
jnummelin authored and twz123 committed May 27, 2024
1 parent 01c78a8 commit 728c6c9
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 4 deletions.
11 changes: 9 additions & 2 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ func (c *command) start(ctx context.Context) error {

// One leader elector per controller
if !c.SingleNode {
leaderElector = leaderelector.NewLeasePool(adminClientFactory)
// The name used to be hardcoded in the component itself
// At some point we need to rename this.
leaderElector = leaderelector.NewLeasePool(adminClientFactory, "k0s-endpoint-reconciler")
} else {
leaderElector = &leaderelector.Dummy{Leader: true}
}
Expand Down Expand Up @@ -467,7 +469,12 @@ func (c *command) start(ctx context.Context) error {
}

if !slices.Contains(c.DisableComponents, constant.WorkerConfigComponentName) {
reconciler, err := workerconfig.NewReconciler(c.K0sVars, nodeConfig.Spec, adminClientFactory, leaderElector, enableKonnectivity)
// Create new dedicated leasepool for worker config reconciler
leaseName := fmt.Sprintf("k0s-%s-%s", constant.WorkerConfigComponentName, constant.KubernetesMajorMinorVersion)
workerConfigLeasePool := leaderelector.NewLeasePool(adminClientFactory, leaseName)
clusterComponents.Add(ctx, workerConfigLeasePool)

reconciler, err := workerconfig.NewReconciler(c.K0sVars, nodeConfig.Spec, adminClientFactory, workerConfigLeasePool, enableKonnectivity)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions inttest/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ check-backup: TIMEOUT=10m
check-ap-ha3x3: K0S_UPDATE_FROM_BIN ?= ../k0s
check-ap-ha3x3: K0S_UPDATE_FROM_PATH ?= $(realpath $(K0S_UPDATE_FROM_BIN))

check-ap-controllerworker: K0S_UPDATE_FROM_BIN ?= ../k0s
check-ap-controllerworker: K0S_UPDATE_FROM_PATH ?= $(realpath $(K0S_UPDATE_FROM_BIN))

check-customports-dynamicconfig: export K0S_ENABLE_DYNAMIC_CONFIG=true
check-customports-dynamicconfig: TEST_PACKAGE=customports

Expand Down
1 change: 1 addition & 0 deletions inttest/Makefile.variables
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ smoketests := \
check-addons \
check-airgap \
check-ap-airgap \
check-ap-controllerworker \
check-ap-ha3x3 \
check-ap-platformselect \
check-ap-quorum \
Expand Down
209 changes: 209 additions & 0 deletions inttest/ap-controllerworker/controllerworker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2024 k0s authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllerworker

import (
"fmt"
"strings"
"testing"
"time"

"github.com/k0sproject/k0s/inttest/common"
aptest "github.com/k0sproject/k0s/inttest/common/autopilot"

apconst "github.com/k0sproject/k0s/pkg/autopilot/constant"
appc "github.com/k0sproject/k0s/pkg/autopilot/controller/plans/core"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

"github.com/stretchr/testify/suite"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type controllerworkerSuite struct {
common.FootlooseSuite
}

const k0sConfigWithMultiController = `
spec:
api:
address: %s
storage:
etcd:
peerAddress: %s
`

const oldVersion = "v1.29.4+k0s.0"

// SetupTest prepares the controller and filesystem, getting it into a consistent
// state which we can run tests against.
func (s *controllerworkerSuite) SetupTest() {
ctx := s.Context()
// ipAddress := s.GetControllerIPAddress(0)
var joinToken string

for idx := 0; idx < s.FootlooseSuite.ControllerCount; idx++ {
nodeName, require := s.ControllerNode(idx), s.Require()
address := s.GetControllerIPAddress(idx)

s.Require().NoError(s.WaitForSSH(nodeName, 2*time.Minute, 1*time.Second))
ssh, err := s.SSH(ctx, nodeName)
require.NoError(err)
defer ssh.Disconnect()
s.PutFile(nodeName, "/tmp/k0s.yaml", fmt.Sprintf(k0sConfigWithMultiController, address, address))
// Install older version of k0s
downloadCmd := fmt.Sprintf("curl -sSfL get.k0s.sh | K0S_VERSION=%s sh", oldVersion)
out, err := ssh.ExecWithOutput(ctx, downloadCmd)
if err != nil {
s.T().Logf("error getting k0s: %s", out)
}
require.NoError(err)
s.T().Logf("downloaded succesfully: %s", out)
// Note that the token is intentionally empty for the first controller
args := []string{
"--debug",
"--disable-components=metrics-server,helm,konnectivity-server",
"--enable-worker",
"--config=/tmp/k0s.yaml",
}
if joinToken != "" {
s.PutFile(nodeName, "/tmp/token", joinToken)
args = append(args, "--token-file=/tmp/token")
}
out, err = ssh.ExecWithOutput(ctx, "k0s install controller "+strings.Join(args, " "))
if err != nil {
s.T().Logf("error installing k0s: %s", out)
}
require.NoError(err)
_, err = ssh.ExecWithOutput(ctx, "k0s start")
require.NoError(err)
// s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--disable-components=metrics-server", "--enable-worker", joinToken))
s.Require().NoError(s.WaitJoinAPI(nodeName))
kc, err := s.KubeClient(nodeName)
require.NoError(err)
require.NoError(s.WaitForNodeReady(nodeName, kc))

node, err := kc.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
require.NoError(err)
require.Equal("v1.29.4+k0s", node.Status.NodeInfo.KubeletVersion)

client, err := s.ExtensionsClient(s.ControllerNode(0))
s.Require().NoError(err)

s.Require().NoError(aptest.WaitForCRDByName(ctx, client, "plans"))
s.Require().NoError(aptest.WaitForCRDByName(ctx, client, "controlnodes"))

// With the primary controller running, create the join token for subsequent controllers.
if idx == 0 {
token, err := s.GetJoinToken("controller")
s.Require().NoError(err)
joinToken = token
}
}

// Final sanity -- ensure all nodes see each other according to etcd
for idx := 0; idx < s.FootlooseSuite.ControllerCount; idx++ {
s.Require().Len(s.GetMembers(idx), s.FootlooseSuite.ControllerCount)
}
}

// TestApply applies a well-formed `plan` yaml, and asserts that
// all of the correct values across different objects + controllers are correct.
func (s *controllerworkerSuite) TestApply() {

planTemplate := `
apiVersion: autopilot.k0sproject.io/v1beta2
kind: Plan
metadata:
name: autopilot
spec:
id: id123
timestamp: now
commands:
- k0supdate:
version: v0.0.0
forceupdate: true
platforms:
linux-amd64:
url: http://localhost/dist/k0s-new
linux-arm64:
url: http://localhost/dist/k0s-new
targets:
controllers:
discovery:
static:
nodes:
- controller1
- controller2
- controller0
`
ctx := s.Context()
manifestFile := "/tmp/happy.yaml"
s.PutFileTemplate(s.ControllerNode(0), manifestFile, planTemplate, nil)

out, err := s.RunCommandController(0, fmt.Sprintf("/usr/local/bin/k0s kubectl apply -f %s", manifestFile))
s.T().Logf("kubectl apply output: '%s'", out)
s.Require().NoError(err)

client, err := s.AutopilotClient(s.ControllerNode(0))
s.Require().NoError(err)
s.NotEmpty(client)

// The plan has enough information to perform a successful update of k0s, so wait for it.
plan, err := aptest.WaitForPlanState(s.Context(), client, apconst.AutopilotName, appc.PlanCompleted)
s.Require().NoError(err)

s.Equal(1, len(plan.Status.Commands))
cmd := plan.Status.Commands[0]

s.Equal(appc.PlanCompleted, cmd.State)
s.NotNil(cmd.K0sUpdate)
s.NotNil(cmd.K0sUpdate.Controllers)
s.Empty(cmd.K0sUpdate.Workers)

for _, node := range cmd.K0sUpdate.Controllers {
s.Equal(appc.SignalCompleted, node.State)
}

kc, err := s.KubeClient(s.ControllerNode(0))
s.NoError(err)

for idx := 0; idx < s.FootlooseSuite.ControllerCount; idx++ {
nodeName, require := s.ControllerNode(idx), s.Require()
require.NoError(s.WaitForNodeReady(nodeName, kc))
// Wait till we see kubelet reporting the expected version
err := watch.Nodes(kc.CoreV1().Nodes()).
WithObjectName(nodeName).
WithErrorCallback(common.RetryWatchErrors(s.T().Logf)).
Until(ctx, func(node *corev1.Node) (bool, error) {
return strings.Contains(node.Status.NodeInfo.KubeletVersion, fmt.Sprintf("v%s.", constant.KubernetesMajorMinorVersion)), nil
})
require.NoError(err)
}
}

// TestQuorumSuite sets up a suite using 3 controllers for quorum, and runs various
// autopilot upgrade scenarios against them.
func TestQuorumSuite(t *testing.T) {
suite.Run(t, &controllerworkerSuite{
common.FootlooseSuite{
ControllerCount: 3,
WorkerCount: 0,
LaunchMode: common.LaunchModeOpenRC,
},
})
}
6 changes: 4 additions & 2 deletions pkg/component/controller/leaderelector/leasepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,22 @@ type LeasePool struct {

acquiredLeaseCallbacks []func()
lostLeaseCallbacks []func()
name string
}

var _ Interface = (*LeasePool)(nil)
var _ manager.Component = (*LeasePool)(nil)

// NewLeasePool creates a new leader elector using a Kubernetes lease pool.
func NewLeasePool(kubeClientFactory kubeutil.ClientFactoryInterface) *LeasePool {
func NewLeasePool(kubeClientFactory kubeutil.ClientFactoryInterface, name string) *LeasePool {
d := atomic.Value{}
d.Store(false)
return &LeasePool{
stopCh: make(chan struct{}),
kubeClientFactory: kubeClientFactory,
log: logrus.WithFields(logrus.Fields{"component": "poolleaderelector"}),
leaderStatus: d,
name: name,
}
}

Expand All @@ -63,7 +65,7 @@ func (l *LeasePool) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("can't create kubernetes rest client for lease pool: %v", err)
}
leasePool, err := leaderelection.NewLeasePool(ctx, client, "k0s-endpoint-reconciler",
leasePool, err := leaderelection.NewLeasePool(ctx, client, l.name,
leaderelection.WithLogger(l.log),
leaderelection.WithContext(ctx))
if err != nil {
Expand Down

0 comments on commit 728c6c9

Please sign in to comment.