-
Notifications
You must be signed in to change notification settings - Fork 559
/
Copy pathnodeserver.go
1416 lines (1221 loc) · 43.3 KB
/
nodeserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2018 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/volume"
mount "k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
)
// NodeServer struct of ceph rbd driver with supported methods of CSI
// node server spec.
type NodeServer struct {
// DefaultNodeServer is embedded; members that are not declared on
// NodeServer itself (such as the Mounter used by the methods in this
// file) are promoted from it.
*csicommon.DefaultNodeServer
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks
// readAffinityMapOptions contains map options to enable read affinity.
// appendReadAffinityMapOptions() adds them to the map options of a volume
// when the default mounter is used and this string is not empty.
readAffinityMapOptions string
}
// stageTransaction struct represents the state a transaction was when it either completed
// or failed
// this transaction state can be used to rollback the transaction.
// The fields are filled in step by step by NodeServer.stageTransaction() and
// consumed by NodeServer.undoStagingTransaction().
type stageTransaction struct {
// isStagePathCreated represents whether the mount path to stage the volume on was created or not
isStagePathCreated bool
// isMounted represents if the volume was mounted or not
isMounted bool
// isBlockEncrypted represents if the volume was encrypted or not
// (set once the encrypted device has been processed successfully)
isBlockEncrypted bool
// devicePath represents the path where rbd device is mapped
// (empty as long as the image has not been attached)
devicePath string
}
const (
// values for xfsHasReflink.
xfsReflinkUnset int = iota
xfsReflinkNoSupport
xfsReflinkSupport
// staticVol is the volume context key that marks a statically
// provisioned volume; its value is parsed with parseBoolOption().
staticVol = "staticVolume"
// volHealerCtx is the volume context key that makes NodeStageVolume only
// re-attach the image (see healerStageTransaction()); its value is parsed
// with parseBoolOption().
volHealerCtx = "volumeHealerContext"
// tryOtherMounters is the volume context key that permits the fallback to
// rbd-nbd in populateRbdVol() when krbd lacks the image features.
tryOtherMounters = "tryOtherMounters"
)
var (
// kernelRelease caches the kernel version reported by
// util.GetKernelVersion(); it is filled in by flattenImageBeforeMapping()
// while it is still empty.
kernelRelease = ""
// deepFlattenSupport holds the list of kernel which support mapping rbd
// image with deep-flatten image feature
//nolint:gomnd // numbers specify Kernel versions.
deepFlattenSupport = []util.KernelVersion{
{
Version: 5,
PatchLevel: 1,
SubLevel: 0,
ExtraVersion: 0,
Distribution: "",
Backport: false,
}, // standard 5.1+ versions
{
Version: 4,
PatchLevel: 18,
SubLevel: 0,
ExtraVersion: 193,
Distribution: ".el8",
Backport: true,
}, // RHEL 8.2
}
// xfsHasReflink is set by xfsSupportsReflink(), use the function when
// checking the support for reflink.
xfsHasReflink = xfsReflinkUnset
// mkfsDefaultArgs maps a filesystem type to the mkfs arguments that
// mountVolumeToStagePath() uses unless the volume context provides
// "mkfsOptions".
mkfsDefaultArgs = map[string][]string{
"ext4": {"-m0", "-Enodiscard,lazy_itable_init=1,lazy_journal_init=1"},
"xfs": {"-K"},
}
// mountDefaultOpts maps a filesystem type to the mount options that
// mountVolumeToStagePath() always starts with when staging a volume.
mountDefaultOpts = map[string][]string{
"xfs": {"nouuid"},
}
)
// parseBoolOption looks up optionName in parameters and parses its value with
// strconv.ParseBool. defValue is returned when the option is not set, or when
// its value (an empty string included) can not be parsed as a boolean; in the
// latter case the failure is logged as an error as well.
//
//nolint:unparam // currently defValue is always false, this can change in the future
func parseBoolOption(ctx context.Context, parameters map[string]string, optionName string, defValue bool) bool {
	val, ok := parameters[optionName]
	if !ok {
		return defValue
	}

	boolVal, err := strconv.ParseBool(val)
	if err != nil {
		// strconv.ParseBool returns false together with the error, using
		// that result would silently override the requested default.
		log.ErrorLog(ctx, "failed to parse value of %q: %q", optionName, val)

		return defValue
	}

	return boolVal
}
// healerStageTransaction re-attaches the rbd image described by volOps, using
// the device path that was recorded in the image metadata stash found under
// metaDataPath. An error is returned when the stash can not be read, when it
// does not contain a device path, or when attaching the image fails.
func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *rbdVolume, metaDataPath string) error {
	stash, err := lookupRBDImageMetadataStash(metaDataPath)
	switch {
	case err != nil:
		log.ErrorLog(ctx, "failed to find image metadata, at stagingPath: %s, err: %v", metaDataPath, err)

		return err
	case stash.DevicePath == "":
		return fmt.Errorf("device is empty in image metadata, at stagingPath: %s", metaDataPath)
	}

	attachedDevice, err := attachRBDImage(ctx, volOps, stash.DevicePath, cr)
	if err != nil {
		return err
	}
	log.DebugLog(ctx, "rbd volID: %s was successfully attached to device: %s", volOps.VolID, attachedDevice)

	return nil
}
// populateRbdVol builds the rbdVolume for a NodeStageVolume request.
//
// For static volumes the rbdVolume is generated from the volume context
// (genVolFromVolumeOptions()), for all other volumes it is generated from the
// volume ID with the help of the credentials and the secrets of the request
// (GenVolFromVolID()). Afterwards the volume is connected, its image details
// are fetched, the KMS is initialized, the mounter is selected (with an
// optional fallback to rbd-nbd when krbd lacks image features and
// `tryOtherMounters` is set) and the map/log options are filled in.
//
// On success the caller owns the returned rbdVolume and is responsible for
// calling Destroy() on it. The returned errors are gRPC status errors, except
// for the error of getMapOptions() which is passed through unchanged.
func (ns *NodeServer) populateRbdVol(
	ctx context.Context,
	req *csi.NodeStageVolumeRequest,
	cr *util.Credentials,
) (*rbdVolume, error) {
	var err error
	volID := req.GetVolumeId()
	volContext := req.GetVolumeContext()
	isBlock := req.GetVolumeCapability().GetBlock() != nil
	disableInUseChecks := false

	// MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes.
	// The getters are nil-safe, a request without access mode does not panic here.
	accessMode := req.GetVolumeCapability().GetAccessMode().GetMode()
	if accessMode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
		if !isBlock {
			log.WarningLog(
				ctx,
				"MULTI_NODE_MULTI_WRITER currently only supported with volumes of access type `block`, "+
					"invalid AccessMode for volume: %v",
				req.GetVolumeId(),
			)

			return nil, status.Error(
				codes.InvalidArgument,
				"rbd: RWX access mode request is only valid for volumes with access type `block`",
			)
		}

		disableInUseChecks = true
	}

	var rv *rbdVolume

	isStaticVol := parseBoolOption(ctx, volContext, staticVol, false)

	// get rbd image name from the volume journal
	// for static volumes, the image name is actually the volume ID itself
	if isStaticVol {
		if volContext[intreeMigrationKey] == intreeMigrationLabel {
			// if migration static volume, use imageName as volID
			volID = volContext["imageName"]
		}
		rv, err = genVolFromVolumeOptions(ctx, volContext, disableInUseChecks, true)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		rv.RbdImageName = volID
	} else {
		rv, err = GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
		if err != nil {
			// release whatever was set up before the failure, but do not
			// dereference a volume that was never returned.
			if rv != nil {
				rv.Destroy()
			}
			log.ErrorLog(ctx, "error generating volume %s: %v", volID, err)

			return nil, status.Errorf(codes.Internal, "error generating volume %s: %v", volID, err)
		}
		rv.DataPool = volContext["dataPool"]
		var ok bool
		if rv.Mounter, ok = volContext["mounter"]; !ok {
			rv.Mounter = rbdDefaultMounter
		}
	}

	rv.DisableInUseChecks = disableInUseChecks

	// NOTE(review): rv is not destroyed when Connect() fails, as the cleanup
	// below is only registered afterwards - confirm that a failed Connect()
	// leaves nothing behind that needs to be released.
	err = rv.Connect(cr)
	if err != nil {
		log.ErrorLog(ctx, "failed to connect to volume %s: %v", rv, err)

		return nil, status.Error(codes.Internal, err.Error())
	}
	// in case of any error call Destroy for cleanup. The closure reads the
	// function level err, every failure below has to be stored in it.
	defer func() {
		if err != nil {
			rv.Destroy()
		}
	}()

	// get the image details from the ceph cluster.
	err = rv.getImageInfo()
	if err != nil {
		log.ErrorLog(ctx, "failed to get image details %s: %v", rv, err)

		return nil, status.Error(codes.Internal, err.Error())
	}

	err = rv.initKMS(volContext, req.GetSecrets())
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	features := strings.Join(rv.ImageFeatureSet.Names(), ",")

	// os.ErrNotExist is tolerated here; such an error only leaves the
	// mounter selection below to decide what to do.
	var isFeatureExist bool
	isFeatureExist, err = isKrbdFeatureSupported(ctx, features)
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		log.ErrorLog(ctx, "failed checking krbd features %q: %v", features, err)

		return nil, status.Error(codes.Internal, err.Error())
	}

	if rv.Mounter == rbdDefaultMounter && !isFeatureExist {
		if !parseBoolOption(ctx, volContext, tryOtherMounters, false) {
			log.ErrorLog(ctx, "unsupported krbd Feature, set `tryOtherMounters:true` or fix krbd driver")
			err = errors.New("unsupported krbd Feature")

			return nil, status.Error(codes.Internal, err.Error())
		}
		// fallback to rbd-nbd,
		rv.Mounter = rbdNbdMounter
	}

	err = getMapOptions(req, rv)
	if err != nil {
		return nil, err
	}
	ns.appendReadAffinityMapOptions(rv)

	rv.VolID = volID

	rv.LogDir = volContext["cephLogDir"]
	if rv.LogDir == "" {
		rv.LogDir = defaultLogDir
	}
	rv.LogStrategy = volContext["cephLogStrategy"]
	if rv.LogStrategy == "" {
		rv.LogStrategy = defaultLogStrategy
	}

	return rv, nil
}
// appendReadAffinityMapOptions adds the read affinity map options of the
// NodeServer to rv.MapOptions. Nothing is changed when no read affinity map
// options are configured or when rv does not use rbdDefaultMounter. Existing
// map options are kept, the new ones are joined to them with a comma.
func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) {
	affinityOpts := ns.readAffinityMapOptions
	if affinityOpts == "" || rv.Mounter != rbdDefaultMounter {
		return
	}

	if rv.MapOptions == "" {
		rv.MapOptions = affinityOpts

		return
	}

	rv.MapOptions += "," + affinityOpts
}
// NodeStageVolume stages the volume of the request on the node.
// Implementation notes:
//   - the staging path of the request is the parent directory, the volume
//     itself is staged on an entry below it that is named after the VolumeID
//     (a directory for filesystem volumes, a file for block volumes)
//   - nothing is done when that entry is already a mount point
//   - when the volume context carries the volume healer key, the image is only
//     re-attached (healerStageTransaction) and nothing gets mounted
//   - order of the regular staging steps (undoStagingTransaction reverts them
//     in reverse when a step fails):
//     1. stash the image metadata under the staging path
//     2. map the image (creates a device)
//     3. create the staging file/directory under the staging path
//     4. mount the mapped device on it
func (ns *NodeServer) NodeStageVolume(
	ctx context.Context,
	req *csi.NodeStageVolumeRequest,
) (*csi.NodeStageVolumeResponse, error) {
	if validateErr := util.ValidateNodeStageVolumeRequest(req); validateErr != nil {
		return nil, validateErr
	}

	volID := req.GetVolumeId()
	volContext := req.GetVolumeContext()

	cr, err := util.NewUserCredentialsWithMigration(req.GetSecrets())
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	defer cr.DeleteCredentials()

	// only one operation per volume may run at a time
	if !ns.VolumeLocks.TryAcquire(volID) {
		log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)

		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
	}
	defer ns.VolumeLocks.Release(volID)

	stagingParentPath := req.GetStagingTargetPath()
	stagingTargetPath := getStagingTargetPath(req)

	healing := parseBoolOption(ctx, volContext, volHealerCtx, false)
	if !healing {
		// a volume that is mounted already does not need to be staged again
		notMounted, mntErr := isNotMountPoint(ns.Mounter, stagingTargetPath)
		if mntErr != nil {
			return nil, status.Error(codes.Internal, mntErr.Error())
		}
		if !notMounted {
			log.DebugLog(ctx, "rbd: volume %s is already mounted to %s, skipping", volID, stagingTargetPath)

			return &csi.NodeStageVolumeResponse{}, nil
		}
	}

	isStaticVol := parseBoolOption(ctx, volContext, staticVol, false)

	rv, err := ns.populateRbdVol(ctx, req, cr)
	if err != nil {
		return nil, err
	}
	defer rv.Destroy()

	rv.NetNamespaceFilePath, err = util.GetRBDNetNamespaceFilePath(util.CsiConfigFile, rv.ClusterID)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	if healing {
		if err = healerStageTransaction(ctx, cr, rv, stagingParentPath); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}

		return &csi.NodeStageVolumeResponse{}, nil
	}

	// The image details are stashed before the image gets mapped, Unstage
	// needs them as it does not receive any voloptions in its RPC (as per
	// the CSI spec).
	if err = stashRBDImageMetadata(rv, stagingParentPath); err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	// do the actual staging, a failure is rolled back based on the state
	// that the transaction reached
	txn, err := ns.stageTransaction(ctx, req, cr, rv, isStaticVol)
	if err != nil {
		ns.undoStagingTransaction(ctx, req, txn, rv)

		return nil, status.Error(codes.Internal, err.Error())
	}

	log.DebugLog(
		ctx,
		"rbd: successfully mounted volume %s to stagingTargetPath %s",
		volID,
		stagingTargetPath)

	return &csi.NodeStageVolumeResponse{}, nil
}
// stageTransaction runs the individual steps for staging a volume: flatten the
// image if required, map it, process block/file encryption, create the staging
// mount point, mount the device on it and finally resize the staged volume
// when needed.
//
// The returned *stageTransaction is never nil and records how far the staging
// got, also when an error is returned. The caller is expected to hand it to
// undoStagingTransaction() for rolling back a failed attempt.
func (ns *NodeServer) stageTransaction(
	ctx context.Context,
	req *csi.NodeStageVolumeRequest,
	cr *util.Credentials,
	volOptions *rbdVolume,
	staticVol bool,
) (*stageTransaction, error) {
	transaction := &stageTransaction{}

	var err error

	// Allow image to be mounted on multiple nodes if it is ROX
	accessMode := req.GetVolumeCapability().GetAccessMode().GetMode()
	if accessMode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
		// GetVolumeId needs to be called, passing the method itself would
		// log the address of a function instead of the volume ID.
		log.ExtendedLog(ctx, "setting disableInUseChecks on rbd volume to: %v", req.GetVolumeId())
		volOptions.DisableInUseChecks = true
		volOptions.readOnly = true
	}

	err = flattenImageBeforeMapping(ctx, volOptions)
	if err != nil {
		return transaction, err
	}

	// Mapping RBD image, there is no known device path for a fresh mapping
	var devicePath string
	devicePath, err = attachRBDImage(ctx, volOptions, "", cr)
	if err != nil {
		return transaction, err
	}
	transaction.devicePath = devicePath

	log.DebugLog(ctx, "rbd image: %s was successfully mapped at %s\n",
		volOptions, devicePath)

	// userspace mounters like nbd need the device path as a reference while
	// restarting the userspace processes on a nodeplugin restart. For kernel
	// mounter(krbd) we don't need it as there won't be any process running
	// in userspace, hence we don't store the device path for krbd devices.
	if volOptions.Mounter == rbdNbdMounter {
		err = updateRBDImageMetadataStash(req.GetStagingTargetPath(), devicePath)
		if err != nil {
			return transaction, err
		}
	}

	if volOptions.isBlockEncrypted() {
		// from here on devicePath is the path returned for the encrypted
		// device, transaction.devicePath keeps the mapped rbd device
		devicePath, err = ns.processEncryptedDevice(ctx, volOptions, devicePath)
		if err != nil {
			return transaction, err
		}
		transaction.isBlockEncrypted = true
	}

	if volOptions.isFileEncrypted() {
		if err = fscrypt.InitializeNode(ctx); err != nil {
			return transaction, fmt.Errorf("file encryption setup for %s failed: %w", volOptions.VolID, err)
		}
	}

	stagingTargetPath := getStagingTargetPath(req)

	isBlock := req.GetVolumeCapability().GetBlock() != nil
	err = ns.createStageMountPoint(ctx, stagingTargetPath, isBlock)
	if err != nil {
		return transaction, err
	}

	transaction.isStagePathCreated = true

	// nodeStage Path
	err = ns.mountVolumeToStagePath(ctx, req, staticVol, stagingTargetPath, devicePath, volOptions.isFileEncrypted())
	if err != nil {
		return transaction, err
	}
	transaction.isMounted = true

	if volOptions.isFileEncrypted() {
		log.DebugLog(ctx, "rbd fscrypt: trying to unlock filesystem on %s image %s", stagingTargetPath, volOptions.VolID)
		err = fscrypt.Unlock(ctx, volOptions.fileEncryption, stagingTargetPath, volOptions.VolID)
		if err != nil {
			return transaction, fmt.Errorf("file system encryption unlock in %s image %s failed: %w",
				stagingTargetPath, volOptions.VolID, err)
		}
	}

	// As we are supporting the restore of a volume to a bigger size and
	// creating bigger size clone from a volume, we need to check filesystem
	// resize is required, if required resize filesystem.
	// in case of encrypted block PVC resize only the LUKS device.
	err = resizeNodeStagePath(ctx, isBlock, transaction, req.GetVolumeId(), stagingTargetPath)
	if err != nil {
		return transaction, err
	}

	return transaction, nil
}
// resizeNodeStagePath grows what was staged for a volume, if needed:
//   - block volume, not encrypted: nothing is resized
//   - block volume, encrypted: only the encrypted device is resized
//   - filesystem volume: the encrypted device is resized first (when the
//     volume is encrypted), then the filesystem on stagingTargetPath is
//     resized in case it needs it
//
// All failures are returned as gRPC status errors with code Internal.
func resizeNodeStagePath(ctx context.Context,
	isBlock bool,
	transaction *stageTransaction,
	volID,
	stagingTargetPath string,
) error {
	encrypted := transaction.isBlockEncrypted

	// a raw block device without encryption has nothing to expand
	if isBlock && !encrypted {
		return nil
	}

	device := transaction.devicePath
	if encrypted {
		mapperDevice, err := resizeEncryptedDevice(ctx, volID, stagingTargetPath, device)
		if err != nil {
			return status.Error(codes.Internal, err.Error())
		}
		// For AccessType=Block volumes the application owns the data on the
		// raw block-device, there may be no filesystem at all, so no
		// filesystem resize is attempted.
		if isBlock {
			return nil
		}
		// the filesystem sits on the device returned for the encrypted volume
		device = mapperDevice
	}

	fsResizer := mount.NewResizeFs(utilexec.New())

	// find out whether the filesystem on the staging path has to grow
	needed, err := fsResizer.NeedResize(device, stagingTargetPath)
	if err != nil {
		return status.Errorf(codes.Internal,
			"need resize check failed on devicePath %s and staingPath %s, error: %v",
			device,
			stagingTargetPath,
			err)
	}
	if !needed {
		return nil
	}

	if resized, resizeErr := fsResizer.Resize(device, stagingTargetPath); !resized {
		return status.Errorf(codes.Internal,
			"resize failed on path %s, error: %v", stagingTargetPath, resizeErr)
	}

	return nil
}
// resizeEncryptedDevice compares the size of the rbd device at devicePath with
// the size of the mapper device of volID (as reported by util.VolumeMapper())
// and resizes the active encrypted mapping when the rbd device is the bigger
// one. The path of the mapper device is returned on success, an empty path
// together with an error otherwise. stagingTargetPath is only used in error
// messages.
func resizeEncryptedDevice(ctx context.Context, volID, stagingTargetPath, devicePath string) (string, error) {
	// sizeError builds the error for a failed size lookup of device
	sizeError := func(device string, cause error) error {
		return fmt.Errorf(
			"failed to get device size of %s and staingPath %s, error: %w",
			device,
			stagingTargetPath,
			cause)
	}

	rbdDevSize, err := getDeviceSize(ctx, devicePath)
	if err != nil {
		return "", sizeError(devicePath, err)
	}

	_, mapperPath := util.VolumeMapper(volID)

	encDevSize, err := getDeviceSize(ctx, mapperPath)
	if err != nil {
		return "", sizeError(mapperPath, err)
	}

	// the LUKS device only needs to grow when the rbd device (like
	// `/dev/rbd0`) is bigger than the LUKS device on top of it
	needsGrow := rbdDevSize > encDevSize
	if !needsGrow {
		return mapperPath, nil
	}

	// The volume is encrypted, resize an active mapping
	if err = util.ResizeEncryptedVolume(ctx, mapperPath); err != nil {
		log.ErrorLog(ctx, "failed to resize device %s: %v",
			mapperPath, err)

		return "", fmt.Errorf(
			"failed to resize device %s: %w", mapperPath, err)
	}

	return mapperPath, nil
}
// flattenImageBeforeMapping flattens the image of volOptions before it gets
// mapped, in case the running kernel is not listed in deepFlattenSupport and
// skipForceFlatten is not set. The image is flattened when its image chain has
// the deep-flatten feature, or when its clone depth is not zero.
//
// The kernel version is fetched on the first call and kept in the package
// level kernelRelease variable for later calls.
func flattenImageBeforeMapping(
	ctx context.Context,
	volOptions *rbdVolume,
) error {
	if kernelRelease == "" {
		var err error
		// fetch the current running kernel info
		kernelRelease, err = util.GetKernelVersion()
		if err != nil {
			return err
		}
	}

	// no flattening for kernels that can map such images, or when it has
	// been disabled explicitly
	if util.CheckKernelSupport(kernelRelease, deepFlattenSupport) || skipForceFlatten {
		return nil
	}

	hasDeepFlatten, err := volOptions.checkImageChainHasFeature(ctx, librbd.FeatureDeepFlatten)
	if err != nil {
		return err
	}

	cloneDepth, err := volOptions.getCloneDepth(ctx)
	if err != nil {
		return err
	}

	if !hasDeepFlatten && cloneDepth == 0 {
		return nil
	}

	err = volOptions.flattenRbdImage(ctx, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
	if err != nil {
		return err
	}

	return nil
}
// undoStagingTransaction rolls back the steps that were recorded in
// transaction, in the reverse order of staging: unmount the staging target
// path, remove it, unmap the rbd device and clean up the image metadata stash.
//
// Failures are only logged. A failed unmount stops the rollback, failing to
// remove the staging target path or to unmap the device does not.
func (ns *NodeServer) undoStagingTransaction(
	ctx context.Context,
	req *csi.NodeStageVolumeRequest,
	transaction *stageTransaction,
	volOptions *rbdVolume,
) {
	stagingTargetPath := getStagingTargetPath(req)

	if transaction.isMounted {
		if err := ns.Mounter.Unmount(stagingTargetPath); err != nil {
			log.ErrorLog(ctx, "failed to unmount stagingtargetPath: %s with error: %v", stagingTargetPath, err)

			return
		}
	}

	// remove the file/directory created on staging path
	if transaction.isStagePathCreated {
		if err := os.Remove(stagingTargetPath); err != nil {
			// go on and unmap the image anyway, a stale image causes more
			// issues than a stale file/directory
			log.ErrorLog(ctx, "failed to remove stagingtargetPath: %s with error: %v", stagingTargetPath, err)
		}
	}

	// Unmapping rbd device
	if mappedDevice := transaction.devicePath; mappedDevice != "" {
		volID := req.GetVolumeId()

		err := detachRBDDevice(ctx, mappedDevice, volID, volOptions.UnmapOptions, transaction.isBlockEncrypted)
		if err != nil {
			// go on and delete the stash file anyway, kubernetes will fail
			// to delete the staging path otherwise
			log.ErrorLog(
				ctx,
				"failed to unmap rbd device: %s for volume %s with error: %v",
				mappedDevice,
				volID,
				err)
		}
	}

	// Cleanup the stashed image metadata
	if err := cleanupRBDImageMetadataStash(req.GetStagingTargetPath()); err != nil {
		log.ErrorLog(ctx, "failed to cleanup image metadata stash (%v)", err)
	}
}
// createStageMountPoint creates the entry at mountPath on which a volume gets
// staged: a regular file (mode 0600) for block volumes, a directory (mode
// 0750) for filesystem volumes. An already existing directory is not treated
// as an error. Failures are logged and returned as gRPC status errors with
// code Internal.
func (ns *NodeServer) createStageMountPoint(ctx context.Context, mountPath string, isBlock bool) error {
	if !isBlock {
		if err := os.Mkdir(mountPath, 0o750); err != nil && !os.IsExist(err) {
			log.ErrorLog(ctx, "failed to create mountPath:%s with error: %v", mountPath, err)

			return status.Error(codes.Internal, err.Error())
		}

		return nil
	}

	// #nosec:G304, intentionally creating file mountPath, not a security issue
	stageFile, err := os.OpenFile(mountPath, os.O_CREATE|os.O_RDWR, 0o600)
	if err != nil {
		log.ErrorLog(ctx, "failed to create mountPath:%s with error: %v", mountPath, err)

		return status.Error(codes.Internal, err.Error())
	}

	// the file only has to exist, it is not written to here
	if err = stageFile.Close(); err != nil {
		log.ErrorLog(ctx, "failed to close mountPath:%s with error: %v", mountPath, err)

		return status.Error(codes.Internal, err.Error())
	}

	return nil
}
// NodePublishVolume publishes a staged volume by mounting its staging path
// (the staging target path of the request joined with the volume ID) on the
// target path of the request. The target path is created when it is missing,
// and nothing is mounted when it is a mount point already. For volumes with
// file encryption the encrypted subdirectory of the staging path is published,
// after verifying that it is unlocked.
func (ns *NodeServer) NodePublishVolume(
	ctx context.Context,
	req *csi.NodePublishVolumeRequest,
) (*csi.NodePublishVolumeResponse, error) {
	if err := util.ValidateNodePublishVolumeRequest(req); err != nil {
		return nil, err
	}

	volCap := req.GetVolumeCapability()
	targetPath := req.GetTargetPath()
	stagingPath := req.GetStagingTargetPath() + "/" + req.GetVolumeId()

	// Stage and publish operations are expected to be serialized by kubelet,
	// hence no extra locking is done in nodePublish.

	// make sure that the target path exists
	notMnt, err := ns.createTargetMountPath(ctx, targetPath, volCap.GetBlock() != nil)
	if err != nil {
		return nil, err
	}
	if !notMnt {
		// already published
		return &csi.NodePublishVolumeResponse{}, nil
	}

	fileEncrypted, err := IsFileEncrypted(ctx, req.GetVolumeContext())
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	if fileEncrypted {
		stagingPath = fscrypt.AppendEncyptedSubdirectory(stagingPath)
		fsType := volCap.GetMount().GetFsType()
		if err = fscrypt.IsDirectoryUnlocked(stagingPath, fsType); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	// Publish Path
	if err = ns.mountVolume(ctx, stagingPath, req); err != nil {
		return nil, err
	}

	log.DebugLog(ctx, "rbd: successfully mounted stagingPath %s to targetPath %s", stagingPath, targetPath)

	return &csi.NodePublishVolumeResponse{}, nil
}
// mountVolumeToStagePath mounts devicePath on stagingPath.
//
// Block volumes are bind-mounted. Filesystem volumes are mounted with the
// default mount options of the filesystem type, "_netdev" and the options of
// the volume capability; "ro" is added for the READER_ONLY access modes. A
// device without an existing format is formatted first with mkfs, unless the
// volume is static, read-only or a block volume. The mkfs arguments come from
// mkfsDefaultArgs, or from the whitespace separated "mkfsOptions" of the
// volume context when that is set.
//
// The error of the failing step is returned unmodified.
func (ns *NodeServer) mountVolumeToStagePath(
	ctx context.Context,
	req *csi.NodeStageVolumeRequest,
	staticVol bool,
	stagingPath, devicePath string,
	fileEncryption bool,
) error {
	readOnly := false
	fsType := req.GetVolumeCapability().GetMount().GetFsType()
	diskMounter := &mount.SafeFormatAndMount{Interface: ns.Mounter, Exec: utilexec.New()}
	// rbd images are thin-provisioned and return zeros for unwritten areas. A freshly created
	// image will not benefit from discard and we also want to avoid as much unnecessary zeroing
	// as possible. Open-code mkfs here because FormatAndMount() doesn't accept custom mkfs
	// options.
	//
	// Note that "freshly" is very important here. While discard is more of a nice to have,
	// lazy_journal_init=1 is plain unsafe if the image has been written to before and hasn't
	// been zeroed afterwards (unlike the name suggests, it leaves the journal completely
	// uninitialized and carries a risk until the journal is overwritten and wraps around for
	// the first time).
	existingFormat, err := diskMounter.GetDiskFormat(devicePath)
	if err != nil {
		log.ErrorLog(ctx, "failed to get disk format for path %s, error: %v", devicePath, err)

		return err
	}

	// work on a copy, appending must never be able to write into the slice
	// that is owned by the package level map
	opt := append([]string(nil), mountDefaultOpts[fsType]...)
	opt = append(opt, "_netdev")
	opt = csicommon.ConstructMountOptions(opt, req.GetVolumeCapability())
	isBlock := req.GetVolumeCapability().GetBlock() != nil
	rOnly := "ro"

	// the getters are nil-safe, unlike accessing the fields of the request
	accessMode := req.GetVolumeCapability().GetAccessMode().GetMode()
	if accessMode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
		accessMode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
		if !csicommon.MountOptionContains(opt, rOnly) {
			opt = append(opt, rOnly)
		}
	}
	if csicommon.MountOptionContains(opt, rOnly) {
		readOnly = true
	}

	if existingFormat == "" && !staticVol && !readOnly && !isBlock {
		// copy of the defaults, see the note on the mount options above
		args := append([]string(nil), mkfsDefaultArgs[fsType]...)

		// if the VolumeContext contains "mkfsOptions", use those as args instead.
		// strings.Fields() drops the empty arguments that repeated, leading or
		// trailing whitespace would cause otherwise.
		if mkfsOptions := req.GetVolumeContext()["mkfsOptions"]; mkfsOptions != "" {
			args = strings.Fields(mkfsOptions)
		}

		// add extra arguments depending on the filesystem
		mkfs := "mkfs." + fsType

		switch fsType {
		case "ext4":
			if fileEncryption {
				args = append(args, "-Oencrypt")
			}
		case "xfs":
			// always disable reflink
			// TODO: make enabling an option, see ceph/ceph-csi#1256
			if ns.xfsSupportsReflink() {
				args = append(args, "-m", "reflink=0")
			}
		case "":
			// no filesystem type specified, just use "mkfs"
			mkfs = "mkfs"
		}

		// add device as last argument
		args = append(args, devicePath)
		cmdOut, cmdErr := diskMounter.Exec.Command(mkfs, args...).CombinedOutput()
		if cmdErr != nil {
			log.ErrorLog(ctx, "failed to run mkfs.%s (%v) error: %v, output: %v", fsType, args, cmdErr, string(cmdOut))

			return cmdErr
		}
	}

	if isBlock {
		opt = append(opt, "bind")
		err = diskMounter.MountSensitiveWithoutSystemd(devicePath, stagingPath, fsType, opt, nil)
	} else {
		err = diskMounter.FormatAndMount(devicePath, stagingPath, fsType, opt)
	}
	if err != nil {
		log.ErrorLog(ctx,
			"failed to mount device path (%s) to staging path (%s) for volume "+
				"(%s) error: %s Check dmesg logs if required.",
			devicePath,
			stagingPath,
			req.GetVolumeId(),
			err)
	}

	return err
}
// mountVolume bind-mounts stagingPath on the target path of the publish
// request. The mount options are "bind" and "_netdev", extended with the
// options of the volume capability, plus "ro" for read-only requests. A
// failing mount is returned as gRPC status error with code Internal.
func (ns *NodeServer) mountVolume(ctx context.Context, stagingPath string, req *csi.NodePublishVolumeRequest) error {
	volCap := req.GetVolumeCapability()

	fsType := volCap.GetMount().GetFsType()
	isBlock := volCap.GetBlock() != nil
	readOnly := req.GetReadonly()
	targetPath := req.GetTargetPath()
	mountOptions := csicommon.ConstructMountOptions([]string{"bind", "_netdev"}, volCap)

	// note: "ro" is only added after logging, it is not part of the logged
	// mountflags
	log.DebugLog(ctx, "target %v\nisBlock %v\nfstype %v\nstagingPath %v\nreadonly %v\nmountflags %v\n",
		targetPath, isBlock, fsType, stagingPath, readOnly, mountOptions)

	if readOnly {
		mountOptions = append(mountOptions, "ro")
	}

	err := util.Mount(ns.Mounter, stagingPath, targetPath, fsType, mountOptions)
	if err == nil {
		return nil
	}

	return status.Error(codes.Internal, err.Error())
}
// createTargetMountPath makes sure that mountPath exists, so that a volume can
// be published on it.
//
// When mountPath exists already, the result of IsLikelyNotMountPoint() is
// returned. When it does not exist, it is created as a file for block volumes
// or as a mount point through util.CreateMountPoint() otherwise, and true
// (not a mount point) is returned. All errors are returned as gRPC status
// errors with code Internal.
func (ns *NodeServer) createTargetMountPath(ctx context.Context, mountPath string, isBlock bool) (bool, error) {
	// Check if that mount path exists properly
	notMnt, err := ns.Mounter.IsLikelyNotMountPoint(mountPath)
	if err == nil {
		return notMnt, nil
	}
	if !os.IsNotExist(err) {
		return false, status.Error(codes.Internal, err.Error())
	}

	// mountPath does not exist yet, create it
	if isBlock {
		// #nosec
		pathFile, e := os.OpenFile(mountPath, os.O_CREATE|os.O_RDWR, 0o750)
		if e != nil {
			// report the error of OpenFile(), not the "does not exist"
			// error of the mount point check above
			log.DebugLog(ctx, "Failed to create mountPath:%s with error: %v", mountPath, e)

			return notMnt, status.Error(codes.Internal, e.Error())
		}
		if err = pathFile.Close(); err != nil {
			log.DebugLog(ctx, "Failed to close mountPath:%s with error: %v", mountPath, err)

			return notMnt, status.Error(codes.Internal, err.Error())
		}
	} else {
		// Create a mountpath directory
		if err = util.CreateMountPoint(mountPath); err != nil {
			return notMnt, status.Error(codes.Internal, err.Error())
		}
	}

	// a path that was just created can not be a mount point
	return true, nil
}
// NodeUnpublishVolume unmounts the volume from the target path of the request
// and removes the target path afterwards. A target path that does not exist
// anymore counts as success, a target path that is not a mount point is only
// removed.
func (ns *NodeServer) NodeUnpublishVolume(
	ctx context.Context,
	req *csi.NodeUnpublishVolumeRequest,
) (*csi.NodeUnpublishVolumeResponse, error) {
	if err := util.ValidateNodeUnpublishVolumeRequest(req); err != nil {
		return nil, err
	}

	targetPath := req.GetTargetPath()

	// Node operations like unpublish/unstage...etc are expected to be
	// serialized by kubelet, hence no explicit locking is done at the time
	// of nodeunpublish.
	isMnt, err := ns.Mounter.IsMountPoint(targetPath)
	switch {
	case err == nil:
		// targetPath exists, handled below
	case os.IsNotExist(err):
		// targetPath has already been deleted
		log.DebugLog(ctx, "targetPath: %s has already been deleted", targetPath)

		return &csi.NodeUnpublishVolumeResponse{}, nil
	default:
		return nil, status.Error(codes.NotFound, err.Error())
	}

	if isMnt {
		if err = ns.Mounter.Unmount(targetPath); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	if err = os.RemoveAll(targetPath); err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	// success is only logged when something was actually unmounted
	if isMnt {
		log.DebugLog(ctx, "rbd: successfully unbound volume %s from %s", req.GetVolumeId(), targetPath)
	}

	return &csi.NodeUnpublishVolumeResponse{}, nil
}
// getStagingTargetPath returns the path on which a volume is staged: the
// staging target path of the request and the volume ID, joined with a "/".
// Only *csi.NodeStageVolumeRequest and *csi.NodeUnstageVolumeRequest are
// supported, an empty string is returned for any other type.
func getStagingTargetPath(req interface{}) string {
	var parentPath, volID string

	switch r := req.(type) {
	case *csi.NodeStageVolumeRequest:
		parentPath, volID = r.GetStagingTargetPath(), r.GetVolumeId()
	case *csi.NodeUnstageVolumeRequest:
		parentPath, volID = r.GetStagingTargetPath(), r.GetVolumeId()
	default:
		return ""
	}

	return parentPath + "/" + volID
}
// NodeUnstageVolume unstages the volume from the staging path.
func (ns *NodeServer) NodeUnstageVolume(
ctx context.Context,
req *csi.NodeUnstageVolumeRequest,
) (*csi.NodeUnstageVolumeResponse, error) {
var err error
if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil {
return nil, err
}
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := getStagingTargetPath(req)
isMnt, err := ns.Mounter.IsMountPoint(stagingTargetPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, status.Error(codes.NotFound, err.Error())
}
// Continue on ENOENT errors as we may still have the image mapped
isMnt = false
}
if isMnt {
// Unmounting the image
err = ns.Mounter.Unmount(stagingTargetPath)
if err != nil {
log.ExtendedLog(ctx, "failed to unmount targetPath: %s with error: %v", stagingTargetPath, err)