Skip to content

Commit

Permalink
[Gitar] Updating Kotlin files
Browse files Browse the repository at this point in the history
  • Loading branch information
Gitar committed Sep 17, 2024
1 parent ee34f44 commit 5ed5a00
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
18 changes: 9 additions & 9 deletions kotlin/rxjava_to_flow/Streams.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package co.gitar

import io.reactivex.rxjava3.core.Observable

object Streams {
fun stream1(): Observable<String> {
return Observable.just("a", "b", "c", "d", "e", "f")
fun stream1(): Flow<String> {
return flowOf("a", "b", "c", "d", "e", "f")
}

fun stream2(): Observable<String> {
return Observable.fromIterable(listOf("x", "y", "z"))
fun stream2(): Flow<String> {
return flowOf("x", "y", "z")
}

fun stream3(): Observable<String> {
return Observable.fromCallable { call() }
.delay(10, TimeUnit.MILLISECONDS)
fun stream3(): Flow<String> {
return flow {
delay(10)
emit(call())
}
}

private fun call(): String {
Expand Down
31 changes: 17 additions & 14 deletions kotlin/rxjava_to_flow/Test.kt
Original file line number Diff line number Diff line change
@@ -1,55 +1,58 @@
package co.gitar

import co.gitar.Streams
import io.reactivex.rxjava3.core.Observable
import kotlin.test.Test
import org.junit.jupiter.api.Assertions.assertEquals

class Test {

@Test
fun testSimple() {
val observable = Observable.just("Hello World")
val observable = flowOf("Hello World")
val res = mutableListOf<String>()
observable.subscribe { res += it }
runBlocking { observable.collect { res += it } }
assertEquals(listOf("Hello World"), res)
}

@Test
fun testMBasic() {
Streams.stream1().map { it.first().code }
.filter { it > 98 }
.take(3)
.subscribe { println(it) }
runBlocking {
Streams.stream1()
.map { it.first().code }
.filter { it > 98 }
.take(3)
.collect { println(it) }
}
}

@Test
fun testZipWith() {
val res = mutableListOf<String>()
Streams.stream1().zipWith(Streams.stream2(), Streams::concat).subscribe { res += it }
runBlocking {
Streams.stream1().zip(Streams.stream2(), Streams::concat).collect { res += it }
}
assertEquals(listOf("ax", "by", "cz"), res)
}

@Test
fun testStartWith() {
val res = mutableListOf<String>()
Streams.stream1()
.startWith(Streams.stream2())
.subscribe { res += it }
runBlocking {
Streams.stream1().onStart { emitAll(Streams.stream2()) }.collect { res += it }
}
assertEquals(listOf("x", "y", "z", "a", "b", "c", "d", "e", "f"), res)
}

@Test
fun testMergeWith() {
val res = mutableListOf<String>()
Streams.stream1().mergeWith(Streams.stream2()).subscribe { res += it }
runBlocking { merge(Streams.stream1(), Streams.stream2()).collect { res += it } }
assertEquals(listOf("a", "b", "c", "d", "e", "f", "x", "y", "z"), res)
}

@Test
fun testFlatMap() {
val res = mutableListOf<String>()
Streams.stream1().flatMap { Streams.stream2() }.subscribe { res += it }
runBlocking { Streams.stream1().flatMapConcat { Streams.stream2() }.collect { res += it } }
assertEquals(
listOf(
"x",
Expand Down

0 comments on commit 5ed5a00

Please sign in to comment.