On GitHub: easel/akka-streams-intro
May 11, 2016
What could be easier?
def getUrlContent(url: String) = Http(url).asString.body

val FirstQuestionId = 4
val LastQuestionId = 37120353
val ids = Range(FirstQuestionId, LastQuestionId)
// URLs are always of the form http://stackoverflow.com/questions/$id
val urls = ids.map(id => s"http://stackoverflow.com/questions/$id")
urls.take(10).map(getUrlContent).toList.map(_.length)
scala> /**
     |  * Time a function call returning a tuple of the elapsed time and the result
     |  */
     | def ptime[A](f: => A): (String, A) = {
     |   val t0 = System.nanoTime
     |   val ans = f
     |   val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec"
     |   (elapsed, ans)
     | }
ptime: [A](f: => A)(String, A)
scala> ptime("My function result")
res1: (String, String) = (0.000 sec,My function result)
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scalaj.http._
java.lang.OutOfMemoryError: Java heap space
  at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:68)
  at java.lang.StringBuilder.<init>(StringBuilder.java:112)
  at scala.StringContext.standardInterpolator(StringContext.scala:123)
  at scala.StringContext.s(StringContext.scala:95)
  at $anonfun$1.apply(<console>:13)
  at $anonfun$1.apply(<console>:13)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.Range.foreach(Range.scala:160)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  ... 21 elided
Scala attempted to materialize the entire list in memory: Range.map is strict, so building 37 million URL strings up front exhausted the heap before a single request was made. Switching to an iterator keeps the pipeline lazy.
scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String

scala> val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator

scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator

scala> urls.next()
res2: String = http://stackoverflow.com/questions/4

scala> ptime(urls.take(10).map(getUrlContent).toList.map(_.length))
res3: (String, List[Int]) = (0.641 sec,List(195, 210, 195, 166, 166, 172, 171, 177, 157, 178))
def getUrlContentThrottled(url: String) = {
    Thread.sleep(100)
    Http(url).asString.body
}
scala> ptime(urls.take(10).map(getUrlContentThrottled).toList.map(_.length))
res4: (String, List[Int]) = (1.256 sec,List(184, 190, 151, 157, 181, 172, 172, 172, 174, 164))
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]

scala> lazy val futures = urls.take(10).toList.map(getAsync)
futures: List[scala.concurrent.Future[String]] = <lazy>

scala> lazy val futureSeq = Future.sequence(futures)
futureSeq: scala.concurrent.Future[List[String]] = <lazy>

scala> ptime(Await.result(futureSeq, 10.seconds).map(_.length))
res5: (String, List[Int]) = (0.274 sec,List(162, 157, 177, 163, 163, 168, 216, 168, 184, 170))
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._
scala> val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator
scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator
scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String
scala> def getUrlContentThrottled(url: String) = {
     |     Thread.sleep(100)
     |     Http(url).asString.body
     | }
getUrlContentThrottled: (url: String)String
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]
import akka.pattern.pipe // needed for pipeTo below
import scala.util.Try

object SpiderActor {
  sealed abstract class Message
  case class Request(url: String) extends Message
  case class Response(body: String) extends Message
  def getUrlContent(url: String): Future[Response] = Future{
    Response(Http(url).asString.body)
  }
  val ThrottleDelay = 100.milliseconds
  val MaxInFlight = 10
  def run(urls: Iterator[String]): Future[Seq[Int]] = {
    val promise = Promise[Seq[Int]]
    var lengths = Seq.empty[Int]
    def persist(body: String) = Future.successful {
      lengths :+= body.length
    }
    val system = ActorSystem()
    println("Starting SpiderActor")
    val actorRef = system.actorOf(Props(new SpiderActor(urls, persist)), "spider")
    println("SpiderActor Started")
    system.whenTerminated.map { x =>
      println("SpiderActor Shut Down")
      promise.complete(Try(lengths))
    }
    promise.future
  }
}
class SpiderActor(urls: Iterator[String], persist: (String) => Future[Unit]) extends Actor {
  import SpiderActor._
  var inFlight = 0
  // Schedule another request if we're under MaxInFlight and URLs remain; once the
  // iterator is exhausted and nothing is in flight, shut the actor down.
  private def nextRequest() = {
    if(inFlight < MaxInFlight && urls.nonEmpty) {
      inFlight += 1
      context.system.scheduler.scheduleOnce(ThrottleDelay, self, Request(urls.next))
    }
    else if (inFlight == 0 && urls.isEmpty) self ! PoisonPill
  }
  nextRequest()
  def receive: Receive = {
    case Request(url) =>
      nextRequest()
      getUrlContent(url).pipeTo(self)
    case Response(body: String) =>
      inFlight -= 1
      persist(body).map(_ => nextRequest())
  }
  @scala.throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    super.postStop()
    context.system.terminate()
  }
}
scala> import com.github.easel._
import com.github.easel._

scala> val u = urls.take(10)
u: Iterator[String] = non-empty iterator

scala> ptime(Await.result(SpiderActor.run(u), 10.seconds))
Starting SpiderActor
SpiderActor Started
res3: (String, Seq[Int]) = (0.950 sec,List(191, 195, 210, 195, 166, 166, 172, 171, 177, 157))
import scala.concurrent.duration._, scala.concurrent._, scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._, com.github.easel._

val source: Source[Int, akka.NotUsed] = Source(0 to 3)
val flow: Flow[Int, String, akka.NotUsed] = Flow[Int].map(_.toString)
val sink: Sink[Any, Future[akka.Done]] = Sink.foreach(println)
scala> def run = {
     |   implicit val actorSystem = ActorSystem()
     |   implicit val materializer = ActorMaterializer()
     |   val result = source.via(flow).runWith(sink)
     |   Await.result(result, 1.seconds)
     |   actorSystem.terminate()
     | }
run: scala.concurrent.Future[akka.actor.Terminated]
scala> run
0
1
2
3
res0: scala.concurrent.Future[akka.actor.Terminated] = List()
scala> val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator
scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator
scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String
scala> def getUrlContentThrottled(url: String) = {
     |     Thread.sleep(100)
     |     Http(url).asString.body
     | }
getUrlContentThrottled: (url: String)String
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]
object SpiderStream {
  val Concurrency = 10
  val BatchSize = 100
  def source(args: Iterator[String]) = Source.fromIterator(() => args)
  def flow(f: (String) => Future[String]) = Flow[String]
    .throttle(1, 100.millis, 1, ThrottleMode.shaping) // at most one request per 100 milliseconds
    .mapAsync(Concurrency)(f)                         // up to Concurrency fetches in flight, results kept in order
    .map(_.length)                                    // keep only the body length
    .grouped(BatchSize)                               // collect results into batches of up to BatchSize
  def run(args: Seq[String], f: (String) => Future[String]): Future[Seq[Int]] = {
    println("Starting")
    implicit val system = ActorSystem()
    system.whenTerminated.map { x =>
      println("Shut Down")
    }
    implicit val mat: Materializer = ActorMaterializer()
    val result = source(args.iterator)
      .via(flow(f))
      .runWith(Sink.head)
    result.map { x =>
      system.terminate()
      x
    }(system.dispatcher)
  }
}
scala> ptime((SpiderStream.run(urls.take(10).toSeq, (x) => getAsync(x)), 10.seconds))
Starting
res3: (String, (scala.concurrent.Future[Seq[Int]], scala.concurrent.duration.FiniteDuration)) = (0.039 sec,(List(),10 seconds))
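Note that the call above times construction of a (Future, duration) tuple rather than the completed stream, which is why it returns in 0.039 seconds with an unfinished Future. To time the stream end to end, the materialized future would be awaited, along these lines (a sketch; output omitted):

// Await the materialized Future[Seq[Int]] before timing it.
ptime(Await.result(SpiderStream.run(urls.take(10).toSeq, (x) => getAsync(x)), 10.seconds))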
val singleFlow: Flow[Int, String, akka.NotUsed] = Flow[Int]
  .buffer(10, OverflowStrategy.backpressure) // buffer up to 10 elements, then apply backpressure
  .buffer(10, OverflowStrategy.dropTail) // buffer up to 10 elements, then drop the newest when full
  .delay(10.millis, DelayOverflowStrategy.backpressure) // force a delay downstream
  .throttle(1, 10.millis, 10, ThrottleMode.Shaping) // throttle to 1 element per 10 milliseconds (burst of 10)
  .map(_.toString) // map to a new type
  .async // introduce an asynchronous boundary
  .grouped(2) // group every pair of elements into a Seq
  .mapConcat(identity) // expand the groups back into individual elements
  .mapAsyncUnordered(10)(Future.successful) // run up to 10 futures at once, emitting in completion order
  .intersperse(",") // intersperse "," between elements, similar to mkString
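As a quick sanity check, here is a minimal sketch (assuming the imports from the earlier examples; the value names are illustrative) that pushes a few integers through singleFlow:

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

// Run a handful of integers through singleFlow and print the interspersed strings.
val done: Future[akka.Done] = Source(1 to 6).via(singleFlow).runForeach(println)
done.onComplete(_ => system.terminate())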
// Fan-out and fan-in junctions (wired together with the GraphDSL rather than called as Flow methods):
Broadcast  // (1 input, N outputs) emits each input element to every output
Zip        // (2 inputs, 1 output) combines a stream of A and a stream of B into a stream of (A, B) tuples
Unzip      // (1 input, 2 outputs) splits a stream of (A, B) tuples into a stream of A and a stream of B
Merge      // (N inputs, 1 output) emits elements from whichever input has one available
Concat     // (2 inputs, 1 output) consumes the first stream completely, then the second
Interleave // (2 inputs, 1 output) alternates, taking N elements from each input in turn
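Here is a minimal sketch of wiring junctions with the GraphDSL, assuming the implicit ActorSystem and ActorMaterializer from the sketch above: each element is broadcast to two branches and the results are merged back into one stream.

// Fan out each element to two transformations, then fan the results back in.
val fanOutFanIn: Flow[Int, Int, akka.NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val bcast = b.add(Broadcast[Int](2))
  val merge = b.add(Merge[Int](2))
  bcast.out(0) ~> Flow[Int].map(_ * 10) ~> merge.in(0)
  bcast.out(1) ~> Flow[Int].map(_ + 1)  ~> merge.in(1)
  FlowShape(bcast.in, merge.out)
})

// Each input element yields two outputs, in whatever order the merge sees them.
Source(1 to 3).via(fanOutFanIn).runForeach(println)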
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

/**
  * An Akka Streams graph stage that accepts a *sorted* stream of (Master, Detail) pairs and
  * groups them into Combined records. Accomplishes the equivalent of a groupBy(Master), but
  * optimized for a sorted stream.
  */
class MasterDetailGraphStage[Combined, Master, Detail]
(combine: (Set[(Master, Detail)]) => Combined)
  extends GraphStage[FlowShape[(Master, Detail), Combined]] {
  type MasterDetail = (Master, Detail)
  val in = Inlet[(Master, Detail)]("in")
  val out = Outlet[Combined]("out")
  val shape = FlowShape.of(in, out)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var buffer = Set.empty[MasterDetail]
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (buffer.nonEmpty) {
            if (buffer.head._1 == elem._1) {
              buffer = buffer + elem
              pull(in)
            } else {
              push(out, combine(buffer))
              buffer = Set(elem)
            }
          } else {
            buffer = Set(elem)
            pull(in)
          }
        }
        override def onUpstreamFinish(): Unit = {
          if (buffer.nonEmpty) emit(out, combine(buffer))
          complete(out)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}
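A hypothetical usage sketch (the order/line-item names are illustrative, not from the deck): grouping a sorted stream of (orderId, lineItem) pairs into one combined record per order.

// Combine each order's line items into a single Map entry.
val orders: Source[(Int, String), akka.NotUsed] =
  Source(List((1, "apple"), (1, "pear"), (2, "milk")))

val combined: Source[Map[Int, Set[String]], akka.NotUsed] =
  orders.via(new MasterDetailGraphStage[Map[Int, Set[String]], Int, String](
    pairs => Map(pairs.head._1 -> pairs.map(_._2))
  ))

// combined.runWith(Sink.foreach(println)) would emit one Map per order. Because the
// stage buffers into a Set, duplicate details collapse and detail order is not preserved.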
/**
* Objects which can be identified by a key and ordered
*/
trait Keyed {
  def key: String
}
object Keyed {
  implicit def ordering[T <: Keyed]: Ordering[T] = new Ordering[T] {
    override def compare(x: T, y: T): Int = {
      x.key.compareTo(y.key)
    }
  }
}
/**
* An interface for services which provide paginated results in batches.
*/
trait BatchLoader[T <: Keyed] {
  def load(offset: Option[String]): Future[Batch[T]]
  def source(implicit ec: ExecutionContext): Source[T, ActorRef] = {
    Source.actorPublisher(BatchSource.props[T](this))
  }
}
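Batch itself isn't shown in the deck. A minimal shape consistent with how it is used here (batch.items, batch.nextOffset) would be something like:

// Assumed definition (not in the original deck): one page of results plus an
// optional offset for fetching the next page; None means there are no more pages.
case class Batch[T](items: Seq[T], nextOffset: Option[String])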
import akka.pattern.pipe
import akka.stream.actor.ActorPublisher

class BatchSource[A <: Keyed](loader: BatchLoader[A])(implicit ec: ExecutionContext) extends ActorPublisher[A] {
  import BatchSource._
  import akka.stream.actor.ActorPublisherMessage._
  private var first = true
  private var nextOffset: Option[String] = None
  private var buffer: Seq[A] = Seq.empty
  def receive: Receive = waitingForDownstreamReq(0)
  case object Pull
  private def shouldLoadMore = {
    nextOffset.isDefined && (totalDemand > 0 || buffer.length < BUFFER_AMOUNT)
  }
  def waitingForDownstreamReq(offset: Long): Receive = {
    case Request(_) | Pull =>
      val sent = if (buffer.nonEmpty) {
        sendFromBuff(totalDemand)
      } else {
        0
      }
      if (first || (shouldLoadMore && isActive)) {
        first = false
        loader.load(nextOffset).pipeTo(self)
        context.become(waitingForFut(offset + sent, totalDemand))
      }
    case Cancel => context.stop(self)
  }
  def sendFromBuff(demand: Long): Long = {
    val consumed = buffer.take(demand.toInt).toList
    buffer = buffer.drop(consumed.length)
    consumed.foreach(onNext)
    if (nextOffset.isEmpty && buffer.isEmpty) {
      onComplete()
    }
    consumed.length.toLong
  }
  def waitingForFut(s: Long, beforeFutDemand: Long): Receive = {
    case batch: Batch[A] =>
      nextOffset = if (batch.items.isEmpty) {
        None
      } else {
        batch.nextOffset
      }
      buffer = buffer ++ batch.items
      val consumed = sendFromBuff(beforeFutDemand)
      self ! Pull
      context.become(waitingForDownstreamReq(s + consumed))
    case Request(_) | Pull => // ignoring until we receive the future response
    case Status.Failure(err) =>
      context.become(waitingForDownstreamReq(s))
      onError(err)
    case Cancel => context.stop(self)
  }
}
object BatchSource {
  final val BUFFER_AMOUNT = 1000
  def props[T <: Keyed](loader: BatchLoader[T])(implicit ec: ExecutionContext): Props = {
    Props(new BatchSource[T](loader))
  }
}
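To tie it together, here is a hypothetical BatchLoader that pages through an in-memory collection; every name below is illustrative rather than part of the deck:

import scala.concurrent.{ExecutionContext, Future}

// A Keyed item and a loader that serves it in fixed-size pages.
case class Item(key: String) extends Keyed

class ItemLoader(all: Vector[Item], pageSize: Int = 100)(implicit ec: ExecutionContext)
    extends BatchLoader[Item] {
  def load(offset: Option[String]): Future[Batch[Item]] = Future {
    val start = offset.map(_.toInt).getOrElse(0)
    val page = all.slice(start, start + pageSize)
    val next = if (start + pageSize < all.length) Some((start + pageSize).toString) else None
    Batch(page, next)
  }
}

// new ItemLoader(items).source then materializes as a Source[Item, ActorRef]
// backed by the BatchSource actor above.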
Further Resources