This a learning project.
This implementation isn't performant nor optimal.
Some operations throw StackOverflowError
exceptions with big streams (ex: size, forEach).
To use the XStream
stream api you should call methods defined by the XStreams
object.
object XStreams {
/**
* Returns an empty stream
*
* @param T
* @return
*/
def empty[T]: XFiniteStream[T]
/**
* Returns a finite stream that has one element
*
* @param elem
* @tparam T
* @return
*/
def once[T](elem: T): XFiniteStream[T]
/**
* Returns an infinite stream that always returns the same element
*
* @param elem
* @tparam T
* @return
*/
def fixed[T](elem: T): XStream[T]
/**
* Returns an infinite stream that construct the next element from the last one using the op function
*
* @param elem
* @param op
* @tparam T
* @return
*/
def iterate[T](elem: T, op: T => T): XStream[T]
/**
* Returns an infinite stream that construct the element using the supplier
*
* @param supplier
* @tparam T
* @return
*/
def generate[T](supplier: () => T): XStream[T]
/**
* Returns an infinite stream built from a finite sequence using a circular index
*
* @param elems
* @tparam T
* @return
*/
def circular[T](elems: Seq[T]): XStream[T]
/**
* Returns a finite stream backed by the supplied sequence
*
* @param items
* @tparam T
* @return
*/
def finite[T](items: Seq[T]): XFiniteStream[T]
/** Returns a stream backed by the given iterator.
*
* @param iterator
* @tparam T
* @return
*/
def fromIterator[T](iterator: Iterator[T]): XStream[T]
}
/**
* XStream public API
*
* @tparam T
*/
trait XStream[T] {
/**
* Returns a new finite stream having at most nbr elements
*
* @param nbr
* @return
*/
def take(nbr: Int): XFiniteStream[T]
/**
* Returns a new stream having element that satisfy the predicate
*
* @param predicate
* @return
*/
def takeWhile(predicate: T => Boolean): XStream[T]
/**
* Returns a new stream with the nbr elements of the original stream skipped
*
* @param nbr
* @return
*/
def skip(nbr: Int): XStream[T]
/**
* Returns a new stream starting from the first element that don't match the predicate
*
* @param predicate
* @return
*/
def skipWhile(predicate: T => Boolean): XStream[T]
/**
* Returns a new stream with only elements that satisfy the given predicate
*
* @param predicate
* @return
*/
def filter(predicate: T => Boolean): XStream[T]
/**
* Returns a new stream where each element is transformed version of the original one (applying the given mapping function)
*
* @param mapping
* @tparam B
* @return
*/
def map[B](mapping: T => B): XStream[B]
/** Returns a new stream that merges all the sub stream into one
*
* @param asIterableOne
* @tparam B
* @return
*/
def flatten[B](implicit asIterableOne: T => IterableOnce[B]): XStream[B]
/** Returns a new stream that merges all the sub stream into one, after
* applying the mapping function to each one of the elements
*
* @param mapping
* @tparam B
* @return
*/
def flatMap[B, C](mapping: T => B)(implicit asIterableOne: B => IterableOnce[C]): XStream[C]
/**
* Returns a new stream that concatenate this stream with another one
*
* @param other
* @return
*/
def concat(other: => XStream[T]): XStream[T]
/**
* Returns a new stream where each element is a tuple of the elements of this stream and another.
* The length of this new stream is the length of the smallest one.
*
* @param other
* @tparam B
* @return
*/
def zip[B](other: XStream[B]): XStream[(T, B)]
/**
* Returns a new stream where each element is a finite stream of windowSize elements from this element
*
* @param windowSize
* @return
*/
def window(windowSize: Int): XStream[XFiniteStream[T]]
/**
* Returns a new stream that execute the given consumer on each element
*
* @param consumer
* @return
*/
def peek(consumer: T => Unit): XStream[T]
}
/**
* Trait that defines operation that are safe to execute only (has also a meaning) on a finite stream
* <p>NB : A finite stream extends the API of the infinite stream.</p>
*
* @tparam T
*/
trait FiniteXStream[T] extends XStream[T] {
/**
* Left reducer
*
* @param initial
* @param combinator
* @tparam B
* @return
*/
def foldLeft[B](initial: B, combinator: (B, T) => B): B
/**
* Right reducer
* @param initial
* @param combinator
* @tparam B
* @return
*/
def foldRight[B](initial: B, combinator: (B, T) => B): B
/**
* Collect stream data into a bag (List, Set, ...)
*
* @param bag
* @param collector
* @tparam B
* @return
*/
def collect[B[_]](bag: B[T], collector: (B[T], T) => B[T]): B[T]
/**
* List implementation of the collect
*
* @return
*/
def toList: List[T]
/**
* Returns a Map of the grouped data
*
* @param keyGenerator
* @param initial
* @param combinator
* @tparam K
* @tparam B
* @return
*/
def groupBy[K, B](keyGenerator: T => K, initial: B, combinator: (B, T) => B): Map[K, B]
/**
* List implementation of the groupBy
*
* @param keyGenerator
* @tparam K
* @return
*/
def groupBy[K](keyGenerator: T => K): Map[K, List[T]]
/**
* Execute a given consumer over the elements of the stream
*
* @param consumer
*/
def forEach(consumer: T => Unit): Unit
/**
* Returns an iterator over the elements of the stream
*
* @return
*/
def iterator: Iterator[T]
/**
* Returns the element count of this stream
*
* @return
*/
def size: Int
/**
* Returns the biggest element of this stream using the given comparator.
* <ul>
* <li>None if the stream is empty.</li>
* <li>The biggest even there is duplicates.</li>
* </ul>
*
* @param comparator
* @return
*/
def max(comparator: Comparator[T]): Option[T]
/**
* Returns the smallest element of this stream using the given comparator.
* <ul>
* <li>None if the stream is empty.</li>
* <li>The smallest even there is duplicates.</li>
* </ul>
*
* @param comparator
* @return
*/
def min(comparator: Comparator[T]): Option[T]
/**
* Returns a new finite stream that traverse elements in the reverse order
*
* @return
*/
def reversed: FiniteXStream[T]
/**
* Returns a new finite stream that concatenate this stream with another one.
*
*<p>This overload of the concat method, force the return type to be FiniteXStream if the other stream is also finite</p>
*
* @param other
* @return
*/
def concat(other: FiniteXStream[T]): FiniteXStream[T]
/**
* Returns True if all the elements of this stream matches the given predicate. Otherwise it returns False
* <p>It doesn't traverse all the stream in all cases. It stops on the first element that don't match the predicate</p>
*
* @param predicate
* @return
*/
def matchAll(predicate: T => Boolean): Boolean
/**
* Returns True if any of the elements of this stream matches the given predicate. Otherwise it returns False.
*
* <p>It doesn't traverse all the stream in all the cases. It stops on the first element that don't match the predicate</p>
*
* @param predicate
* @return
*/
def matchAny(predicate: T => Boolean): Boolean
}
Imports :
import org.example.xstreams.api.*
import org.example.xstreams.impl.XStreams.*
Create an infinite stream representing all integer numbers :
val stream: XStream[Int] = iterate(0, x => x + 1)
Filter only even numbers then print the first 10 numbers :
stream
.filter(n => n % 2 == 0)//Infinite stream
.take(5)//Finite stream
.forEach(println)
0
2
4
6
8
Flatmap a stream of streams :
println(
stream
.skip(1)
.flatMap(n => once("A" * n))
.take(5)
.toList
)
List(A, AA, AAA, AAAA, AAAAA)
Flatten example :
case class Bag(item1: String, item2: String, item3: String)
val bagStream = finite(
Seq(Bag("A", "B", "C"), Bag("E", "F", "G"), Bag("H", "I", "J"))
)
//Needed by the flatten function, because the Bag class doesn't implements IterableOne[+A] trait
implicit val bagAsIterableOnce: Bag => IterableOnce[String] = bag =>
new IterableOnce[String] {
override def iterator: Iterator[String] =
List(bag.item1, bag.item2, bag.item3).iterator
}
bagStream
.flatten
.take(100)
.forEach(println)
A
B
C
E
F
G
H
I
J
More examples :
Open this repository on any editor supporting scala (VS Code/Metal, Intellij Idea, GitPod, ...).
Then run the App.scala
main class.