Let's assume you need to process millions of records from some third-party data source. This data source will usually provide a pagination mechanism that looks like this:
interface SomeThirdPartyRepository {
    fun searchBy(someCriteria: String, pageSize: Int = 100): ResultPage
    fun nextPage(cursor: Cursor): ResultPage
    data class ResultPage(val results: List<String>, val cursor: Cursor)
    data class Cursor(val value: String)
}
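None of the snippets in this post run without a concrete data source. To experiment with them, here is a minimal in-memory fake of the interface above. It is hypothetical: the class name `PagedFake`, the offset-as-cursor encoding and the `pagesServed` counter are assumptions for illustration (a real provider's cursor is usually an opaque token), and it ignores `someCriteria`. Like the loop below, it treats an empty page as the end of the results. The interface is repeated so the block compiles on its own.

```kotlin
interface SomeThirdPartyRepository {
    fun searchBy(someCriteria: String, pageSize: Int = 100): ResultPage
    fun nextPage(cursor: Cursor): ResultPage
    data class ResultPage(val results: List<String>, val cursor: Cursor)
    data class Cursor(val value: String)
}

// Hypothetical fake (assumption): ignores the criteria and pages over `records`.
// The cursor value is simply the offset of the next page.
class PagedFake(private val records: List<String>) : SomeThirdPartyRepository {
    private var pageSize = 100

    // Number of pages (round trips) served so far, handy in tests.
    var pagesServed = 0
        private set

    override fun searchBy(someCriteria: String, pageSize: Int): SomeThirdPartyRepository.ResultPage {
        this.pageSize = pageSize
        return page(0)
    }

    override fun nextPage(cursor: SomeThirdPartyRepository.Cursor): SomeThirdPartyRepository.ResultPage =
        page(cursor.value.toInt())

    private fun page(offset: Int): SomeThirdPartyRepository.ResultPage {
        pagesServed++
        // Once the records are exhausted, `results` is empty: that is the end signal.
        val results = records.drop(offset).take(pageSize)
        return SomeThirdPartyRepository.ResultPage(
            results, SomeThirdPartyRepository.Cursor("${offset + results.size}")
        )
    }
}
```

Walking it by hand with 250 records and `pageSize = 100`: `searchBy` returns 100 records, the next two `nextPage` calls return 100 and 50, and the fourth call returns an empty page.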
Your code to process the records currently looks like this:
class InitialProcessor(
    private val thirdPartyRepository: SomeThirdPartyRepository
) : Processor {
    override fun execute() {
        var currentPage = thirdPartyRepository.searchBy("someCriteria", pageSize = 100)
        while (currentPage.results.isNotEmpty()) { // an empty page means there is nothing left
            currentPage.results // up to 100 items
                .map { it.uppercase() }
                .forEach { someSideEffect(it) }
            currentPage = thirdPartyRepository.nextPage(currentPage.cursor)
        }
    }
}
This code works, but it doesn’t feel right. You are concerned that your domain logic is mixed with the pagination logic. It is also not reusable: your codebase will end up littered with references to SomeThirdPartyRepository. You can do better.
You decide to create your own wrapper around SomeThirdPartyRepository.
You can then reuse this wrapper across your codebase. Any changes to the logic for fetching data will happen only in this wrapper. You also get the ability to test the pagination logic in isolation for free.
The universe is in balance again 🧘🏽♀️. Good job!
class SimpleSearchRepository(
    private val thirdPartyRepository: SomeThirdPartyRepository
) {
    fun search(someCriteria: String): List<String> {
        val mutableList = mutableListOf<String>()
        var currentPage = thirdPartyRepository.searchBy(someCriteria, pageSize = 100)
        while (currentPage.results.isNotEmpty()) {
            mutableList.addAll(currentPage.results)
            currentPage = thirdPartyRepository.nextPage(currentPage.cursor)
        }
        return mutableList
    }
}
class SimpleProcessor(
    private val simpleSearchRepository: SimpleSearchRepository
) : Processor {
    override fun execute() {
        val items = simpleSearchRepository.search("someCriteria")
        processItems(items)
    }

    private fun processItems(items: Collection<String>): Unit =
        items // contains millions of records
            .map { it.uppercase() } // creates an intermediary collection of same size
            .forEach { someSideEffect(it) }
}
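Because the pagination loop now lives in a single class, it can be unit tested against a fake without touching the real provider. A minimal sketch, assuming a hypothetical `PagedFake` that serves a fixed list, ignores the criteria and counts round trips; `simpleSearchReturnsEveryRecord` is an illustrative name for what would normally be a JUnit test. Note that `search()` only returns after every page has been fetched and copied into one list.

```kotlin
interface SomeThirdPartyRepository {
    fun searchBy(someCriteria: String, pageSize: Int = 100): ResultPage
    fun nextPage(cursor: Cursor): ResultPage
    data class ResultPage(val results: List<String>, val cursor: Cursor)
    data class Cursor(val value: String)
}

// Hypothetical fake: ignores the criteria, pages over `records`, counts round trips.
// The cursor value is the offset of the next page.
class PagedFake(private val records: List<String>) : SomeThirdPartyRepository {
    private var pageSize = 100
    var pagesServed = 0
        private set

    override fun searchBy(someCriteria: String, pageSize: Int): SomeThirdPartyRepository.ResultPage {
        this.pageSize = pageSize
        return page(0)
    }

    override fun nextPage(cursor: SomeThirdPartyRepository.Cursor): SomeThirdPartyRepository.ResultPage =
        page(cursor.value.toInt())

    private fun page(offset: Int): SomeThirdPartyRepository.ResultPage {
        pagesServed++
        val results = records.drop(offset).take(pageSize)
        return SomeThirdPartyRepository.ResultPage(
            results, SomeThirdPartyRepository.Cursor("${offset + results.size}")
        )
    }
}

// Same as in the post.
class SimpleSearchRepository(
    private val thirdPartyRepository: SomeThirdPartyRepository
) {
    fun search(someCriteria: String): List<String> {
        val mutableList = mutableListOf<String>()
        var currentPage = thirdPartyRepository.searchBy(someCriteria, pageSize = 100)
        while (currentPage.results.isNotEmpty()) {
            mutableList.addAll(currentPage.results)
            currentPage = thirdPartyRepository.nextPage(currentPage.cursor)
        }
        return mutableList
    }
}

// Illustrative test: all 250 records come back in order, after three
// non-empty pages (100, 100, 50) plus the final empty one.
fun simpleSearchReturnsEveryRecord() {
    val records = List(250) { "record-$it" }
    val fake = PagedFake(records)
    val results = SimpleSearchRepository(fake).search("someCriteria")
    check(results == records)
    check(fake.pagesServed == 4) // every page was fetched before search() returned
}
```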
Your application suddenly starts running out of memory 🧨💥, and you realize the shortcomings of this implementation.
The issue here is that the repository loads millions 🤯 of records into memory at once, and map() then builds a second list of the same size. Avoiding this is one of the reasons the third-party repository provides a pagination mechanism in the first place.
Fortunately, you learned about Kotlin Sequences just in time for this incident.
Sequences are the lazy person’s version of collections: their operations are evaluated lazily, one element at a time, which makes them roughly the Kotlin equivalent of Java Streams.
Instead of returning a List<String> containing millions of records, your repository now returns a Sequence<String> that provides one item at a time, on demand.
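Concretely, the difference is evaluation order. Each List operation processes the whole collection and returns a new list before the next operation starts; a Sequence passes each element through the entire chain before pulling the next one, and does nothing at all until a terminal operation such as forEach or toList asks for elements. A small sketch that records the order of calls (the function names and the `trace` list are illustrative only):

```kotlin
// Eager: map runs over every element and builds a full intermediate List first.
fun eagerTrace(): List<String> {
    val trace = mutableListOf<String>()
    listOf("a", "b")
        .map { trace += "map $it"; it.uppercase() }
        .forEach { trace += "forEach $it" }
    return trace
}

// Lazy: forEach (terminal) pulls one element at a time through map.
fun lazyTrace(): List<String> {
    val trace = mutableListOf<String>()
    sequenceOf("a", "b")
        .map { trace += "map $it"; it.uppercase() }
        .forEach { trace += "forEach $it" }
    return trace
}

// Without a terminal operation, the intermediate map is never evaluated.
fun noTerminalTrace(): List<String> {
    val trace = mutableListOf<String>()
    sequenceOf("a", "b").map { trace += "map $it"; it }
    return trace
}
```

Here `eagerTrace()` yields `[map a, map b, forEach A, forEach B]`, `lazyTrace()` yields `[map a, forEach A, map b, forEach B]`, and `noTerminalTrace()` stays empty.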
class SophisticatedSearchRepository( // 🧐
    private val thirdPartyRepository: SomeThirdPartyRepository
) {
    fun search(someCriteria: String): Sequence<String> =
        pageSequence(someCriteria)
            .flatMap { it.results }
            // flatMap converts Seq[Page(a,b),Page(c,d)] -> Seq[a,b,c,d]

    private fun pageSequence(
        someCriteria: String
    ): Sequence<SomeThirdPartyRepository.ResultPage> {
        val firstPage = thirdPartyRepository.searchBy(someCriteria, pageSize = 100)
        return generateSequence(firstPage) { previousPage ->
            if (previousPage.results.isNotEmpty()) {
                thirdPartyRepository.nextPage(previousPage.cursor)
            } else null // returning null closes the sequence
        }
    }
}
class SophisticatedProcessor( // sophisticated only by association 😁
    private val sophisticatedSearchRepository: SophisticatedSearchRepository
) : Processor {
    override fun execute() {
        val items = sophisticatedSearchRepository.search("someCriteria")
        processItems(items)
    }

    private fun processItems(items: Sequence<String>): Unit =
        items
            .map { it.uppercase() } // transforms one item at a time, NO intermediary collection created
            .forEach { someSideEffect(it) }
}
The body of processItems() does not change, only its parameter type does, because Sequence<T> offers a similar API to Collection<T>.
⚠️ The consumer of the sequence receives one item at a time, but the repository still holds a whole page, N = pageSize items, in memory at a time.
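To see the on-demand behavior, the sketch below repeats the interface and SophisticatedSearchRepository so it compiles on its own, and runs them against a hypothetical counting fake, `PagedFake` (an assumption, ignoring the criteria). Taking 150 items out of 250 records should trigger only two page requests. It also exposes a subtlety: `firstPage` is fetched as soon as `search()` is called, before anyone iterates. `LazySearchRepository` is our own hypothetical variant, not from the original; it uses the standard library's `generateSequence(seedFunction, nextFunction)` overload to defer even the first request until iteration starts.

```kotlin
interface SomeThirdPartyRepository {
    fun searchBy(someCriteria: String, pageSize: Int = 100): ResultPage
    fun nextPage(cursor: Cursor): ResultPage
    data class ResultPage(val results: List<String>, val cursor: Cursor)
    data class Cursor(val value: String)
}

// Hypothetical fake: ignores the criteria, pages over `records`, counts round trips.
class PagedFake(private val records: List<String>) : SomeThirdPartyRepository {
    private var pageSize = 100
    var pagesServed = 0
        private set

    override fun searchBy(someCriteria: String, pageSize: Int): SomeThirdPartyRepository.ResultPage {
        this.pageSize = pageSize
        return page(0)
    }

    override fun nextPage(cursor: SomeThirdPartyRepository.Cursor): SomeThirdPartyRepository.ResultPage =
        page(cursor.value.toInt()) // the cursor value is the next offset

    private fun page(offset: Int): SomeThirdPartyRepository.ResultPage {
        pagesServed++
        val results = records.drop(offset).take(pageSize)
        return SomeThirdPartyRepository.ResultPage(
            results, SomeThirdPartyRepository.Cursor("${offset + results.size}")
        )
    }
}

// Same as in the post.
class SophisticatedSearchRepository(
    private val thirdPartyRepository: SomeThirdPartyRepository
) {
    fun search(someCriteria: String): Sequence<String> =
        pageSequence(someCriteria).flatMap { it.results }

    private fun pageSequence(someCriteria: String): Sequence<SomeThirdPartyRepository.ResultPage> {
        val firstPage = thirdPartyRepository.searchBy(someCriteria, pageSize = 100) // fetched right away
        return generateSequence(firstPage) { previousPage ->
            if (previousPage.results.isNotEmpty()) thirdPartyRepository.nextPage(previousPage.cursor)
            else null
        }
    }
}

// Hypothetical variant: the seed function only runs when iteration starts.
class LazySearchRepository(
    private val thirdPartyRepository: SomeThirdPartyRepository
) {
    fun search(someCriteria: String): Sequence<String> =
        generateSequence(
            seedFunction = { thirdPartyRepository.searchBy(someCriteria, pageSize = 100) },
            nextFunction = { previousPage ->
                if (previousPage.results.isNotEmpty()) thirdPartyRepository.nextPage(previousPage.cursor)
                else null
            }
        ).flatMap { it.results }
}
```

With 250 records, `SophisticatedSearchRepository` has made one request right after `search()`, and `take(150).toList()` brings the total to two. `LazySearchRepository` makes no request until iterated; reading everything costs four (three non-empty pages plus the final empty one).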
The appropriate value for pageSize depends on the size of an item and the cost of fetching a page. A large page size consumes more memory per page, but a small one causes many potentially expensive round trips.
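To put numbers on the round-trip side of the trade-off: with the end-of-results convention used throughout this post (keep calling until a page comes back empty), reading N records costs ceil(N / pageSize) non-empty pages plus one final empty page, while at most one page of pageSize items is held at a time. The helper below is a hypothetical back-of-the-envelope calculator, not part of any API, and it assumes the provider always answers the last cursor with an empty page.

```kotlin
// Round trips needed to read `totalRecords` when iteration stops at the first
// empty page: ceil(totalRecords / pageSize) non-empty pages + 1 empty page.
fun roundTrips(totalRecords: Long, pageSize: Int): Long {
    require(totalRecords >= 0 && pageSize > 0) { "invalid arguments" }
    val nonEmptyPages = (totalRecords + pageSize - 1) / pageSize // integer ceiling
    return nonEmptyPages + 1
}
```

For one million records, `roundTrips(1_000_000, 100)` is 10,001 calls, while `roundTrips(1_000_000, 1_000)` is 1,001 calls at the cost of ten times as many items per page in memory.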