Skip to content

Commit

Permalink
Moving out scrub tests to a separate file
Browse files Browse the repository at this point in the history
Signed-off-by: Aishwarya Mathuria <[email protected]>
  • Loading branch information
amathuria committed Sep 6, 2022
1 parent 9b2e21a commit 1a2c63c
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 213 deletions.
214 changes: 1 addition & 213 deletions cluster/ceph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import json

from .cluster import Cluster
from .scrub_tests import ScrubbingTestThreadBackground, ScrubRecoveryThreadBackground


logger = logging.getLogger("cbt")
Expand Down Expand Up @@ -1292,216 +1293,3 @@ def run(self):
self.states[self.state]()
common.pdsh(settings.getnodes('head'), self.logcmd('Exiting recovery test thread. Last state was: %s' % self.state)).communicate()


class ScrubbingTestThreadBackground(threading.Thread):
def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest):
threading.Thread.__init__(self)
self.config = config
self.cluster = cluster
self.callback = callback
self.state = 'pre'
self.states = {'pre': self.pre, 'osdout': self.osdout, 'osdin':self.osdin,
'post': self.post, 'done': self.done}
self.startiorequest = startiorequest
self.stoprequest = stoprequest
self.haltrequest = haltrequest
self.outhealthtries = 0
self.inhealthtries = 0
self.maxhealthtries = 60
self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"]
self.ceph_cmd = self.cluster.ceph_cmd
self.lasttime = time.time()

def logcmd(self, message):
return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir'))

def pre(self):
pre_time = self.config.get("pre_time", 60)
common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' % pre_time)).communicate()
time.sleep(pre_time)
self.state = 'osdout'

def osdout(self):
scrub_log = "%s/scrub.log" % self.config.get('run_dir')
scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir')
ret = self.cluster.check_health(self.health_checklist, None, None)

common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate()

self.cluster.maybe_populate_scrubbing_pool()
common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate()
time.sleep(10)
self.lasttime = time.time()
self.state = "osdin"

def osdin(self):
scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir')
self.startiorequest.set()
self.cluster.initiate_scrubbing()
ret = self.cluster.check_scrub(scrub_stats_log)
if ret == 1:
self.state = "post"

def post(self):
if self.stoprequest.isSet():
common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate()
self.haltrequest.set()
return

if self.config.get("repeat", False):
# reset counters
self.outhealthtries = 0
self.inhealthtries = 0

common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate()
self.state = "osdout"
return

common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate()
self.state = "done"

def done(self):
common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate()
self.callback()
self.haltrequest.set()

def join(self, timeout=None):
common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate()
super(ScrubbingTestThreadBackground, self).join(timeout)

def run(self):
self.haltrequest.clear()
self.stoprequest.clear()
self.startiorequest.clear()
while not self.haltrequest.isSet():
self.states[self.state]()
common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate()


class ScrubRecoveryThreadBackground(threading.Thread):
def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest):
threading.Thread.__init__(self)
self.config = config
self.cluster = cluster
self.callback = callback
self.state = 'pre'
self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin,
'post': self.post, 'done': self.done}
self.startiorequest = startiorequest
self.stoprequest = stoprequest
self.haltrequest = haltrequest
self.outhealthtries = 0
self.inhealthtries = 0
self.maxhealthtries = 60
self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"]
self.ceph_cmd = self.cluster.ceph_cmd
self.lasttime = time.time()

def logcmd(self, message):
return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir'))

def pre(self):
pre_time = self.config.get("pre_time", 60)
common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate()
time.sleep(pre_time)
lcmd = self.logcmd("Setting the ceph osd noup flag")
common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate()
self.state = 'markdown'

def markdown(self):
for osdnum in self.config.get('osds'):
lcmd = self.logcmd("Marking OSD %s down." % osdnum)
common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
lcmd = self.logcmd("Marking OSD %s out." % osdnum)
common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate()
self.lasttime = time.time()
self.state = 'osdout'


def osdout(self):
reclog = "%s/recovery.log" % self.config.get('run_dir')
recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir')
ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog)

common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate()

if ret == 0:
common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate()
else:
common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate()
rectime = str(time.time() - self.lasttime)
common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate()
common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate()

# Populate the recovery pool
self.cluster.maybe_populate_recovery_pool()

common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate()
time.sleep(10)
lcmd = self.logcmd("Unsetting the ceph osd noup flag")
self.cluster.disable_recovery()
common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate()
for osdnum in self.config.get('osds'):
lcmd = self.logcmd("Marking OSD %s up." % osdnum)
common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
lcmd = self.logcmd("Marking OSD %s in." % osdnum)
common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
self.lasttime = time.time()
# Populate the scrub pool
logger.info("Sleep before scrub populate")
time.sleep(10)
self.cluster.maybe_populate_scrubbing_pool()
self.state = "osdin"


def osdin(self):
#Start scrub
self.startiorequest.set()
self.cluster.initiate_scrubbing()
self.cluster.enable_recovery()
recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir')
scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir')
backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,))
scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,))
backfill.start()
scrub_check.start()
backfill.join()
scrub_check.join()
self.state = "post"


def post(self):
if self.stoprequest.isSet():
common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate()
self.haltrequest.set()
return

if self.config.get("repeat", False):
# reset counters
self.outhealthtries = 0
self.inhealthtries = 0

common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate()
self.state = "markdown"
return

common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate()
self.state = "done"

def done(self):
common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate()
self.callback()
self.haltrequest.set()

def join(self, timeout=None):
common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate()
super(ScrubRecoveryThreadBackground, self).join(timeout)

def run(self):
self.haltrequest.clear()
self.stoprequest.clear()
self.startiorequest.clear()
while not self.haltrequest.isSet():
self.states[self.state]()
common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate()
Loading

0 comments on commit 1a2c63c

Please sign in to comment.