Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kategory-effects-rx2 #233

Merged
merged 26 commits into from
Sep 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kategory-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ dependencies {
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
apply from: 'generated-kotlin-sources.gradle'
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
apply plugin: 'kotlin-kapt'
2 changes: 1 addition & 1 deletion kategory-core/src/main/kotlin/kategory/data/Try.kt
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ sealed class Try<out A> : TryKind<A> {
* If `fb` is initially applied and throws an exception,
* then `fa` is applied with this exception.
*/
fun <B> fold(fa: (Throwable) -> B, fb: (A) -> B): B =
inline fun <B> fold(fa: (Throwable) -> B, fb: (A) -> B): B =
when (this) {
is Failure -> fa(exception)
is Success -> try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ open class ComonadContinuation<F, A : Any>(val CM: Comonad<F>, override val cont
* A coroutine is initiated and inside `MonadContinuation` suspended yielding to `flatMap` once all the flatMap binds are completed
* the underlying monad is returned from the act of executing the coroutine
*/
fun <F, B : Any> Comonad<F>.cobinding(coroutineContext: CoroutineContext = EmptyCoroutineContext, c: suspend ComonadContinuation<F, *>.() -> B): B {
fun <F, B : Any> Comonad<F>.cobinding(c: suspend ComonadContinuation<F, *>.() -> B): B {
val continuation = ComonadContinuation<F, B>(this)
c.startCoroutine(continuation, continuation)
return continuation.returnedMonad
Expand Down
8 changes: 4 additions & 4 deletions kategory-core/src/main/kotlin/kategory/typeclasses/Monad.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ open class MonadContinuation<F, A>(M: Monad<F>, override val context: CoroutineC
* A coroutine is initiated and inside `MonadContinuation` suspended yielding to `flatMap` once all the flatMap binds are completed
* the underlying monad is returned from the act of executing the coroutine
*/
fun <F, B> Monad<F>.binding(coroutineContext: CoroutineContext = EmptyCoroutineContext, c: suspend MonadContinuation<F, *>.() -> HK<F, B>): HK<F, B> {
val continuation = MonadContinuation<F, B>(this, coroutineContext)
fun <F, B> Monad<F>.binding(c: suspend MonadContinuation<F, *>.() -> HK<F, B>): HK<F, B> {
val continuation = MonadContinuation<F, B>(this)
c.startCoroutine(continuation, continuation)
return continuation.returnedMonad()
}
Expand Down Expand Up @@ -126,9 +126,9 @@ open class StackSafeMonadContinuation<F, A>(M: Monad<F>, override val context: C
* This combinator ultimately returns computations lifting to Free to automatically for comprehend in a stack-safe way
* over any stack-unsafe monads
*/
fun <F, B> Monad<F>.bindingStackSafe(coroutineContext: CoroutineContext = EmptyCoroutineContext, c: suspend StackSafeMonadContinuation<F, *>.() -> Free<F, B>):
fun <F, B> Monad<F>.bindingStackSafe(c: suspend StackSafeMonadContinuation<F, *>.() -> Free<F, B>):
Free<F, B> {
val continuation = StackSafeMonadContinuation<F, B>(this, coroutineContext)
val continuation = StackSafeMonadContinuation<F, B>(this)
c.startCoroutine(continuation, continuation)
return continuation.returnedMonad()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kategory

import java.io.Serializable
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.RestrictsSuspension
import kotlin.coroutines.experimental.startCoroutine

Expand All @@ -20,7 +22,8 @@ inline fun <reified F, A, reified E> HK<F, A>.ensure(
noinline predicate: (A) -> Boolean): HK<F, A> = FT.ensure(this, error, predicate)

@RestrictsSuspension
class MonadErrorContinuation<F, A>(val ME: MonadError<F, Throwable>) : Serializable, MonadContinuation<F, A>(ME) {
class MonadErrorContinuation<F, A>(val ME: MonadError<F, Throwable>, override val context: CoroutineContext = EmptyCoroutineContext) :
Serializable, MonadContinuation<F, A>(ME) {

override fun resumeWithException(exception: Throwable) {
returnedMonad = ME.raiseError(exception)
Expand Down
1 change: 0 additions & 1 deletion kategory-core/src/test/kotlin/kategory/data/IdTest.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kategory

import io.kotlintest.KTestJUnitRunner
import io.kotlintest.matchers.shouldBe
import org.junit.runner.RunWith

@RunWith(KTestJUnitRunner::class)
Expand Down
18 changes: 18 additions & 0 deletions kategory-effects-rx2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
dependencies {
compile project(':kategory-core')
compile project(':kategory-effects')
compile "org.jetbrains.kotlin:kotlin-stdlib-jre7:$kotlinVersion"
compile project(':kategory-annotations')
kapt project(':kategory-annotations-processor')
kaptTest project(':kategory-annotations-processor')
compileOnly project(':kategory-annotations-processor')
testCompileOnly project(':kategory-annotations-processor')
testCompile "io.kotlintest:kotlintest:$kotlinTestVersion"
testCompile project(':kategory-effects-test')

compile "io.reactivex.rxjava2:rxjava:2.1.3"
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
apply plugin: 'kotlin-kapt'
4 changes: 4 additions & 0 deletions kategory-effects-rx2/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
POM_NAME=Kategory-Effects-Rx2
POM_ARTIFACT_ID=kategory-effects-rx2
POM_PACKAGING=jar
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package kategory

import io.reactivex.Observable
import io.reactivex.ObservableEmitter

fun <A> Observable<A>.k(): ObservableW<A> = ObservableW(this)

fun <A> ObservableWKind<A>.value(): Observable<A> =
this.ev().observable

@higherkind
@deriving(Functor::class, Applicative::class, AsyncContext::class)
data class ObservableW<A>(val observable: Observable<A>) : ObservableWKind<A> {
fun <B> map(f: (A) -> B): ObservableW<B> =
observable.map(f).k()

fun <B> ap(fa: ObservableWKind<(A) -> B>): ObservableW<B> =
flatMap { a -> fa.ev().map { ff -> ff(a) } }

fun <B> flatMap(f: (A) -> ObservableW<B>): ObservableW<B> =
observable.flatMap { f(it).observable }.k()

fun <B> concatMap(f: (A) -> ObservableW<B>): ObservableW<B> =
observable.concatMap { f(it).observable }.k()

fun <B> switchMap(f: (A) -> ObservableW<B>): ObservableW<B> =
observable.switchMap { f(it).observable }.k()

companion object {
fun <A> pure(a: A): ObservableW<A> =
Observable.just(a).k()

fun <A> raiseError(t: Throwable): ObservableW<A> =
Observable.error<A>(t).k()

fun <A> runAsync(fa: Proc<A>): ObservableW<A> =
Observable.create { emitter: ObservableEmitter<A> ->
fa { either: Either<Throwable, A> ->
either.fold({
emitter.onError(it)
}, {
emitter.onNext(it)
emitter.onComplete()
})

}
}.k()

fun monadFlat(): ObservableWFlatMonadInstance = ObservableWFlatMonadInstanceImplicits.instance()

fun monadConcat(): ObservableWConcatMonadInstance = ObservableWConcatMonadInstanceImplicits.instance()

fun monadSwitch(): ObservableWSwitchMonadInstance = ObservableWSwitchMonadInstanceImplicits.instance()

fun monadErrorFlat(): ObservableWFlatMonadErrorInstance = ObservableWFlatMonadErrorInstanceImplicits.instance()

fun monadErrorConcat(): ObservableWConcatMonadErrorInstance = ObservableWConcatMonadErrorInstanceImplicits.instance()

fun monadErrorSwitch(): ObservableWSwitchMonadErrorInstance = ObservableWSwitchMonadErrorInstanceImplicits.instance()
}
}

fun <A> ObservableW<A>.handleErrorWith(function: (Throwable) -> ObservableW<A>): ObservableW<A> =
this.observable.onErrorResumeNext { t: Throwable -> function(t).observable }.k()
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package kategory

object ObservableWMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadInstance = ObservableWFlatMonadInstanceImplicits.instance()
}

object ObservableWMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadErrorInstance = ObservableWFlatMonadErrorInstanceImplicits.instance()
}

interface ObservableWFlatMonadInstance :
ObservableWApplicativeInstance,
Monad<ObservableWHK> {
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
fa.ev().ap(ff)

override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableWKind<B> =
fa.ev().flatMap { f(it).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableWKind<B> =
f(a).ev().flatMap {
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
}
}

object ObservableWFlatMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadInstance = object : ObservableWFlatMonadInstance {}
}

interface ObservableWFlatMonadErrorInstance :
ObservableWFlatMonadInstance,
MonadError<ObservableWHK, Throwable> {
override fun <A> raiseError(e: Throwable): ObservableW<A> =
ObservableW.raiseError(e)

override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
fa.ev().handleErrorWith { f(it).ev() }
}

object ObservableWFlatMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWFlatMonadErrorInstance = object : ObservableWFlatMonadErrorInstance {}
}

interface ObservableWConcatMonadInstance :
ObservableWApplicativeInstance,
Monad<ObservableWHK> {
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
fa.ev().ap(ff)

override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableW<B> =
fa.ev().concatMap { f(it).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableW<B> =
f(a).ev().concatMap {
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
}
}

object ObservableWConcatMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWConcatMonadInstance = object : ObservableWConcatMonadInstance {}
}

interface ObservableWConcatMonadErrorInstance :
ObservableWConcatMonadInstance,
MonadError<ObservableWHK, Throwable> {
override fun <A> raiseError(e: Throwable): ObservableW<A> =
ObservableW.raiseError(e)

override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
fa.ev().handleErrorWith { f(it).ev() }
}

object ObservableWConcatMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWConcatMonadErrorInstance = object : ObservableWConcatMonadErrorInstance {}
}

interface ObservableWSwitchMonadInstance :
ObservableWApplicativeInstance,
Monad<ObservableWHK> {
override fun <A, B> ap(fa: ObservableWKind<A>, ff: ObservableWKind<(A) -> B>): ObservableW<B> =
fa.ev().ap(ff)

override fun <A, B> flatMap(fa: ObservableWKind<A>, f: (A) -> ObservableWKind<B>): ObservableW<B> =
fa.ev().switchMap { f(it).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> ObservableWKind<Either<A, B>>): ObservableW<B> =
f(a).ev().switchMap {
it.fold({ tailRecM(a, f).ev() }, { pure(it).ev() })
}
}

object ObservableWSwitchMonadInstanceImplicits {
@JvmStatic
fun instance(): ObservableWSwitchMonadInstance = object : ObservableWSwitchMonadInstance {}
}

interface ObservableWSwitchMonadErrorInstance :
ObservableWSwitchMonadInstance,
MonadError<ObservableWHK, Throwable> {

override fun <A> raiseError(e: Throwable): ObservableW<A> =
ObservableW.raiseError(e)

override fun <A> handleErrorWith(fa: ObservableWKind<A>, f: (Throwable) -> ObservableWKind<A>): ObservableW<A> =
fa.ev().handleErrorWith { f(it).ev() }
}

object ObservableWSwitchMonadErrorInstanceImplicits {
@JvmStatic
fun instance(): ObservableWSwitchMonadErrorInstance = object : ObservableWSwitchMonadErrorInstance {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package kategory.effects.data

import io.kotlintest.KTestJUnitRunner
import io.kotlintest.matchers.shouldNotBe
import io.reactivex.Observable
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.Schedulers
import kategory.*
import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit

@RunWith(KTestJUnitRunner::class)
class ObservableWTest : UnitSpec() {

fun <T> EQ(): Eq<ObservableWKind<T>> = object : Eq<ObservableWKind<T>> {
override fun eqv(a: ObservableWKind<T>, b: ObservableWKind<T>): Boolean =
try {
a.value().blockingFirst() == b.value().blockingFirst()
} catch (throwable: Throwable) {
val errA = try {
a.value().blockingFirst()
throw IllegalArgumentException()
} catch (err: Throwable) {
err
}

val errB = try {
b.value().blockingFirst()
throw IllegalStateException()
} catch (err: Throwable) {
err
}

errA == errB
}
}

init {

"instances can be resolved implicitly" {
functor<ObservableWHK>() shouldNotBe null
applicative<ObservableWHK>() shouldNotBe null
monad<ObservableWHK>() shouldNotBe null
monadError<ObservableWHK, Unit>() shouldNotBe null
asyncContext<ObservableWHK>() shouldNotBe null
}

testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorFlat(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorConcat(), EQ(), EQ()))
testLaws(AsyncLaws.laws(ObservableW.asyncContext(), ObservableW.monadErrorSwitch(), EQ(), EQ()))

"Multi-thread Observables finish correctly" {
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
val a = Observable.timer(2, TimeUnit.SECONDS).k().bind()
yields(a)
}.value()

val test: TestObserver<Long> = value.test()
test.awaitDone(5, TimeUnit.SECONDS)
test.assertTerminated().assertComplete().assertNoErrors().assertValue(0)
}

"Multi-thread Observables should run on their required threads" {
val originalThread: Thread = Thread.currentThread()
var nextThread: Thread? = null
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
val a = Observable.timer(2, TimeUnit.SECONDS).k().bind()
nextThread = Thread.currentThread()
val b = Observable.just(a).observeOn(Schedulers.io()).k().bind()
yields(b)
}.value()
val test: TestObserver<Long> = value.test()
val lastThread: Thread = test.awaitDone(5, TimeUnit.SECONDS).lastThread()
nextThread shouldNotBe originalThread
lastThread shouldNotBe originalThread
lastThread shouldNotBe nextThread
}

"Observable cancellation forces binding to cancel without completing too" {
val value: Observable<Long> = ObservableW.monadErrorFlat().bindingE {
val a = Observable.timer(3, TimeUnit.SECONDS).k().bind()
yields(a)
}.value()
val test: TestObserver<Long> = value.doOnSubscribe { subscription -> Observable.timer(1, TimeUnit.SECONDS).subscribe { subscription.dispose() } }.test()
test.awaitTerminalEvent(10, TimeUnit.SECONDS)

test.assertNotTerminated().assertNotComplete().assertNoErrors().assertNoValues()
}
}
}
2 changes: 1 addition & 1 deletion kategory-effects/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ dependencies {
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
apply from: 'generated-kotlin-sources.gradle'
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
apply plugin: 'kotlin-kapt'
14 changes: 0 additions & 14 deletions kategory-effects/generated-kotlin-sources.gradle

This file was deleted.

Loading