As we have learned, Fetch performs batched requests whenever it can. It also exposes a couple of knobs for tweaking the maximum batch size and for choosing whether multiple batches run in parallel or sequentially.
When implementing a DataSource, we can override the maxBatchSize method to specify the maximum number of identifiers allowed in a single batched request to that data source. Let's try it out:
object BatchedUsers extends Data[UserId, User] {
  def name = "Batched Users"

  def source[F[_]: Concurrent]: DataSource[F, UserId, User] = new DataSource[F, UserId, User] {
    override def data = BatchedUsers

    override def CF = Concurrent[F]

    override def maxBatchSize: Option[Int] = Some(2)

    override def fetch(id: UserId): F[Option[User]] =
      latency[F](s"One User $id") >> CF.pure(userDatabase.get(id))

    override def batch(ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
      latency[F](s"Batch Users $ids") >> CF.pure(userDatabase.filterKeys(ids.toList.toSet).toMap)
  }
}

def getBatchedUser[F[_]: Concurrent](id: Int): Fetch[F, User] =
  Fetch(id, BatchedUsers.source)
We have defined the maximum batch size to be 2. Let's see what happens when running a fetch that needs more than two users:
def fetchManyBatchedUsers[F[_]: Concurrent]: Fetch[F, List[User]] =
  List(1, 2, 3, 4).traverse(getBatchedUser[F])
Fetch.run[IO](fetchManyBatchedUsers).unsafeRunSync().size shouldBe res0
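Since maxBatchSize is 2, the four requested ids cannot go to the data source in one call: they are split into batches of at most two ids each, and batch is invoked once per group. The grouping itself can be sketched with plain Scala collections (this mirrors the splitting, not Fetch's actual internals):

```scala
// Splitting ids into groups of at most maxBatchSize elements,
// mirroring how a fetch of four users ends up as two batched calls.
val ids = List(1, 2, 3, 4)
val maxBatchSize = 2

val batches = ids.grouped(maxBatchSize).toList
// batches == List(List(1, 2), List(3, 4)):
// the data source's `batch` method is called once per group.
```

So instead of a single `Batch Users` log line we would see two, one for each pair of ids.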
Batch execution strategy
When multiple batches need to run, we can choose between a sequential and a parallel execution strategy. Batches run in parallel by default, but you can change this by overriding DataSource#batchExecution.
object SequentialUsers extends Data[UserId, User] {
  def name = "Sequential Users"

  def source[F[_]: Concurrent]: DataSource[F, UserId, User] = new DataSource[F, UserId, User] {
    override def data = SequentialUsers

    override def CF = Concurrent[F]

    override def maxBatchSize: Option[Int] = Some(2)

    override def batchExecution: BatchExecution = Sequentially // defaults to `InParallel`

    override def fetch(id: UserId): F[Option[User]] =
      latency[F](s"One User $id") >> CF.pure(userDatabase.get(id))

    override def batch(ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
      latency[F](s"Batch Users $ids") >> CF.pure(userDatabase.filterKeys(ids.toList.toSet).toMap)
  }
}

def getSequentialUser[F[_]: Concurrent](id: Int): Fetch[F, User] =
  Fetch(id, SequentialUsers.source)
We have defined the maximum batch size to be 2 and the batch execution to be sequential. Let's see what happens when running a fetch that needs more than one batch:
def fetchManySeqBatchedUsers[F[_]: Concurrent]: Fetch[F, List[User]] =
  List(1, 2, 3, 4).traverse(getSequentialUser[F])
Fetch.run[IO](fetchManySeqBatchedUsers).unsafeRunSync().size shouldBe res0