Skip to content

Commit

Permalink
Add e2e downgrade automatic cancellation test
Browse files Browse the repository at this point in the history
Verify that the downgrade can be cancelled
automatically when the downgrade is completed
(using `no inflight downgrade job` as the indicator)

Please see: #19365 (comment)
Reference: #17976

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
Chun-Hung Tseng committed Feb 15, 2025
1 parent 14cf669 commit 121fa72
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 10 deletions.
93 changes: 85 additions & 8 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
})
}
cc := epc.Etcdctl()
t.Logf("Cluster created")
t.Log("Cluster created")
if len(epc.Procs) > 1 {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
Expand All @@ -132,7 +132,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
require.NoError(t, err)
if triggerSnapshot {
t.Logf("Generating snapshot")
t.Log("Generating snapshot")
generateSnapshot(t, snapshotCount, cc)
verifySnapshot(t, epc)
}
Expand All @@ -142,7 +142,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
beforeMembers, beforeKV := getMembersAndKeys(t, cc)

if triggerCancellation == cancelRightBeforeEnable {
t.Logf("Cancelling downgrade before enabling")
t.Log("Cancelling downgrade before enabling")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
Expand All @@ -151,7 +151,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
}
e2e.DowngradeEnable(t, epc, lastVersion)
if triggerCancellation == cancelRightAfterEnable {
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
t.Log("Cancelling downgrade right after enabling (no node is downgraded yet)")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
Expand Down Expand Up @@ -188,7 +188,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
resp, err = cc.MemberAddAsLearner(context.Background(), "fake2", []string{"http://127.0.0.1:1002"})
require.NoError(t, err)
if triggerSnapshot {
t.Logf("Generating snapshot")
t.Log("Generating snapshot")
generateSnapshot(t, snapshotCount, cc)
verifySnapshot(t, epc)
}
Expand All @@ -207,6 +207,77 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
assert.Equal(t, beforeMembers.Members, afterMembers.Members)
}

// TestAutomaticDowngradeCancellationAfterCompletingDowngradingInClusterOf3
// verifies that once every member of a 3-node cluster has been downgraded to
// the previous minor release, the inflight downgrade job is cancelled
// automatically by the cluster: a subsequent manual `downgrade cancel` must
// report "no inflight downgrade job" (see DowngradeAutoCancelCheck).
func TestAutomaticDowngradeCancellationAfterCompletingDowngradingInClusterOf3(t *testing.T) {
	clusterSize := 3

	currentEtcdBinary := e2e.BinPath.Etcd
	lastReleaseBinary := e2e.BinPath.EtcdLastRelease
	if !fileutil.Exist(lastReleaseBinary) {
		t.Skipf("%q does not exist", lastReleaseBinary)
	}

	currentVersion, err := e2e.GetVersionFromBinary(currentEtcdBinary)
	require.NoError(t, err)
	// wipe any pre-release suffix like -alpha.0 we see commonly in builds
	currentVersion.PreRelease = ""

	lastVersion, err := e2e.GetVersionFromBinary(lastReleaseBinary)
	require.NoError(t, err)

	// The test only makes sense across exactly one minor release.
	require.Equalf(t, lastVersion.Minor, currentVersion.Minor-1, "unexpected minor version difference")
	currentVersionStr := currentVersion.String()
	lastVersionStr := lastVersion.String()

	// The cluster version carries only major.minor; drop the patch level.
	lastClusterVersion := semver.New(lastVersionStr)
	lastClusterVersion.Patch = 0

	e2e.BeforeTest(t)

	t.Logf("Create cluster with version %s", currentVersionStr)
	var snapshotCount uint64 = 10
	epc := newCluster(t, clusterSize, snapshotCount)
	for i := range epc.Procs {
		e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
			Cluster: currentVersionStr,
			Server:  version.Version,
			Storage: currentVersionStr,
		})
	}
	cc := epc.Etcdctl()
	t.Log("Cluster created")
	if len(epc.Procs) > 1 {
		t.Log("Waiting health interval required to make membership changes")
		time.Sleep(etcdserver.HealthInterval)
	}

	t.Log("Adding member to test membership, but as a learner to avoid breaking quorum")
	resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
	require.NoError(t, err)
	t.Log("Removing learner to test membership")
	_, err = cc.MemberRemove(context.Background(), resp.Member.ID)
	require.NoError(t, err)
	beforeMembers, beforeKV := getMembersAndKeys(t, cc)

	e2e.DowngradeEnable(t, epc, lastVersion)

	// Downgrade every member so the automatic cancellation triggers.
	membersToChange := make([]int, clusterSize)
	for i := range membersToChange {
		membersToChange[i] = i
	}
	t.Logf("Starting downgrade process for all nodes to %q", lastVersionStr)
	err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
	require.NoError(t, err)

	// Data and membership must survive the downgrade unchanged.
	afterMembers, afterKV := getMembersAndKeys(t, cc)
	assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
	assert.Equal(t, beforeMembers.Members, afterMembers.Members)

	if len(epc.Procs) > 1 {
		t.Log("Waiting health interval required to make membership changes")
		time.Sleep(etcdserver.HealthInterval)
	}

	e2e.DowngradeAutoCancelCheck(t, epc)
	t.Log("Downgrade job was automatically cancelled since the cluster has been downgraded, validating if cluster is in the right state")
	e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, lastClusterVersion))
}

func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdProcessCluster {
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(clusterSize),
Expand Down Expand Up @@ -250,7 +321,7 @@ func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
defer cancel()

var i uint64
t.Logf("Adding keys")
t.Log("Adding keys")
for i = 0; i < snapshotCount*3; i++ {
err := cc.Put(ctx, fmt.Sprintf("%d", i), "1", config.PutOptions{})
assert.NoError(t, err)
Expand All @@ -264,7 +335,7 @@ func verifySnapshot(t *testing.T, epc *e2e.EtcdProcessCluster) {
_, err := ss.Load()
require.NoError(t, err)
}
t.Logf("All members have a valid snapshot")
t.Log("All members have a valid snapshot")
}

func verifySnapshotMembers(t *testing.T, epc *e2e.EtcdProcessCluster, expectedMembers *clientv3.MemberListResponse) {
Expand Down Expand Up @@ -301,11 +372,17 @@ func getMembersAndKeys(t *testing.T, cc *e2e.EtcdctlV3) (*clientv3.MemberListRes
func generateIdenticalVersions(clusterSize int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

// storage version string is non-empty starting from 3.6.0
storageStr := ver.String()
if ver.LessThan(version.V3_6) {
storageStr = ""
}

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: ver.String(),
Server: ver.String(),
Storage: ver.String(),
Storage: storageStr,
}
}

Expand Down
26 changes: 24 additions & 2 deletions tests/framework/e2e/downgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
var err error
testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
for {
t.Logf("etcdctl downgrade cancel")
t.Log("etcdctl downgrade cancel")
err = c.DowngradeCancel(context.TODO())
if err != nil {
if strings.Contains(err.Error(), "no inflight downgrade job") {
Expand All @@ -72,7 +73,7 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
continue
}

t.Logf("etcdctl downgrade cancel executed successfully")
t.Log("etcdctl downgrade cancel executed successfully")
break
}
})
Expand All @@ -82,6 +83,19 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
t.Log("Cluster downgrade cancellation is completed")
}

// DowngradeAutoCancelCheck verifies that a completed downgrade has already
// been cancelled automatically by the cluster: issuing a manual
// `etcdctl downgrade cancel` must fail, and the returned error must carry
// "no inflight downgrade job" as the indicator.
func DowngradeAutoCancelCheck(t *testing.T, epc *EtcdProcessCluster) {
	c := epc.Etcdctl()

	var err error
	testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
		t.Log("etcdctl downgrade cancel")
		err = c.DowngradeCancel(context.TODO())
		// require.Errorf(t, err, "...") would only assert err != nil — its
		// string argument is the failure-message format, not an expected
		// error substring. Match the actual error content instead.
		require.ErrorContains(t, err, "no inflight downgrade job")
	})

	t.Log("Cluster downgrade is completed")
}

func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
t.Logf("Elect members for operations on members: %v", membersToChange)
Expand Down Expand Up @@ -117,6 +131,14 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
return err
}
}

if opString == "downgrading" && len(membersToChange) == len(clus.Procs) {
lg.Info("Waiting for downgrade completion log line")
leader := clus.WaitLeader(t)
_, err := clus.Procs[leader].Logs().ExpectWithContext(context.TODO(), expect.ExpectedResponse{Value: "the cluster has been downgraded"})
require.NoError(t, err)
}

lg.Info("Validating versions")
for _, memberID := range membersToChange {
member := clus.Procs[memberID]
Expand Down

0 comments on commit 121fa72

Please sign in to comment.