Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: allow get_subsystem concurrent to other grpc requests #784

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9"

# Ceph Cluster
CEPH_CLUSTER_VERSION="${CEPH_VERSION}"
CEPH_BRANCH=wip-leonidc-20242808-upgrade-fast-reboot-eval-centos9-only
CEPH_SHA=2be7dce1a9496d2a2ca0de1077336a22a2ac2e75
CEPH_BRANCH=main-nvmeof
CEPH_SHA=b59673c44bd569f9f3db37f87bced695dec5fcbf

CEPH_DEVEL_MGR_PATH=../ceph

Expand Down
87 changes: 27 additions & 60 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import errno
import contextlib
import threading
import time
from typing import Callable
from collections import defaultdict
Expand Down Expand Up @@ -150,9 +151,14 @@ class GatewayService(pb2_grpc.GatewayServicer):
gateway_name: Gateway identifier
gateway_state: Methods for target state persistence
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_subsystems_client: Client of SPDK RPC server for get_subsystems
spdk_rpc_subsystems_lock: Mutex to hold while using get subsystems SPDK client
shared_state_lock: guard mutex for bdev_cluster and cluster_nonce
subsystem_nsid_bdev_and_uuid: map of nsid to bdev
cluster_nonce: cluster context nonce map
"""

def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, ceph_utils: CephUtils) -> None:
def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, spdk_rpc_subsystems_client, ceph_utils: CephUtils) -> None:
"""Constructor"""
self.gw_logger_object = GatewayLogger(config)
self.logger = self.gw_logger_object.logger
Expand Down Expand Up @@ -205,6 +211,9 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.omap_lock = omap_lock
self.group_id = group_id
self.spdk_rpc_client = spdk_rpc_client
self.spdk_rpc_subsystems_client = spdk_rpc_subsystems_client
self.spdk_rpc_subsystems_lock = threading.Lock()
self.shared_state_lock = threading.Lock()
self.gateway_name = self.config.get("gateway", "name")
if not self.gateway_name:
self.gateway_name = socket.gethostname()
Expand Down Expand Up @@ -367,8 +376,9 @@ def _alloc_cluster(self, anagrp: int) -> str:
user = self.rados_id,
core_mask = self.librbd_core_mask,
)
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
with self.shared_state_lock:
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
return name

def _grpc_function_with_lock(self, func, request, context):
Expand Down Expand Up @@ -446,7 +456,8 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl
block_size=block_size,
uuid=uuid,
)
self.bdev_cluster[name] = cluster_name
with self.shared_state_lock:
self.bdev_cluster[name] = cluster_name
self.bdev_params[name] = {'uuid':uuid, 'pool_name':rbd_pool_name, 'image_name':rbd_image_name, 'image_size':rbd_image_size, 'block_size': block_size}

self.logger.debug(f"bdev_rbd_create: {bdev_name}, cluster_name {cluster_name}")
Expand Down Expand Up @@ -540,8 +551,10 @@ def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""):
)
if not recycling_mode:
del self.bdev_params[bdev_name]
self.logger.debug(f"to delete_bdev {bdev_name} cluster {self.bdev_cluster[bdev_name]} ")
self._put_cluster(self.bdev_cluster[bdev_name])
with self.shared_state_lock:
cluster = self.bdev_cluster[bdev_name]
self.logger.debug(f"to delete_bdev {bdev_name} cluster {cluster} ")
self._put_cluster(cluster)
self.logger.debug(f"delete_bdev {bdev_name}: {ret}")
except Exception as ex:
errmsg = f"Failure deleting bdev {bdev_name}"
Expand Down Expand Up @@ -2582,7 +2595,7 @@ def get_subsystems_safe(self, request, context):
self.logger.debug(f"Received request to get subsystems, context: {context}{peer_msg}")
subsystems = []
try:
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client)
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_subsystems_client)
except Exception as ex:
self.logger.exception(f"get_subsystems failed")
context.set_code(grpc.StatusCode.INTERNAL)
Expand All @@ -2591,71 +2604,25 @@ def get_subsystems_safe(self, request, context):

for s in ret:
try:
nqn = s["nqn"]
ns_key = "namespaces"
ns_list = []
if ns_key in s:
ns_list = s[ns_key]
if not ns_list:
self.subsystem_nsid_bdev_and_uuid.remove_namespace(nqn)
for n in ns_list:
nsid = n["nsid"]
uuid = n["uuid"]
bdev = n["bdev_name"]
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
n["nonce"] = nonce
for n in s[ns_key]:
bdev = n["bdev_name"]
with self.shared_state_lock:
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
n["nonce"] = nonce
# Parse the JSON dictionary into the protobuf message
subsystem = pb2.subsystem()
json_format.Parse(json.dumps(s), subsystem, ignore_unknown_fields=True)
psk_hosts = []
saw_psk = False
# if now host nqn is passed, just check if there is any psk host in subsystem
if self.host_info.is_psk_host(nqn):
for h in subsystem.hosts:
psk_val = False
if self.host_info.is_psk_host(nqn, h.nqn):
psk_val = True
saw_psk = True
psk_hosts.append(pb2.host(nqn=h.nqn, use_psk = psk_val))

secure_listeners = []
saw_secure = False
if nqn in self.subsystem_listeners:
for lstnr in subsystem.listen_addresses:
secure_val = False
# We get the address family as IPv4 and the port as a string, we need to adjust to internal form
if (lstnr.adrfam.lower(), lstnr.traddr, int(lstnr.trsvcid), True) in self.subsystem_listeners[nqn]:
saw_secure = True
secure_val = True
secure_listeners.append(pb2.listen_address(trtype=lstnr.trtype,
adrfam=lstnr.adrfam.lower(),
traddr=lstnr.traddr,
trsvcid=lstnr.trsvcid,
transport=lstnr.transport,
secure=secure_val))

if saw_psk or saw_secure:
secure_subsystem = pb2.subsystem(nqn = subsystem.nqn,
subtype = subsystem.subtype,
listen_addresses = secure_listeners,
hosts = psk_hosts,
allow_any_host = subsystem.allow_any_host,
serial_number = subsystem.serial_number,
max_namespaces = subsystem.max_namespaces,
min_cntlid = subsystem.min_cntlid,
max_cntlid = subsystem.max_cntlid,
namespaces = subsystem.namespaces)
subsystems.append(secure_subsystem)
else:
subsystems.append(subsystem)
subsystems.append(subsystem)
except Exception:
self.logger.exception(f"{s=} parse error")
pass

return pb2.subsystems_info(subsystems=subsystems)

def get_subsystems(self, request, context):
with self.rpc_lock:
with self.spdk_rpc_subsystems_lock:
return self.get_subsystems_safe(request, context)

def list_subsystems(self, request, context=None):
Expand Down
10 changes: 9 additions & 1 deletion control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class GatewayServer:
server: gRPC server instance to receive gateway client requests
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_ping_client: Ping client of SPDK RPC server
spdk_rpc_subsystems_client: subsystems client of SPDK RPC server
spdk_process: Subprocess running SPDK NVMEoF target application
discovery_pid: Subprocess running Ceph nvmeof discovery service
"""
Expand Down Expand Up @@ -174,7 +175,7 @@ def serve(self):
# Register service implementation with server
gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller, f"gateway-{self.name}")
self.omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, self.omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, self.omap_lock, self.group_id, self.spdk_rpc_client, self.spdk_rpc_subsystems_client, self.ceph_utils)
self.server = self._grpc_server(self._gateway_address())
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)

Expand Down Expand Up @@ -389,6 +390,13 @@ def _start_spdk(self, omap_state):
log_level=protocol_log_level,
conn_retries=conn_retries,
)
self.spdk_rpc_subsystems_client = rpc_client.JSONRPCClient(
self.spdk_rpc_socket_path,
None,
timeout,
log_level=protocol_log_level,
conn_retries=conn_retries,
)
except Exception:
self.logger.exception(f"Unable to initialize SPDK")
raise
Expand Down
Loading