Skip to content

Commit c5d26d6

Browse files
authored
Merge pull request #233 from kategory/paco-effectsrx2
Add kategory-effects-rx2
2 parents e3256e1 + e129aae commit c5d26d6

File tree

19 files changed

+308
-54
lines changed

19 files changed

+308
-54
lines changed

kategory-core/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ dependencies {
1010
}
1111

1212
apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
13-
apply from: 'generated-kotlin-sources.gradle'
13+
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
1414
apply plugin: 'kotlin-kapt'

kategory-core/src/main/kotlin/kategory/data/Try.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ sealed class Try<out A> : TryKind<A> {
218218
* If `fb` is initially applied and throws an exception,
219219
* then `fa` is applied with this exception.
220220
*/
221-
fun <B> fold(fa: (Throwable) -> B, fb: (A) -> B): B =
221+
inline fun <B> fold(fa: (Throwable) -> B, fb: (A) -> B): B =
222222
when (this) {
223223
is Failure -> fa(exception)
224224
is Success -> try {

kategory-core/src/main/kotlin/kategory/typeclasses/Comonad.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ open class ComonadContinuation<F, A : Any>(val CM: Comonad<F>, override val cont
5454
* A coroutine is initiated and inside `MonadContinuation` suspended yielding to `flatMap` once all the flatMap binds are completed
5555
* the underlying monad is returned from the act of executing the coroutine
5656
*/
57-
fun <F, B : Any> Comonad<F>.cobinding(coroutineContext: CoroutineContext = EmptyCoroutineContext, c: suspend ComonadContinuation<F, *>.() -> B): B {
57+
fun <F, B : Any> Comonad<F>.cobinding(c: suspend ComonadContinuation<F, *>.() -> B): B {
5858
val continuation = ComonadContinuation<F, B>(this)
5959
c.startCoroutine(continuation, continuation)
6060
return continuation.returnedMonad

kategory-core/src/main/kotlin/kategory/typeclasses/Monad.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ open class MonadContinuation<F, A>(M: Monad<F>, override val context: CoroutineC
7676
* A coroutine is initiated and inside `MonadContinuation` suspended yielding to `flatMap` once all the flatMap binds are completed
7777
* the underlying monad is returned from the act of executing the coroutine
7878
*/
79-
fun <F, B> Monad<F>.binding(coroutineContext: CoroutineContext = EmptyCoroutineContext, c: suspend MonadContinuation<F, *>.() -> HK<F, B>): HK<F, B> {
80-
val continuation = MonadContinuation<F, B>(this, coroutineContext)
79+
fun <F, B> Monad<F>.binding(c: suspend MonadContinuation<F, *>.() -> HK<F, B>): HK<F, B> {
80+
val continuation = MonadContinuation<F, B>(this)
8181
c.startCoroutine(continuation, continuation)
8282
return continuation.returnedMonad()
8383
}
@@ -126,9 +126,9 @@ open class StackSafeMonadContinuation<F, A>(M: Monad<F>, override val context: C
126126
* This combinator ultimately returns computations lifting to Free to automatically for comprehend in a stack-safe way
127127
* over any stack-unsafe monads
128128
*/
129-
fun <F, B> Monad<F>.bindingStackSafe(coroutineContext: CoroutineContext = EmptyCoroutineContext, c: suspend StackSafeMonadContinuation<F, *>.() -> Free<F, B>):
129+
fun <F, B> Monad<F>.bindingStackSafe(c: suspend StackSafeMonadContinuation<F, *>.() -> Free<F, B>):
130130
Free<F, B> {
131-
val continuation = StackSafeMonadContinuation<F, B>(this, coroutineContext)
131+
val continuation = StackSafeMonadContinuation<F, B>(this)
132132
c.startCoroutine(continuation, continuation)
133133
return continuation.returnedMonad()
134134
}

kategory-core/src/main/kotlin/kategory/typeclasses/MonadError.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package kategory
22

33
import java.io.Serializable
4+
import kotlin.coroutines.experimental.CoroutineContext
5+
import kotlin.coroutines.experimental.EmptyCoroutineContext
46
import kotlin.coroutines.experimental.RestrictsSuspension
57
import kotlin.coroutines.experimental.startCoroutine
68

@@ -20,7 +22,8 @@ inline fun <reified F, A, reified E> HK<F, A>.ensure(
2022
noinline predicate: (A) -> Boolean): HK<F, A> = FT.ensure(this, error, predicate)
2123

2224
@RestrictsSuspension
23-
class MonadErrorContinuation<F, A>(val ME: MonadError<F, Throwable>) : Serializable, MonadContinuation<F, A>(ME) {
25+
class MonadErrorContinuation<F, A>(val ME: MonadError<F, Throwable>, override val context: CoroutineContext = EmptyCoroutineContext) :
26+
Serializable, MonadContinuation<F, A>(ME) {
2427

2528
override fun resumeWithException(exception: Throwable) {
2629
returnedMonad = ME.raiseError(exception)

kategory-core/src/test/kotlin/kategory/data/IdTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kategory
22

33
import io.kotlintest.KTestJUnitRunner
4-
import io.kotlintest.matchers.shouldBe
54
import org.junit.runner.RunWith
65

76
@RunWith(KTestJUnitRunner::class)

kategory-effects-rx2/build.gradle

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
dependencies {
2+
compile project(':kategory-core')
3+
compile project(':kategory-effects')
4+
compile "org.jetbrains.kotlin:kotlin-stdlib-jre7:$kotlinVersion"
5+
compile project(':kategory-annotations')
6+
kapt project(':kategory-annotations-processor')
7+
kaptTest project(':kategory-annotations-processor')
8+
compileOnly project(':kategory-annotations-processor')
9+
testCompileOnly project(':kategory-annotations-processor')
10+
testCompile "io.kotlintest:kotlintest:$kotlinTestVersion"
11+
testCompile project(':kategory-effects-test')
12+
13+
compile "io.reactivex.rxjava2:rxjava:2.1.3"
14+
}
15+
16+
apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
17+
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
18+
apply plugin: 'kotlin-kapt'
+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Maven publishing configuration
2+
POM_NAME=Kategory-Effects-Rx2
3+
POM_ARTIFACT_ID=kategory-effects-rx2
4+
POM_PACKAGING=jar
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package kategory
2+
3+
import io.reactivex.Observable
4+
import io.reactivex.ObservableEmitter
5+
6+
fun <A> Observable<A>.k(): ObservableW<A> = ObservableW(this)
7+
8+
fun <A> ObservableWKind<A>.value(): Observable<A> =
9+
this.ev().observable
10+
11+
@higherkind
12+
@deriving(Functor::class, Applicative::class, AsyncContext::class)
13+
data class ObservableW<A>(val observable: Observable<A>) : ObservableWKind<A> {
14+
fun <B> map(f: (A) -> B): ObservableW<B> =
15+
observable.map(f).k()
16+
17+
fun <B> ap(fa: ObservableWKind<(A) -> B>): ObservableW<B> =
18+
flatMap { a -> fa.ev().map { ff -> ff(a) } }
19+
20+
fun <B> flatMap(f: (A) -> ObservableW<B>): ObservableW<B> =
21+
observable.flatMap { f(it).observable }.k()
22+
23+
fun <B> concatMap(f: (A) -> ObservableW<B>): ObservableW<B> =
24+
observable.concatMap { f(it).observable }.k()
25+
26+
fun <B> switchMap(f: (A) -> ObservableW<B>): ObservableW<B> =
27+
observable.switchMap { f(it).observable }.k()
28+
29+
companion object {
30+
fun <A> pure(a: A): ObservableW<A> =
31+
Observable.just(a).k()
32+
33+
fun <A> raiseError(t: Throwable): ObservableW<A> =
34+
Observable.error<A>(t).k()
35+
36+
fun <A> runAsync(fa: Proc<A>): ObservableW<A> =
37+
Observable.create { emitter: ObservableEmitter<A> ->
38+
fa { either: Either<Throwable, A> ->
39+
either.fold({
40+
emitter.onError(it)
41+
}, {
42+
emitter.onNext(it)
43+
emitter.onComplete()
44+
})
45+
46+
}
47+
}.k()
48+
49+
fun monadFlat(): ObservableWFlatMonadInstance = ObservableWFlatMonadInstanceImplicits.instance()
50+
51+
fun monadConcat(): ObservableWConcatMonadInstance = ObservableWConcatMonadInstanceImplicits.instance()
52+
53+
fun monadSwitch(): ObservableWSwitchMonadInstance = ObservableWSwitchMonadInstanceImplicits.instance()
54+
55+
fun monadErrorFlat(): ObservableWFlatMonadErrorInstance = ObservableWFlatMonadErrorInstanceImplicits.instance()
56+
57+
fun monadErrorConcat(): ObservableWConcatMonadErrorInstance = ObservableWConcatMonadErrorInstanceImplicits.instance()
58+
59+
fun monadErrorSwitch(): ObservableWSwitchMonadErrorInstance = ObservableWSwitchMonadErrorInstanceImplicits.instance()
60+
}
61+
}
62+
63+
fun <A> ObservableW<A>.handleErrorWith(function: (Throwable) -> ObservableW<A>): ObservableW<A> =
64+
this.observable.onErrorResumeNext { t: Throwable -> function(t).observable }.k()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package kategory
2+
3+
object ObservableWMonadInstanceImplicits {
4+
@JvmStatic
5+
fun instance(): ObservableWFlatMonadInstance = ObservableWFlatMonadInstanceImplicits.instance()
6+
}
7+
8+
object ObservableWMonadErrorInstanceImplicits {
9+
@JvmStatic
10+
fun instance(): ObservableWFlatMonadErrorInstance = ObservableWFlatMonadErrorInstanceImplicits.instance()
11+
}
12+
13+
interface ObservableWFlatMonadInstance :
14+
ObservableWApplicativeInstance,
15+
Monad<ObservableWHK> {
16+
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
17+
fa.ev().ap(ff)
18+
19+
override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableWKind<B> =
20+
fa.ev().flatMap { f(it).ev() }
21+
22+
override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableWKind<B> =
23+
f(a).ev().flatMap {
24+
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
25+
}
26+
}
27+
28+
object ObservableWFlatMonadInstanceImplicits {
29+
@JvmStatic
30+
fun instance(): ObservableWFlatMonadInstance = object : ObservableWFlatMonadInstance {}
31+
}
32+
33+
interface ObservableWFlatMonadErrorInstance :
34+
ObservableWFlatMonadInstance,
35+
MonadError<ObservableWHK, Throwable> {
36+
override fun <A> raiseError(e: Throwable): ObservableW<A> =
37+
ObservableW.raiseError(e)
38+
39+
override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
40+
fa.ev().handleErrorWith { f(it).ev() }
41+
}
42+
43+
object ObservableWFlatMonadErrorInstanceImplicits {
44+
@JvmStatic
45+
fun instance(): ObservableWFlatMonadErrorInstance = object : ObservableWFlatMonadErrorInstance {}
46+
}
47+
48+
interface ObservableWConcatMonadInstance :
49+
ObservableWApplicativeInstance,
50+
Monad<ObservableWHK> {
51+
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
52+
fa.ev().ap(ff)
53+
54+
override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableW<B> =
55+
fa.ev().concatMap { f(it).ev() }
56+
57+
override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableW<B> =
58+
f(a).ev().concatMap {
59+
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
60+
}
61+
}
62+
63+
object ObservableWConcatMonadInstanceImplicits {
64+
@JvmStatic
65+
fun instance(): ObservableWConcatMonadInstance = object : ObservableWConcatMonadInstance {}
66+
}
67+
68+
interface ObservableWConcatMonadErrorInstance :
69+
ObservableWConcatMonadInstance,
70+
MonadError<ObservableWHK, Throwable> {
71+
override fun <A> raiseError(e: Throwable): ObservableW<A> =
72+
ObservableW.raiseError(e)
73+
74+
override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
75+
fa.ev().handleErrorWith { f(it).ev() }
76+
}
77+
78+
object ObservableWConcatMonadErrorInstanceImplicits {
79+
@JvmStatic
80+
fun instance(): ObservableWConcatMonadErrorInstance = object : ObservableWConcatMonadErrorInstance {}
81+
}
82+
83+
interface ObservableWSwitchMonadInstance :
84+
ObservableWApplicativeInstance,
85+
Monad<ObservableWHK> {
86+
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
87+
fa.ev().ap(ff)
88+
89+
override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableW<B> =
90+
fa.ev().switchMap { f(it).ev() }
91+
92+
override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableW<B> =
93+
f(a).ev().switchMap {
94+
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
95+
}
96+
}
97+
98+
object ObservableWSwitchMonadInstanceImplicits {
99+
@JvmStatic
100+
fun instance(): ObservableWSwitchMonadInstance = object : ObservableWSwitchMonadInstance {}
101+
}
102+
103+
interface ObservableWSwitchMonadErrorInstance :
104+
ObservableWSwitchMonadInstance,
105+
MonadError<ObservableWHK, Throwable> {
106+
107+
override fun <A> raiseError(e: Throwable): ObservableW<A> =
108+
ObservableW.raiseError(e)
109+
110+
override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
111+
fa.ev().handleErrorWith { f(it).ev() }
112+
}
113+
114+
object ObservableWSwitchMonadErrorInstanceImplicits {
115+
@JvmStatic
116+
fun instance(): ObservableWSwitchMonadErrorInstance = object : ObservableWSwitchMonadErrorInstance {}
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package kategory.effects.data
2+
3+
import io.kotlintest.KTestJUnitRunner
4+
import io.kotlintest.matchers.shouldNotBe
5+
import io.reactivex.Observable
6+
import io.reactivex.observers.TestObserver
7+
import io.reactivex.schedulers.Schedulers
8+
import kategory.*
9+
import org.junit.runner.RunWith
10+
import java.util.concurrent.TimeUnit
11+
12+
@RunWith(KTestJUnitRunner::class)
13+
class ObservableWTest : UnitSpec() {
14+
15+
fun <T> EQ(): Eq<ObservableWKind<T>> = object : Eq<ObservableWKind<T>> {
16+
override fun eqv(a: ObservableWKind<T>, b: ObservableWKind<T>): Boolean =
17+
try {
18+
a.value().blockingFirst() == b.value().blockingFirst()
19+
} catch (throwable: Throwable) {
20+
val errA = try {
21+
a.value().blockingFirst()
22+
throw IllegalArgumentException()
23+
} catch (err: Throwable) {
24+
err
25+
}
26+
27+
val errB = try {
28+
b.value().blockingFirst()
29+
throw IllegalStateException()
30+
} catch (err: Throwable) {
31+
err
32+
}
33+
34+
errA == errB
35+
}
36+
}
37+
38+
init {
39+
40+
"instances can be resolved implicitly" {
41+
functor<ObservableWHK>() shouldNotBe null
42+
applicative<ObservableWHK>() shouldNotBe null
43+
monad<ObservableWHK>() shouldNotBe null
44+
monadError<ObservableWHK, Unit>() shouldNotBe null
45+
asyncContext<ObservableWHK>() shouldNotBe null
46+
}
47+
48+
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorFlat(), EQ(), EQ()))
49+
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorConcat(), EQ(), EQ()))
50+
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorSwitch(), EQ(), EQ()))
51+
52+
"Multi-thread Observables finish correctly" {
53+
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
54+
val a = Observable.timer(2, TimeUnit.SECONDS).k().bind()
55+
yields(a)
56+
}.value()
57+
58+
val test: TestObserver<Long> = value.test()
59+
test.awaitDone(5, TimeUnit.SECONDS)
60+
test.assertTerminated().assertComplete().assertNoErrors().assertValue(0)
61+
}
62+
63+
"Multi-thread Observables should run on their required threads" {
64+
val originalThread: Thread = Thread.currentThread()
65+
var nextThread: Thread? = null
66+
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
67+
val a = Observable.timer(2, TimeUnit.SECONDS).k().bind()
68+
nextThread = Thread.currentThread()
69+
val b = Observable.just(a).observeOn(Schedulers.io()).k().bind()
70+
yields(b)
71+
}.value()
72+
val test: TestObserver<Long> = value.test()
73+
val lastThread: Thread = test.awaitDone(5, TimeUnit.SECONDS).lastThread()
74+
nextThread shouldNotBe originalThread
75+
lastThread shouldNotBe originalThread
76+
lastThread shouldNotBe nextThread
77+
}
78+
79+
"Observable cancellation forces binding to cancel without completing too" {
80+
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
81+
val a = Observable.timer(3, TimeUnit.SECONDS).k().bind()
82+
yields(a)
83+
}.value()
84+
val test: TestObserver<Long> = value.doOnSubscribe { subscription -> Observable.timer(1, TimeUnit.SECONDS).subscribe { subscription.dispose() } }.test()
85+
test.awaitTerminalEvent(10, TimeUnit.SECONDS)
86+
87+
test.assertNotTerminated().assertNotComplete().assertNoErrors().assertNoValues()
88+
}
89+
}
90+
}

kategory-effects/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ dependencies {
1111
}
1212

1313
apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
14-
apply from: 'generated-kotlin-sources.gradle'
14+
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
1515
apply plugin: 'kotlin-kapt'

kategory-effects/generated-kotlin-sources.gradle

-14
This file was deleted.

0 commit comments

Comments
 (0)