Skip to content

Commit

Permalink
rbd: add support for flattenMode option for replication
Browse files Browse the repository at this point in the history
This commit adds support for flattenMode option
for replication.
If the flattenMode is set to "force" in
volumereplicationclass parameters, cephcsi will
add a task to flatten the image if it has parent.
This enable cephcsi to then mirror such images after
flattening them.
The error message when the image's parent is
in trash or unmirrored is improved as well.

Signed-off-by: Rakshith R <[email protected]>
  • Loading branch information
Rakshith-R committed Jun 13, 2024
1 parent ec80175 commit 122a247
Show file tree
Hide file tree
Showing 31 changed files with 185 additions and 1,206 deletions.
42 changes: 42 additions & 0 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ const (
// (optional) StartTime is the time the snapshot schedule
// begins, can be specified using the ISO 8601 time format.
schedulingStartTimeKey = "schedulingStartTime"

// flattenModeKey to get the flattenMode from the parameters.
// (optional) flattenMode decides how to handle images with parent.
// (default) If set to "never", the image with parent will not be flattened.
// If set to "force", the image with parent will be flattened.
flattenModeKey = "flattenMode"
)

// ReplicationServer struct of rbd CSI driver with supported methods of Replication
Expand Down Expand Up @@ -115,6 +121,30 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er
return force, nil
}

// getFlattenMode gets flatten mode from the input GRPC request parameters.
// flattenMode is the key to check the mode in the parameters.
func getFlattenMode(ctx context.Context, parameters map[string]string) corerbd.FlattenMode {
val, ok := parameters[flattenModeKey]
if !ok {
log.DebugLog(ctx, "%s is not set in parameters, setting to default (%v)",
flattenModeKey, false)

return corerbd.FlattenModeNever
}

switch corerbd.FlattenMode(val) {
case corerbd.FlattenModeForce:
return corerbd.FlattenModeForce
case corerbd.FlattenModeNever:
return corerbd.FlattenModeNever
}

log.DebugLog(ctx, "flattenMode %v not supported, setting to default (%v)",
val, corerbd.FlattenModeNever)

return corerbd.FlattenModeNever
}

// getMirroringMode gets the mirroring mode from the input GRPC request parameters.
// mirroringMode is the key to check the mode in the parameters.
func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd.ImageMirrorMode, error) {
Expand Down Expand Up @@ -265,6 +295,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
if err != nil {
return nil, err
}
// extract the flatten mode
flattenMode := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}

mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil {
Expand All @@ -274,6 +309,12 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
}

if mirroringInfo.State != librbd.MirrorImageEnabled {
err = rbdVol.HandleParentImage(ctx, flattenMode)
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, getGRPCError(err)
}
err = rbdVol.EnableImageMirroring(mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -777,6 +818,7 @@ func getGRPCError(err error) error {

errorStatusMap := map[error]codes.Code{
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
Expand Down
63 changes: 62 additions & 1 deletion internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

corerbd "github.com/ceph/ceph-csi/internal/rbd"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/csi-addons/spec/lib/go/replication"
Expand Down Expand Up @@ -641,3 +640,65 @@ func Test_timestampFromString(t *testing.T) {
})
}
}

func Test_getFlattenMode(t *testing.T) {
type args struct {
ctx context.Context
parameters map[string]string
}
tests := []struct {
name string
args args
want corerbd.FlattenMode
}{
{
name: "flattenMode option not set",
args: args{
ctx: context.TODO(),
parameters: map[string]string{},
},
want: corerbd.FlattenModeNever,
},
{
name: "flattenMode option set to never",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeNever),
},
},
want: corerbd.FlattenModeNever,
},
{
name: "flattenMode option set to force",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeForce),
},
},
want: corerbd.FlattenModeForce,
},

{
name: "flattenMode option set to invalid value",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: "invalid123",
},
},
want: corerbd.FlattenModeNever,
},
}
for _, tt := range tests {
currentTT := tt
t.Run(currentTT.name, func(t *testing.T) {
t.Parallel()
got := getFlattenMode(currentTT.args.ctx, currentTT.args.parameters)
if !reflect.DeepEqual(got, currentTT.want) {
t.Errorf("getFlattenMode() = %v, want %v", got, currentTT.want)
}
})
}
}
56 changes: 56 additions & 0 deletions internal/rbd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,62 @@ import (
librbd "github.com/ceph/go-ceph/rbd"
)

// FlattenMode is used to indicate the flatten mode for an RBD image.
type FlattenMode string

const (
// FlattenModeNever indicates that the image should never be flattened.
FlattenModeNever FlattenMode = "never"
// FlattenModeForce indicates that the image with parent should definitely
// be flattened.
FlattenModeForce FlattenMode = "force"
)

// HandleParentImage checks the image's parent.
// if the parent image does not exist and is not in trash, it returns nil.
// if the flattenMode is FlattenModeForce, it flattens the image itself.
// if the parent image is in trash, it returns an error.
// if the parent image exists and is not enabled for mirroring, it returns an error.
func (ri *rbdImage) HandleParentImage(
ctx context.Context,
flattenMode FlattenMode) error {
if ri.ParentName == "" && !ri.ParentInTrash {
return nil
}

if flattenMode == FlattenModeForce {
err := ri.flattenRbdImage(ctx, true, 0, 0)
if err != nil {
return err
}
}

if ri.ParentInTrash {
return fmt.Errorf("%w: failed to enable mirroring on image %q:"+
" parent is in trash",
ErrFailedPrecondition, ri)
}

parent, err := ri.getParent()
if err != nil {
return err
}
parentMirroringInfo, err := parent.GetImageMirroringInfo()
if err != nil {
return fmt.Errorf(
"failed to get mirroring info of parent %q of image %q: %w",
ri, parent, err)
}

if parentMirroringInfo.State == librbd.MirrorImageEnabled {
return nil
}

return fmt.Errorf("failed to enable mirroring on image %q:"+
"parent image %q is not enabled for mirroring",
ri, parent)
}

// EnableImageMirroring enables mirroring on an image.
func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
image, err := ri.open()
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 122a247

Please sign in to comment.