Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis configuration provider #215

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merged main, reported changes, fixed conflicts
apemaia99 committed May 26, 2024

Unverified

This user has not yet uploaded their public signing key.
commit 5cea74d50ebd90d44dd4b94c43ed7df7c2634679
2 changes: 1 addition & 1 deletion .github/workflows/api-docs.yml
Original file line number Diff line number Diff line change
@@ -11,4 +11,4 @@ jobs:
with:
package_name: redis
modules: Redis
pathsToInvalidate: /redis
pathsToInvalidate: /redis/*
58 changes: 35 additions & 23 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -17,24 +17,47 @@ jobs:
api-breakage:
if: ${{ !(github.event.pull_request.draft || false) }}
runs-on: ubuntu-latest
container: swift:5.8-jammy
container: swift:jammy
steps:
- name: Check out code
uses: actions/checkout@v3
uses: actions/checkout@v4
with: { 'fetch-depth': 0 }
- name: Run API breakage check action
uses: vapor/ci/.github/actions/ci-swift-check-api-breakage@reusable-workflows
- name: Run API breakage check
run: |
git config --global --add safe.directory "${GITHUB_WORKSPACE}"
swift package diagnose-api-breaking-changes origin/main
# gh-codeql:
# if: ${{ !(github.event.pull_request.draft || false) }}
# runs-on: ubuntu-latest
# permissions: { actions: write, contents: read, security-events: write }
# timeout-minutes: 30
# steps:
# - name: Install latest Swift toolchain
# uses: vapor/[email protected]
# with: { toolchain: latest }
# - name: Check out code
# uses: actions/checkout@v4
# - name: Fix Git configuration
# run: 'git config --global --add safe.directory "${GITHUB_WORKSPACE}"'
# - name: Initialize CodeQL
# uses: github/codeql-action/init@v3
# with: { languages: swift }
# - name: Perform build
# run: swift build
# - name: Run CodeQL analyze
# uses: github/codeql-action/analyze@v3

linux-unit:
if: ${{ !(github.event.pull_request.draft || false) }}
strategy:
fail-fast: false
matrix:
container:
- swift:5.6-focal
- swift:5.7-jammy
- swift:5.8-jammy
- swiftlang/swift:nightly-5.9-jammy
- swift:5.8-focal
- swift:5.9-jammy
- swift:5.10-jammy
- swiftlang/swift:nightly-6.0-jammy
- swiftlang/swift:nightly-main-jammy
redis:
- redis:6
@@ -47,22 +70,11 @@ jobs:
redis-2:
image: ${{ matrix.redis }}
steps:
- name: Save Redis version to env
run: |
echo REDIS_VERSION='${{ matrix.redis }}' >> $GITHUB_ENV
- name: Display versions
shell: bash
run: |
if [[ '${{ contains(matrix.container, 'nightly') }}' == 'true' ]]; then
SWIFT_PLATFORM="$(source /etc/os-release && echo "${ID}${VERSION_ID}")" SWIFT_VERSION="$(cat /.swift_tag)"
printf 'SWIFT_PLATFORM=%s\nSWIFT_VERSION=%s\n' "${SWIFT_PLATFORM}" "${SWIFT_VERSION}" >>"${GITHUB_ENV}"
fi
printf 'OS: %s\nTag: %s\nVersion:\n' "${SWIFT_PLATFORM}-${RUNNER_ARCH}" "${SWIFT_VERSION}" && swift --version
- name: Check out package
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Run unit tests with Thread Sanitizer and coverage
run: swift test --sanitize=thread --enable-code-coverage
- name: Submit coverage report to Codecov.io
uses: vapor/swift-codecov-action@v0.2
- name: Upload coverage data
uses: vapor/swift-codecov-action@v0.3
with:
cc_env_vars: 'SWIFT_VERSION,SWIFT_PLATFORM,RUNNER_OS,RUNNER_ARCH,REDIS_VERSION'
codecov_token: ${{ secrets.CODECOV_TOKEN }}
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.6
// swift-tools-version:5.8
import PackageDescription

let package = Package(
@@ -15,7 +15,7 @@ let package = Package(
],
dependencies: [
.package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.1"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.77.1"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"),
],
targets: [
.target(
46 changes: 46 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// swift-tools-version:5.9
import PackageDescription

let package = Package(
name: "redis",
platforms: [
.macOS(.v10_15),
.iOS(.v13),
.tvOS(.v13),
.watchOS(.v6),
],
products: [
.library(name: "Redis", targets: ["Redis"]),
.library(name: "XCTRedis", targets: ["XCTRedis"]),
],
dependencies: [
.package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.1"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"),
],
targets: [
.target(
name: "Redis",
dependencies: [
.product(name: "RediStack", package: "RediStack"),
.product(name: "Vapor", package: "vapor"),
],
swiftSettings: [.enableExperimentalFeature("StrictConcurrency=complete")]
),
.target(
name: "XCTRedis",
dependencies: [
.target(name: "Redis"),
],
swiftSettings: [.enableExperimentalFeature("StrictConcurrency=complete")]
),
.testTarget(
name: "RedisTests",
dependencies: [
.target(name: "Redis"),
.target(name: "XCTRedis"),
.product(name: "XCTVapor", package: "vapor"),
],
swiftSettings: [.enableExperimentalFeature("StrictConcurrency=complete")]
),
]
)
4 changes: 2 additions & 2 deletions Sources/Redis/Application.Redis+PubSub.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Vapor
import RediStack
@preconcurrency import RediStack

extension Application.Redis {
private struct PubSubKey: StorageKey, LockKey {
typealias Value = [RedisID: RedisClient]
typealias Value = [RedisID: RedisClient & Sendable]
}

var pubsubClient: RedisClient {
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import NIOCore
import NIOSSL

/// A protocol which indicates the ability to create a ``RedisClient``
public protocol RedisFactory {
public protocol RedisFactory: Sendable {
/// Configuration on which ``RedisFactory/makeClient(for:logger:)`` is based
var configuration: RedisConfiguration { get }

Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import Foundation

public struct RedisConfigurationFactory {
public struct RedisConfigurationFactory: Sendable {
typealias ValidationError = RedisConfiguration.ValidationError

public let make: () -> RedisFactory
public let make: @Sendable () -> RedisFactory

public init(make: @escaping () -> RedisFactory) {
public init(make: @escaping @Sendable () -> RedisFactory) {
self.make = make
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import Foundation
@preconcurrency import RediStack

extension RedisConfiguration {
public struct PoolOptions {
public struct PoolOptions: Sendable {
public let maximumConnectionCount: RedisConnectionPoolSize
public let minimumConnectionCount: Int
public let connectionBackoffFactor: Float32
public let initialConnectionBackoffDelay: TimeAmount
public let connectionRetryTimeout: TimeAmount?
public let onUnexpectedConnectionClose: ((RedisConnection) -> Void)?
public let onUnexpectedConnectionClose: (@Sendable (RedisConnection) -> Void)?

public init(
maximumConnectionCount: RedisConnectionPoolSize = .maximumActiveConnections(2),
minimumConnectionCount: Int = 0,
connectionBackoffFactor: Float32 = 2,
initialConnectionBackoffDelay: TimeAmount = .milliseconds(100),
connectionRetryTimeout: TimeAmount? = nil,
onUnexpectedConnectionClose: ((RedisConnection) -> Void)? = nil
onUnexpectedConnectionClose: (@Sendable (RedisConnection) -> Void)? = nil
) {
self.maximumConnectionCount = maximumConnectionCount
self.minimumConnectionCount = minimumConnectionCount
3 changes: 1 addition & 2 deletions Sources/Redis/Configuration/RedisConfiguration.swift
Original file line number Diff line number Diff line change
@@ -2,10 +2,9 @@ import Foundation
import Logging
import NIOCore
import NIOSSL
import RediStack

/// Configuration for connecting to a Redis instance
public struct RedisConfiguration {
public struct RedisConfiguration: Sendable {
public typealias ValidationError = RedisConnection.Configuration.ValidationError

public let serverAddresses: [SocketAddress]
24 changes: 17 additions & 7 deletions Sources/Redis/Redis+Cache.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Vapor
import Foundation
import RediStack
@preconcurrency import RediStack
import NIOCore

// MARK: RedisCacheCoder
@@ -40,6 +40,11 @@ extension Application.Caches {

/// A cache configured for a given Redis ID and using the provided encoder and decoder.
public func redis<E: RedisCacheEncoder, D: RedisCacheDecoder>(_ id: RedisID = .default, encoder: E, decoder: D) -> Cache {
RedisCache(encoder: FakeSendable(value: encoder), decoder: FakeSendable(value: decoder), client: self.application.redis(id))
}

/// A cache configured for a given Redis ID and using the provided encoder and decoder wrapped as FakeSendable.
func redis(_ id: RedisID = .default, encoder: FakeSendable<some RedisCacheEncoder>, decoder: FakeSendable<some RedisCacheDecoder>) -> Cache {
RedisCache(encoder: encoder, decoder: decoder, client: self.application.redis(id))
}
}
@@ -59,20 +64,25 @@ extension Application.Caches.Provider {

/// Configures the application cache to use the given Redis ID and the provided encoder and decoder.
public static func redis<E: RedisCacheEncoder, D: RedisCacheDecoder>(_ id: RedisID = .default, encoder: E, decoder: D) -> Self {
.init { $0.caches.use { $0.caches.redis(id, encoder: encoder, decoder: decoder) } }
let wrappedEncoder = FakeSendable(value: encoder)
let wrappedDecoder = FakeSendable(value: decoder)
return .init { $0.caches.use { $0.caches.redis(id, encoder: wrappedEncoder, decoder: wrappedDecoder) } }
}
}

// MARK: - Redis cache driver

/// A wrapper to silence `Sendable` warnings for `JSONDecoder` and `JSONEncoder` when not on macOS.
struct FakeSendable<T>: @unchecked Sendable { let value: T }

/// `Cache` driver for storing cache data in Redis, using a provided encoder and decoder to serialize and deserialize values respectively.
private struct RedisCache<CacheEncoder: RedisCacheEncoder, CacheDecoder: RedisCacheDecoder>: Cache {
let encoder: CacheEncoder
let decoder: CacheDecoder
private struct RedisCache<CacheEncoder: RedisCacheEncoder, CacheDecoder: RedisCacheDecoder>: Cache, Sendable {
let encoder: FakeSendable<CacheEncoder>
let decoder: FakeSendable<CacheDecoder>
let client: RedisClient

func get<T: Decodable>(_ key: String, as type: T.Type) -> EventLoopFuture<T?> {
self.client.get(RedisKey(key), as: CacheDecoder.Input.self).optionalFlatMapThrowing { try self.decoder.decode(T.self, from: $0) }
self.client.get(RedisKey(key), as: CacheDecoder.Input.self).optionalFlatMapThrowing { try self.decoder.value.decode(T.self, from: $0) }
}

func set<T: Encodable>(_ key: String, to value: T?, expiresIn expirationTime: CacheExpirationTime?) -> EventLoopFuture<Void> {
@@ -81,7 +91,7 @@ private struct RedisCache<CacheEncoder: RedisCacheEncoder, CacheDecoder: RedisCa
}

return self.client.eventLoop
.tryFuture { try self.encoder.encode(value) }
.tryFuture { try self.encoder.value.encode(value) }
.flatMap {
if let expirationTime = expirationTime {
return self.client.setex(RedisKey(key), to: $0, expirationInSeconds: expirationTime.seconds)
2 changes: 1 addition & 1 deletion Sources/Redis/Redis+Sessions.swift
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import RediStack
import NIOCore

/// A delegate object that controls key behavior of an `Application.Redis.Sessions` driver.
public protocol RedisSessionsDelegate {
public protocol RedisSessionsDelegate: Sendable {
/// Makes a new session ID token.
/// - Note: This method is optional to implement.
///
3 changes: 2 additions & 1 deletion Sources/Redis/RedisID.swift
Original file line number Diff line number Diff line change
@@ -16,7 +16,8 @@ public struct RedisID: Hashable,
ExpressibleByStringLiteral,
ExpressibleByStringInterpolation,
CustomStringConvertible,
Comparable {
Comparable,
Sendable {

public let rawValue: String

15 changes: 14 additions & 1 deletion Sources/Redis/RedisStorage/RedisStorage+Lifecycle.swift
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ extension RedisStorage {
/// configurated `RedisID` on each `EventLoop`.
final class Lifecycle: LifecycleHandler {
unowned let redisStorage: RedisStorage

init(redisStorage: RedisStorage) {
self.redisStorage = redisStorage
}
@@ -16,7 +17,19 @@ extension RedisStorage {

/// Close each connection pool
func shutdown(_ application: Application) {
redisStorage.shutdown(application: application)
do {
try redisStorage.shutdown(application: application).wait()
} catch {
application.logger.error("Error shutting down redis connection pools, possibly because the pool never connected to the Redis server: \(error)")
}
}

func shutdownAsync(_ application: Application) async {
do {
try await redisStorage.shutdown(application: application).get()
} catch {
application.logger.error("Error shutting down redis connection pools, possibly because the pool never connected to the Redis server: \(error)")
}
}
}
}
82 changes: 39 additions & 43 deletions Sources/Redis/RedisStorage/RedisStorage.swift
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
import NIOConcurrencyHelpers
@preconcurrency import RediStack
import Vapor

final class RedisStorage {
private let lock: NIOLock

private var configurations: [RedisID: RedisConfigurationFactory]
private var pools: [PoolKey: RedisClient] {
willSet {
guard pools.isEmpty else {
fatalError("Modifying connection pools after application has booted is not supported")
final class RedisStorage: Sendable {
fileprivate struct StorageBox: Sendable {
var configurations: [RedisID: RedisConfigurationFactory]
var pools: [PoolKey: RedisClient] {
willSet {
guard pools.isEmpty else {
fatalError("Modifying connection pools after application has booted is not supported")
}
}
}
}

private let box: NIOLockedValueBox<StorageBox>

init() {
configurations = [:]
pools = [:]
lock = .init()
box = .init(.init(configurations: [:], pools: [:]))
}

func use(_ configuration: RedisConfigurationFactory, as id: RedisID) {
configurations[id] = configuration
box.withLockedValue { $0.configurations[id] = configuration }
}

func pool(for eventLoop: EventLoop, id redisID: RedisID) -> RedisClient {
let key = PoolKey(eventLoopKey: eventLoop.key, redisID: redisID)
guard let pool = pools[key] else {
guard let pool = box.withLockedValue({ $0.pools[key] }) else {
fatalError("No redis found for id \(redisID), or the app may not have finished booting. Also, the eventLoop must be from Application's EventLoopGroup.")
}
return pool
@@ -34,41 +35,36 @@ final class RedisStorage {

extension RedisStorage {
func bootstrap(application: Application) {
lock.lock()
defer { lock.unlock() }
pools = configurations.reduce(into: [PoolKey: RedisClient]()) { pools, instance in
let (id, configuration) = instance
box.withLockedValue {
$0.pools = $0.configurations.reduce(into: [PoolKey: RedisClient]()) { pools, instance in
let (id, configuration) = instance

application
.eventLoopGroup
.makeIterator()
.forEach { eventLoop in
let newKey: PoolKey = .init(eventLoopKey: eventLoop.key, redisID: id)
let newPool: RedisClient = configuration
.make()
.makeClient(for: eventLoop, logger: application.logger)
application
.eventLoopGroup
.makeIterator()
.forEach { eventLoop in
let newKey: PoolKey = .init(eventLoopKey: eventLoop.key, redisID: id)
let newPool: RedisClient = configuration
.make()
.makeClient(for: eventLoop, logger: application.logger)

pools[newKey] = newPool
}
pools[newKey] = newPool
}
}
}
}

func shutdown(application: Application) {
lock.lock()
defer { lock.unlock() }

let shutdownFuture: EventLoopFuture<Void> = pools.values.compactMap { pool in
guard let pool = pool as? RedisConnectionPool else { return nil }

let promise = pool.eventLoop.makePromise(of: Void.self)
pool.close(promise: promise)
return promise.futureResult
}.flatten(on: application.eventLoopGroup.next())

do {
try shutdownFuture.wait()
} catch {
application.logger.error("Error shutting down redis connection pools, possibly because the pool never connected to the Redis server: \(error)")
func shutdown(application: Application) -> EventLoopFuture<Void> {
box.withLockedValue {
let shutdownFuture: EventLoopFuture<Void> = $0.pools.values.compactMap { pool in
guard let pool = pool as? RedisConnectionPool else { return nil }

let promise = pool.eventLoop.makePromise(of: Void.self)
pool.close(promise: promise)
return promise.futureResult
}.flatten(on: application.eventLoopGroup.next())

return shutdownFuture
}
}
}
66 changes: 45 additions & 21 deletions Sources/XCTRedis/ArrayTestRedisClient.swift
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import NIOConcurrencyHelpers
import Redis
import Vapor
import XCTest

// Common base for stubbing responses since at runtime we will have as many instances as many event loops.
public class ArrayTestRedisClient {
public final class ArrayTestRedisClient: Sendable {
public typealias Item = Result<RESPValue, Error>

private(set) var results: [Item] = []
private(set) var publishers: [String: RedisSubscriptionMessageReceiver] = [:]
private(set) var unSubscriptions: [String: RedisSubscriptionChangeHandler] = [:]
fileprivate struct StorageBox: @unchecked Sendable {
var results: [Item] = []
var publishers: [String: RedisSubscriptionMessageReceiver] = [:]
var unSubscriptions: [String: RedisSubscriptionChangeHandler] = [:]
}

private let box: NIOLockedValueBox<StorageBox> = .init(.init())

public init() {}

@@ -18,24 +23,32 @@ public class ArrayTestRedisClient {
}

deinit {
XCTAssert(results.isEmpty)
box.withLockedValue {
XCTAssert($0.results.isEmpty)
}
}

public func prepare(with item: Result<RESPValue, Error>) {
results.append(item)
box.withLockedValue {
$0.results.append(item)
}
}

public func prepare(error: Error?) {
switch error {
case let .some(value):
results.append(.failure(value))
case .none:
results.append(.success(.null))
box.withLockedValue {
switch error {
case let .some(value):
$0.results.append(.failure(value))
case .none:
$0.results.append(.success(.null))
}
}
}

var next: Item {
results.isEmpty ? .failure(TestError.outOfResponses) : results.removeFirst()
box.withLockedValue {
$0.results.isEmpty ? .failure(TestError.outOfResponses) : $0.results.removeFirst()
}
}

func subscribe(
@@ -44,27 +57,38 @@ public class ArrayTestRedisClient {
subHandler: RedisSubscriptionChangeHandler?,
unSubHandler: RedisSubscriptionChangeHandler?
) {
for value in values {
publishers[value] = publisher
unSubscriptions[value] = unSubHandler
subHandler?(value, 1)
box.withLockedValue {
for value in values {
$0.publishers[value] = publisher
$0.unSubscriptions[value] = unSubHandler
}
}
values.forEach({ subHandler?($0, 1) })
}

func unsubscribe(
matching values: [String]
) {
for value in values {
publishers[value] = nil
unSubscriptions[value]?(value, 0)
unSubscriptions[value] = nil
var unSubscriptions: [String: RedisSubscriptionChangeHandler] = [:]

box.withLockedValue {
for value in values {
$0.publishers[value] = nil
if let unSubscription = $0.unSubscriptions[value] {
unSubscriptions[value] = unSubscription
}
$0.unSubscriptions[value] = nil
}
}
unSubscriptions.forEach({ $0.value($0.key, 0) })
}

func yield(with arguments: [RESPValue]) {
let channel = arguments[0].string!
let message = arguments[1]

publishers[channel]?(.init(channel), message)
box.withLockedValue {
$0.publishers[channel]
}?(.init(channel), message)
}
}
2 changes: 1 addition & 1 deletion Tests/RedisTests/RedisTests.swift
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ import Redis
import Vapor
import Logging
import XCTVapor
import RediStack
@preconcurrency import RediStack
import XCTest

extension String {
You are viewing a condensed version of this merge commit. You can view the full changes here.