[test] Add e2e downgrade automatic cancellation test #19399

Draft · wants to merge 1 commit into base: main
93 changes: 85 additions & 8 deletions tests/e2e/cluster_downgrade_test.go
@@ -127,7 +127,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
})
}
cc := epc.Etcdctl()
t.Logf("Cluster created")
t.Log("Cluster created")
if len(epc.Procs) > 1 {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
@@ -140,7 +140,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
require.NoError(t, err)
if triggerSnapshot {
t.Logf("Generating snapshot")
t.Log("Generating snapshot")
generateSnapshot(t, snapshotCount, cc)
verifySnapshot(t, epc)
}
@@ -150,7 +150,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
beforeMembers, beforeKV := getMembersAndKeys(t, cc)

if triggerCancellation == cancelRightBeforeEnable {
t.Logf("Cancelling downgrade before enabling")
t.Log("Cancelling downgrade before enabling")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
@@ -163,7 +163,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})

if triggerCancellation == cancelRightAfterEnable {
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
t.Log("Cancelling downgrade right after enabling (no node is downgraded yet)")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
@@ -200,7 +200,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
resp, err = cc.MemberAddAsLearner(context.Background(), "fake2", []string{"http://127.0.0.1:1002"})
require.NoError(t, err)
if triggerSnapshot {
t.Logf("Generating snapshot")
t.Log("Generating snapshot")
generateSnapshot(t, snapshotCount, cc)
verifySnapshot(t, epc)
}
@@ -228,6 +228,77 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
assert.Equal(t, beforeMembers.Members, afterMembers.Members)
}

func TestDowngradeAutoCancelAfterCompletion(t *testing.T) {
clusterSize := 3

currentEtcdBinary := e2e.BinPath.Etcd
lastReleaseBinary := e2e.BinPath.EtcdLastRelease
if !fileutil.Exist(lastReleaseBinary) {
t.Skipf("%q does not exist", lastReleaseBinary)
}

currentVersion, err := e2e.GetVersionFromBinary(currentEtcdBinary)
require.NoError(t, err)
// wipe any pre-release suffix like -alpha.0 we see commonly in builds
currentVersion.PreRelease = ""

lastVersion, err := e2e.GetVersionFromBinary(lastReleaseBinary)
require.NoError(t, err)

require.Equalf(t, lastVersion.Minor, currentVersion.Minor-1, "unexpected minor version difference")
currentVersionStr := currentVersion.String()
lastVersionStr := lastVersion.String()

lastClusterVersion := semver.New(lastVersionStr)
lastClusterVersion.Patch = 0

e2e.BeforeTest(t)

t.Logf("Create cluster with version %s", currentVersionStr)
var snapshotCount uint64 = 10
epc := newCluster(t, clusterSize, snapshotCount)
for i := 0; i < len(epc.Procs); i++ {
e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
cc := epc.Etcdctl()
t.Log("Cluster created")
if len(epc.Procs) > 1 {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}

t.Log("Adding member to test membership, but a learner avoid breaking quorum")
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
require.NoError(t, err)
t.Log("Removing learner to test membership")
_, err = cc.MemberRemove(context.Background(), resp.Member.ID)
require.NoError(t, err)
beforeMembers, beforeKV := getMembersAndKeys(t, cc)

e2e.DowngradeEnable(t, epc, lastVersion)

t.Logf("Starting downgrade process for all nodes to %q", lastVersionStr)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, []int{0, 1, 2}, true, currentVersion, lastClusterVersion)
require.NoError(t, err)

afterMembers, afterKV := getMembersAndKeys(t, cc)
assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
assert.Equal(t, beforeMembers.Members, afterMembers.Members)

if len(epc.Procs) > 1 {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}

e2e.DowngradeAutoCancelCheck(t, epc)
@ahrtr (Member) · Feb 12, 2025:

This might be flaky; it depends on when the downgrade job gets triggered. A simple way is just to monitor/wait for the log entry "the cluster has been downgraded":

m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))

Contributor Author:

So it's better to

  • monitor/wait for the log entry "the cluster has been downgraded"
  • call downgradeCancellation to verify that it's indeed cancelled

Right?

Member:

The first item is required; the second one is nice to have (optional).

Contributor Author:

Got it! Will impl. in a bit

Member:

Theoretically, when you see the log message "the cluster has been downgraded", the leader hasn't sent out the downgradeCancel request yet. So when you try to send downgradeCancel yourself in e2e.DowngradeAutoCancelCheck, it may succeed. I think it's easy to verify if you add a sleep in between.

But in practice, it's unlikely because there is a 5s (etcdserver.HealthInterval) sleep after the downgrade completion.

So either just fix comment https://github.com/etcd-io/etcd/pull/19399/files#r1957283593 or just remove e2e.DowngradeAutoCancelCheck (preferred)
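For reference, the optional second check amounts to the sketch below; it mirrors what `e2e.DowngradeAutoCancelCheck` in this PR does and is shown only to illustrate the trade-off being discussed:

```go
// Sketch only: after the cluster has fully downgraded and the leader's
// auto-cancel has gone through, a manual cancel should be rejected.
err := epc.Etcdctl().DowngradeCancel(context.TODO())
require.ErrorContains(t, err, "no inflight downgrade job")
```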

Contributor:

I suggest adding a TODO item to follow up once the downgrade query API is implemented. #19439 (comment)

t.Log("Downgrade cancellation is automatically cancelled since the cluster has been downgraded, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, lastClusterVersion))
}

func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdProcessCluster {
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(clusterSize),
@@ -250,7 +321,7 @@ func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
defer cancel()

var i uint64
t.Logf("Adding keys")
t.Log("Adding keys")
for i = 0; i < snapshotCount*3; i++ {
err := cc.Put(ctx, fmt.Sprintf("%d", i), "1", config.PutOptions{})
assert.NoError(t, err)
@@ -264,7 +335,7 @@ func verifySnapshot(t *testing.T, epc *e2e.EtcdProcessCluster) {
_, err := ss.Load()
require.NoError(t, err)
}
t.Logf("All members have a valid snapshot")
t.Log("All members have a valid snapshot")
}

func verifySnapshotMembers(t *testing.T, epc *e2e.EtcdProcessCluster, expectedMembers *clientv3.MemberListResponse) {
@@ -301,11 +372,17 @@ func getMembersAndKeys(t *testing.T, cc *e2e.EtcdctlV3) (*clientv3.MemberListRes
func generateIdenticalVersions(clusterSize int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

// storage version string is non-empty starting from 3.6.0
storageStr := ver.String()
if ver.LessThan(version.V3_6) {
storageStr = ""
}

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: ver.String(),
Server: ver.String(),
Storage: ver.String(),
Storage: storageStr,
}
}

27 changes: 25 additions & 2 deletions tests/framework/e2e/downgrade.go
@@ -29,6 +29,7 @@ import (

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)
@@ -59,7 +60,7 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
var err error
testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
for {
t.Logf("etcdctl downgrade cancel")
t.Log("etcdctl downgrade cancel")
err = c.DowngradeCancel(context.TODO())
if err != nil {
if strings.Contains(err.Error(), "no inflight downgrade job") {
@@ -73,7 +74,7 @@
continue
}

t.Logf("etcdctl downgrade cancel executed successfully")
t.Log("etcdctl downgrade cancel executed successfully")
break
}
})
@@ -128,6 +129,19 @@ func ValidateDowngradeInfo(t *testing.T, clus *EtcdProcessCluster, expected *pb.
}
}

func DowngradeAutoCancelCheck(t *testing.T, epc *EtcdProcessCluster) {
c := epc.Etcdctl()

var err error
testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
t.Log("etcdctl downgrade cancel")
err = c.DowngradeCancel(context.TODO())
require.ErrorContains(t, err, "no inflight downgrade job")
})

t.Log("Cluster downgrade is completed")
}

func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, downgradeEnabled bool, currentVersion, targetVersion *semver.Version) error {
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
t.Logf("Elect members for operations on members: %v", membersToChange)
@@ -166,6 +180,15 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
t.Log("Waiting health interval to make sure the leader propagates version to new processes")
time.Sleep(etcdserver.HealthInterval)

if opString == "downgrading" && len(membersToChange) == len(clus.Procs) {
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
lg.Info("Waiting for downgrade completion log line")
leader := clus.WaitLeader(t)
_, err := clus.Procs[leader].Logs().ExpectWithContext(context.Background(), expect.ExpectedResponse{Value: "the cluster has been downgraded"})
require.NoError(t, err)
})
}

lg.Info("Validating versions")
clusterVersion := targetVersion
if !isDowngrade {