Merge PR ceph#52670 into wip-vshankar-testing-20240217.015652
* refs/pull/52670/head:
	doc: add the reject the clone when threads are not available feature in the document
	qa: add test cases for the support to reject clones feature
	mgr/volumes: support to reject CephFS clones if cloner threads are not available

Reviewed-by: Kotresh Hiremath Ravishankar <[email protected]>
Reviewed-by: Venky Shankar <[email protected]>
vshankar committed Feb 17, 2024
2 parents 1d007aa + 6a44322 commit 1c920af
Showing 8 changed files with 245 additions and 12 deletions.
9 changes: 9 additions & 0 deletions PendingReleaseNotes
@@ -140,6 +140,15 @@ CephFS: Disallow delegating preallocated inode ranges to clients. Config
isn't scalable. So we have removed the 'network_ping_times' section from
the output. Details in the tracker: https://tracker.ceph.com/issues/57460

* CephFS: The `subvolume snapshot clone` command now depends on the config option
  `snapshot_clone_no_wait`, which is used to reject clone operations when all the
  cloner threads are busy. This config option is enabled by default, which means
  that if no cloner threads are free, the clone request errors out with EAGAIN.
  The value of the config option can be fetched with:
  `ceph config get mgr mgr/volumes/snapshot_clone_no_wait`
  and it can be disabled with:
  `ceph config set mgr mgr/volumes/snapshot_clone_no_wait false`
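
  For scripts that schedule many clones, the new EAGAIN behaviour can be absorbed with a
  small retry wrapper. The following is only an illustrative sketch (it assumes the `ceph`
  CLI is on PATH and, as the QA tests in this commit assert, that a rejected clone exits
  with the EAGAIN errno):

    import errno
    import subprocess
    import time

    def clone_with_retry(vol, subvol, snap, target, attempts=5, delay=10):
        # Retry 'ceph fs subvolume snapshot clone' while all cloner threads are busy.
        for _ in range(attempts):
            ret = subprocess.run(["ceph", "fs", "subvolume", "snapshot", "clone",
                                  vol, subvol, snap, target])
            if ret.returncode != errno.EAGAIN:
                return ret.returncode  # 0 on success, or a different error code
            time.sleep(delay)  # all cloner threads are busy; back off and retry
        return errno.EAGAIN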

>=18.0.0

* The RGW policy parser now rejects unknown principals by default. If you are
31 changes: 25 additions & 6 deletions doc/cephfs/fs-volumes.rst
@@ -579,6 +579,8 @@ To initiate a clone operation use:

   ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name>

.. note:: The ``subvolume snapshot clone`` command depends on the config option ``snapshot_clone_no_wait``, which is described in the Configurables section below.

If a snapshot (source subvolume) is a part of a non-default group, the group name needs to be specified:

.. prompt:: bash #
@@ -597,12 +599,6 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b

   ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>

Configure the maximum number of concurrent clones. The default is 4:

.. prompt:: bash #

   ceph config set mgr mgr/volumes/max_concurrent_clones <value>

To check the status of a clone operation use:

.. prompt:: bash #
@@ -728,6 +724,29 @@ On successful cancellation, the cloned subvolume is moved to the ``canceled`` st

.. note:: The canceled clone may be deleted by supplying the ``--force`` option to the `fs subvolume rm` command.

Configurables
~~~~~~~~~~~~~

Configure the maximum number of concurrent clone operations. The default is 4:

.. prompt:: bash #

   ceph config set mgr mgr/volumes/max_concurrent_clones <value>

Configure the ``snapshot_clone_no_wait`` option:

The ``snapshot_clone_no_wait`` config option is used to reject clone creation requests when cloner
threads (configured by the ``max_concurrent_clones`` option above) are not available. It is enabled
by default, i.e. the value is set to True. It can be configured with:

.. prompt:: bash #

   ceph config set mgr mgr/volumes/snapshot_clone_no_wait <bool>

The current value of ``snapshot_clone_no_wait`` can be fetched with:

.. prompt:: bash #

   ceph config get mgr mgr/volumes/snapshot_clone_no_wait
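
The interaction of the two options amounts to a simple admission check in the volumes manager. The
sketch below is illustrative only; the names mirror the mgr/volumes changes elsewhere in this commit:

    import errno

    def check_clone_admission(no_wait, pending_clones, max_concurrent_clones):
        # Return None if the clone may be queued, otherwise the errno to report.
        if no_wait and pending_clones >= max_concurrent_clones:
            # every cloner thread already has work queued; reject instead of queueing more
            return errno.EAGAIN
        return None

    # With the defaults (no_wait=True, max_concurrent_clones=4) a fifth outstanding
    # clone request is rejected with EAGAIN; with no_wait=False it is simply queued.
    assert check_clone_admission(True, 4, 4) == errno.EAGAIN
    assert check_clone_admission(False, 4, 4) is None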


.. _subvol-pinning:

158 changes: 158 additions & 0 deletions qa/tasks/cephfs/test_volumes.py
@@ -7000,6 +7000,11 @@ def test_subvolume_snapshot_clone_cancel_pending(self):
        # snapshot subvolume
        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

        # Disable the snapshot_clone_no_wait config option
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', False)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'false')

        # schedule clones
        for clone in clones:
            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
@@ -7485,6 +7490,159 @@ def test_subvolume_under_group_snapshot_clone(self):
        # verify trash dir is clean
        self._wait_for_trash_empty()

    def test_subvolume_snapshot_clone_with_no_wait_enabled(self):
        subvolume = self._gen_subvol_name()
        snapshot = self._gen_subvol_snap_name()
        clone1, clone2, clone3 = self._gen_subvol_clone_name(3)

        # create subvolume
        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")

        # do some IO
        self._do_subvolume_io(subvolume, number_of_files=10)

        # snapshot subvolume
        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

        # Decrease number of cloner threads
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 2)

        # Enable the snapshot_clone_no_wait config option
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', True)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'true')

        # Insert delay of 15 seconds at the beginning of the snapshot clone
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 15)

        # schedule clone1
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)

        # schedule clone2
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone2)

        # schedule clone3; it should be rejected since both cloner threads are busy
        cmd_ret = self.run_ceph_cmd(
            args=["fs", "subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone3],
            check_status=False, stdout=StringIO(), stderr=StringIO())
        self.assertEqual(cmd_ret.returncode, errno.EAGAIN, "Expecting EAGAIN error")

        # check clone1 status
        self._wait_for_clone_to_complete(clone1)

        # verify clone1
        self._verify_clone(subvolume, snapshot, clone1)

        # check clone2 status
        self._wait_for_clone_to_complete(clone2)

        # verify clone2
        self._verify_clone(subvolume, snapshot, clone2)

        # schedule clone3; it should be successful this time
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone3)

        # check clone3 status
        self._wait_for_clone_to_complete(clone3)

        # verify clone3
        self._verify_clone(subvolume, snapshot, clone3)

        # set number of cloner threads to default
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 4)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 4)

        # set the snapshot_clone_delay to default
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 0)

        # remove snapshot
        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)

        # remove subvolumes
        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
        self._fs_cmd("subvolume", "rm", self.volname, clone1)
        self._fs_cmd("subvolume", "rm", self.volname, clone2)
        self._fs_cmd("subvolume", "rm", self.volname, clone3)

        # verify trash dir is clean
        self._wait_for_trash_empty()

    def test_subvolume_snapshot_clone_with_no_wait_not_enabled(self):
        subvolume = self._gen_subvol_name()
        snapshot = self._gen_subvol_snap_name()
        clone1, clone2, clone3 = self._gen_subvol_clone_name(3)

        # create subvolume
        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")

        # do some IO
        self._do_subvolume_io(subvolume, number_of_files=10)

        # snapshot subvolume
        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

        # Disable the snapshot_clone_no_wait config option
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', False)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'false')

        # Decrease number of cloner threads
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 2)

        # schedule clone1
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)

        # schedule clone2
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone2)

        # schedule clone3; with no_wait disabled it is queued rather than rejected
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone3)

        # check clone1 status
        self._wait_for_clone_to_complete(clone1)

        # verify clone1
        self._verify_clone(subvolume, snapshot, clone1)

        # check clone2 status
        self._wait_for_clone_to_complete(clone2)

        # verify clone2
        self._verify_clone(subvolume, snapshot, clone2)

        # check clone3 status
        self._wait_for_clone_to_complete(clone3)

        # verify clone3
        self._verify_clone(subvolume, snapshot, clone3)

        # set the snapshot_clone_no_wait config option to default
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', True)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'true')

        # set number of cloner threads to default
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 4)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 4)

        # remove snapshot
        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)

        # remove subvolumes
        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
        self._fs_cmd("subvolume", "rm", self.volname, clone1)
        self._fs_cmd("subvolume", "rm", self.volname, clone2)
        self._fs_cmd("subvolume", "rm", self.volname, clone3)

        # verify trash dir is clean
        self._wait_for_trash_empty()


class TestMisc(TestVolumesHelper):
"""Miscellaneous tests related to FS volume, subvolume group, and subvolume operations."""
3 changes: 3 additions & 0 deletions qa/workunits/fs/full/subvolume_clone.sh
@@ -59,6 +59,9 @@ ceph fs subvolume snapshot create cephfs sub_0 snap_0
# Set clone snapshot delay
ceph config set mgr mgr/volumes/snapshot_clone_delay 15

# Disable the snapshot_clone_no_wait config option
ceph config set mgr mgr/volumes/snapshot_clone_no_wait false

# Schedule few clones, some would fail with no space
for i in $(eval echo {1..$NUM_CLONES});do ceph fs subvolume snapshot clone cephfs sub_0 snap_0 clone_$i;done

6 changes: 5 additions & 1 deletion src/pybind/mgr/volumes/fs/async_cloner.py
@@ -337,9 +337,10 @@ class Cloner(AsyncJobs):
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
        self.vc = volume_client
        self.snapshot_clone_delay = snapshot_clone_delay
        self.snapshot_clone_no_wait = clone_no_wait
        self.state_table = {
            SubvolumeStates.STATE_PENDING : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
@@ -355,6 +356,9 @@ def reconfigure_max_concurrent_clones(self, tp_size):
    def reconfigure_snapshot_clone_delay(self, timeout):
        self.snapshot_clone_delay = timeout

    def reconfigure_reject_clones(self, clone_no_wait):
        self.snapshot_clone_no_wait = clone_no_wait

    def is_clone_cancelable(self, clone_state):
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

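The Cloner docstring above describes a state-table driven design: each clone state maps to a
handler that does the work for that state and hands back the next state. A minimal, self-contained
sketch of that pattern is shown below; the states and handlers are simplified stand-ins, not the
real SubvolumeOpSm states:

    from enum import Enum, auto

    class CloneState(Enum):
        PENDING = auto()
        IN_PROGRESS = auto()
        COMPLETE = auto()

    def handle_clone_pending(job):
        # pick the job up and move it to the in-progress state
        return CloneState.IN_PROGRESS

    def handle_clone_in_progress(job):
        # copy the snapshot data for the job, then mark it complete
        return CloneState.COMPLETE

    STATE_TABLE = {
        CloneState.PENDING: handle_clone_pending,
        CloneState.IN_PROGRESS: handle_clone_in_progress,
    }

    def advance(state, job):
        # dispatch to the handler registered for the current state
        return STATE_TABLE[state](job)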
27 changes: 26 additions & 1 deletion src/pybind/mgr/volumes/fs/operations/volume.py
@@ -9,11 +9,12 @@
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..exception import VolumeException, IndexException
from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
    remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
from .trash import Trash
from mgr_util import open_filesystem, CephfsConnectionException
from .clone_index import open_clone_index

log = logging.getLogger(__name__)

@@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
    return {'pending_subvolume_deletions': num_pending_subvol_del}


def get_all_pending_clones_count(self, mgr, vol_spec):
    # Count pending clone entries across all volumes by listing each volume's
    # clone index directory; volumes without a clone index are skipped.
    pending_clones_cnt = 0
    index_path = ""
    fs_map = mgr.get('fs_map')
    for fs in fs_map['filesystems']:
        volname = fs['mdsmap']['fs_name']
        try:
            with open_volume(self, volname) as fs_handle:
                with open_clone_index(fs_handle, vol_spec) as index:
                    index_path = index.path.decode('utf-8')
                    pending_clones_cnt = pending_clones_cnt \
                                         + len(listdir(fs_handle, index_path,
                                                       filter_entries=None, filter_files=False))
        except IndexException as e:
            if e.errno == -errno.ENOENT:
                continue
            raise VolumeException(-e.args[0], e.args[1])
        except VolumeException as ve:
            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
            raise ve

    return pending_clones_cnt


@contextmanager
def open_volume(vc, volname):
    """
13 changes: 10 additions & 3 deletions src/pybind/mgr/volumes/fs/volume.py
@@ -13,12 +13,14 @@
from .operations.group import open_group, create_group, remove_group, \
    open_group_unique, set_group_attrs
from .operations.volume import create_volume, delete_volume, rename_volume, \
    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
    list_volumes, open_volume, get_pool_names, get_pool_ids, \
    get_pending_subvol_deletions_count, get_all_pending_clones_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
    create_clone

from .vol_spec import VolSpec
from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
from .exception import VolumeException, ClusterError, ClusterTimeout, \
    EvictionError, IndexException
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
@@ -53,7 +55,8 @@ def __init__(self, mgr):
        super().__init__(mgr)
        # volume specification
        self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
                             self.mgr.snapshot_clone_no_wait)
        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
        # on startup, queue purge job for available volumes to kickstart
        # purge for leftover subvolume entries in trash. note that, if the
@@ -764,6 +767,10 @@ def clone_subvolume_snapshot(self, **kwargs):
        s_groupname = kwargs['group_name']

        try:
            if self.mgr.snapshot_clone_no_wait and \
                    get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
                raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))

            with open_volume(self, volname) as fs_handle:
                with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                    with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
10 changes: 9 additions & 1 deletion src/pybind/mgr/volumes/module.py
@@ -489,7 +489,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
            'periodic_async_work',
            type='bool',
            default=False,
            desc='Periodically check for async work')
            desc='Periodically check for async work'),
        Option(
            'snapshot_clone_no_wait',
            type='bool',
            default=True,
            desc='Reject subvolume clone request when cloner threads are busy')
    ]

    def __init__(self, *args, **kwargs):
@@ -498,6 +503,7 @@ def __init__(self, *args, **kwargs):
        self.max_concurrent_clones = None
        self.snapshot_clone_delay = None
        self.periodic_async_work = False
        self.snapshot_clone_no_wait = None
        self.lock = threading.Lock()
        super(Module, self).__init__(*args, **kwargs)
        # Initialize config option members
@@ -532,6 +538,8 @@ def config_notify(self):
                else:
                    self.vc.cloner.unset_wakeup_timeout()
                    self.vc.purge_queue.unset_wakeup_timeout()
            elif opt['name'] == "snapshot_clone_no_wait":
                self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)

    def handle_command(self, inbuf, cmd):
        handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
