Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: add support for flattenMode option for replication #4678

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ const (
// (optional) StartTime is the time the snapshot schedule
// begins, can be specified using the ISO 8601 time format.
schedulingStartTimeKey = "schedulingStartTime"

// flattenModeKey to get the flattenMode from the parameters.
// (optional) flattenMode decides how to handle images with parent.
// (default) If set to "never", the image with parent will not be flattened.
// If set to "force", the image with parent will be flattened.
flattenModeKey = "flattenMode"
)

// ReplicationServer struct of rbd CSI driver with supported methods of Replication
Expand Down Expand Up @@ -115,6 +121,27 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er
return force, nil
}

// getFlattenMode gets flatten mode from the input GRPC request parameters.
// flattenMode is the key to check the mode in the parameters.
func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) {
val, ok := parameters[flattenModeKey]
if !ok {
log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)",
flattenModeKey, corerbd.FlattenModeNever)

return corerbd.FlattenModeNever, nil
}

mode := corerbd.FlattenMode(val)
switch mode {
case corerbd.FlattenModeForce, corerbd.FlattenModeNever:
return mode, nil
}
log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val)

return mode, status.Errorf(codes.InvalidArgument, "%q=%q is not supported", flattenModeKey, val)
}

// getMirroringMode gets the mirroring mode from the input GRPC request parameters.
// mirroringMode is the key to check the mode in the parameters.
func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd.ImageMirrorMode, error) {
Expand Down Expand Up @@ -265,6 +292,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
if err != nil {
return nil, err
}
// extract the flatten mode
flattenMode, err := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
Comment on lines +297 to +299
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check seems to be wrong?


mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil {
Expand All @@ -274,6 +306,12 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
}

if mirroringInfo.State != librbd.MirrorImageEnabled {
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, getGRPCError(err)
}
err = rbdVol.EnableImageMirroring(mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -777,6 +815,7 @@ func getGRPCError(err error) error {

errorStatusMap := map[error]codes.Code{
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
Expand Down
66 changes: 66 additions & 0 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,69 @@ func Test_timestampFromString(t *testing.T) {
})
}
}

func Test_getFlattenMode(t *testing.T) {
t.Parallel()
type args struct {
ctx context.Context
parameters map[string]string
}
tests := []struct {
name string
args args
want corerbd.FlattenMode
wantErr bool
}{
{
name: "flattenMode option not set",
args: args{
ctx: context.TODO(),
parameters: map[string]string{},
},
want: corerbd.FlattenModeNever,
},
{
name: "flattenMode option set to never",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeNever),
},
},
want: corerbd.FlattenModeNever,
},
{
name: "flattenMode option set to force",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeForce),
},
},
want: corerbd.FlattenModeForce,
},

{
name: "flattenMode option set to invalid value",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: "invalid123",
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got, err := getFlattenMode(tt.args.ctx, tt.args.parameters)
if (err != nil) != tt.wantErr {
t.Errorf("getFlattenMode() error = %v, wantErr %v", err, tt.wantErr)
}
if !tt.wantErr && !reflect.DeepEqual(got, tt.want) {
t.Errorf("getFlattenMode() = %v, want %v", got, tt.want)
}
})
}
}
16 changes: 3 additions & 13 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,21 +1020,11 @@ func cleanupRBDImage(ctx context.Context,

// delete the temporary rbd image created as part of volume clone during
// create volume
tempClone := rbdVol.generateTempClone()
err = tempClone.deleteImage(ctx)
err = rbdVol.DeleteTempImage(ctx)
if err != nil {
if errors.Is(err, ErrImageNotFound) {
err = tempClone.ensureImageCleanup(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
} else {
// return error if it is not ErrImageNotFound
log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v",
tempClone, err)
log.ErrorLog(ctx, "failed to delete temporary rbd image: %v", err)

return nil, status.Error(codes.Internal, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}

// Deleting rbd image
Expand Down
63 changes: 63 additions & 0 deletions internal/rbd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,69 @@ import (
librbd "github.com/ceph/go-ceph/rbd"
)

// FlattenMode is used to indicate the flatten mode for an RBD image.
type FlattenMode string

const (
// FlattenModeNever indicates that the image should never be flattened.
FlattenModeNever FlattenMode = "never"
// FlattenModeForce indicates that the image with the parent must be flattened.
FlattenModeForce FlattenMode = "force"
)

// HandleParentImageExistence checks the image's parent.
// if the parent image does not exist and is not in trash, it returns nil.
// if the flattenMode is FlattenModeForce, it flattens the image itself.
// if the parent image is in trash, it returns an error.
// if the parent image exists and is not enabled for mirroring, it returns an error.
func (rv *rbdVolume) HandleParentImageExistence(
ctx context.Context,
flattenMode FlattenMode,
) error {
if rv.ParentName == "" && !rv.ParentInTrash {
return nil
}

if flattenMode == FlattenModeForce {
// Delete temp image that exists for volume datasource since
// it is no longer required when the live image is flattened.
err := rv.DeleteTempImage(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment why we are deleting the temporary clone here

if err != nil {
return fmt.Errorf("failed to delete temporary rbd image: %w", err)
}

err = rv.flattenRbdImage(ctx, true, 0, 0)
if err != nil {
return err
}
}

if rv.ParentInTrash {
return fmt.Errorf("%w: failed to enable mirroring on image %q:"+
" parent is in trash",
ErrFailedPrecondition, rv)
}

parent, err := rv.getParent()
if err != nil {
return err
}
parentMirroringInfo, err := parent.GetImageMirroringInfo()
if err != nil {
return fmt.Errorf(
"failed to get mirroring info of parent %q of image %q: %w",
parent, rv, err)
}

if parentMirroringInfo.State != librbd.MirrorImageEnabled {
return fmt.Errorf("%w: failed to enable mirroring on image %q: "+
"parent image %q is not enabled for mirroring",
ErrFailedPrecondition, rv, parent)
}

return nil
}

// EnableImageMirroring enables mirroring on an image.
func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
image, err := ri.open()
Expand Down
16 changes: 16 additions & 0 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,22 @@ func (ri *rbdImage) trashRemoveImage(ctx context.Context) error {
return nil
}

// DeleteTempImage deletes the temporary image created for volume datasource.
func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error {
tempClone := rv.generateTempClone()
err := tempClone.deleteImage(ctx)
if err != nil {
if errors.Is(err, ErrImageNotFound) {
return tempClone.ensureImageCleanup(ctx)
} else {
// return error if it is not ErrImageNotFound
return err
}
}

return nil
}

func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
var depth uint
vol := rbdVolume{}
Expand Down