How it all began
							
								“Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.”
							
						
- Created by engineers from Typesafe, Netflix, Red Hat, and more, plus Doug Lea :)
Why
						- Efficiently processing large indeterminate streams is hard
- Avoiding blocking is essential to maximise performance
- Every stage in the stream needs to be able to push and pull
- We don't want to overload (or starve!) downstream consumers...
- We want to pass data across applications, nodes, CPUs, threads, actors, etc.
How
						- Treat data as a stream of elements
- Asynchronous non-blocking data and demand flows
- Demand flows upstream, causing data to flow downstream
- Data flow is therefore restricted by demand 
	          					
- Demand happens on a separate flow!
- Demand can be in blocks (give me the next 50!!)
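As a rough plain-Scala sketch of that contract (no Akka involved; `BoundedProducer` is an illustrative name, not a real API): data only flows downstream once the consumer has signalled demand upstream, and never exceeds it.

```scala
// Conceptual sketch: a producer that only emits what has been requested.
class BoundedProducer(data: List[Int]) {
  private var remaining = data
  private var demand = 0L
  val received = scala.collection.mutable.Buffer[Int]()

  // Demand flows upstream...
  def request(n: Long): Unit = {
    demand += n
    // ...causing data to flow downstream, restricted by demand
    while (demand > 0 && remaining.nonEmpty) {
      received += remaining.head
      remaining = remaining.tail
      demand -= 1
    }
  }
}

val producer = new BoundedProducer(List(1, 2, 3, 4, 5))
producer.request(2) // "give me the next 2"
println(producer.received.toList) // List(1, 2) - nothing more flows yet
producer.request(3)
println(producer.received.toList) // List(1, 2, 3, 4, 5)
```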
What
						- The Reactive Streams specification is just that
   - A collection of interfaces, methods, and protocols
- Provides example implementations and a TCK for verification
- Aimed at providing a way to build common implementations
 
- It's just a specification!!
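The core of that specification is four small interfaces, paraphrased here as Scala traits (the real ones are Java interfaces in `org.reactivestreams`; `CountingSubscription` is just an illustrative implementation):

```scala
// The four interfaces of the Reactive Streams spec, sketched in Scala
trait Subscription {
  def request(n: Long): Unit // signal demand upstream
  def cancel(): Unit
}
trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(t: T): Unit          // the next data element
  def onError(t: Throwable): Unit // the stream has failed
  def onComplete(): Unit          // the stream has finished
}
trait Publisher[T] {
  def subscribe(s: Subscriber[_ >: T]): Unit
}
trait Processor[T, R] extends Subscriber[T] with Publisher[R]

// A trivial Subscription, just to show the shape of the contract
class CountingSubscription extends Subscription {
  var requested = 0L
  def request(n: Long): Unit = requested += n
  def cancel(): Unit = ()
}
```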
Introducing Akka Streams!!
						Akka's implementation of Reactive Streams
						
					Design Principles
						- Explicitness over magic (I'm looking at you Shapeless!)
- Fully composable- Each component, or set of components, can be combined
 
- Each building block is immutable
- Fully compatible with other Reactive Stream implementations
Building blocks
						- Source- Traditionally known as a producer
- Supplies messages that will flow downstream
- Exactly one output stream
 
- Sink- Traditionally known as a consumer
- End point of the stream, this is where messages end up
 
Building blocks cont...
						- Flow- A processing stage in the Stream
- Used to compose Streams
- Exactly one input and one output stream
- See also BidirectionalFlow (two in -> two out)
 
- Sits between the Source and the Sink
- BidirectionalFlow is one of many - more later
Building blocks cont...
						- RunnableGraphs- A pre-assembled set of Stream components, packaged into a Graph.
- All exposed ports are connected (between a Source and Sink)
- This can then be Materialized
 
Building blocks cont...
						- Composite Flows- It is possible to wrap several components into more complex ones
- This composition can then be treated as one block
 
- Partial Flow Graphs- An incomplete Flow (Graph)
- Can be used to construct more complex Graphs easily
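As a plain-Scala analogy of that composition (not the Akka API; the stage names here are illustrative), two stages combine into a single block that can be treated, and reused, as one:

```scala
// Each "stage" is a function over a stream of elements
val filterStage: Iterator[Int] => Iterator[Int] = _.filter(_ <= 4)
val mapStage:    Iterator[Int] => Iterator[Int] = _.map(_ + 2)

// Composing the stages yields a single reusable "flow"
val composite = filterStage.andThen(mapStage)

println(composite(Iterator(1, 2, 3, 4, 5)).toList) // List(3, 4, 5, 6)
```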
 
Building blocks cont...
						- Materializer- Once complete, the flow is Materialized in order to start stream processing
- Supports fully distributed stream processing
   - Each step must be either a serializable immutable value or an ActorRef
 
- Fails immediately at runtime if the Graph isn't complete
 
Errors vs Failures
- Errors are handled within the stream as normal data elements
			         			- Passed using the onNext function
 
- Failure means that the stream itself has failed and is collapsing
   - Raises the onError signal
 
- Each block in the flow can choose to absorb or propagate the errors
   - Possibly resulting in the complete collapse of the flow
 
- Supervision strategies manage how the stream handles failures
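A rough plain-Scala sketch of the Resume-style semantics (an illustrative helper, not the Akka API): an element that fails is absorbed and the rest of the stream keeps flowing.

```scala
// "Resume" semantics: drop the failing element, keep the stream alive
def resumeOnError[A, B](in: List[A])(stage: A => B): List[B] =
  in.flatMap { a =>
    try List(stage(a))
    catch { case _: Exception => Nil } // absorb the error, keep flowing
  }

// 0 causes a failure mid-stream; with Resume the rest still flows
println(resumeOnError(List(2, 0, 4))(10 / _)) // List(5, 2)
```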
First things first
						We need to create an ActorSystem and Materializer
						
implicit val system = ActorSystem("actors")
implicit val materializer = ActorMaterializer()
	          			
					Simple Stream
With the ActorSystem and Materializer in place, we can build and run a simple stream
						
Source(1 to 5)
   .filter(_ < 3) // 1, 2
   .map(_ * 2) // 2, 4
   .to(Sink.foreach(println))
   .run()
//prints 2 4
	          			
- The fixed source produces Ints from 1 to 5
- Filters, maps, then passes them to a Sink, which prints the results.
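For comparison, the same filter/map transformation on a plain Scala collection yields the same elements; what Akka Streams adds is running it asynchronously, element by element, with backpressure.

```scala
// The same pipeline applied eagerly to a plain collection
val result = (1 to 5)
  .filter(_ < 3) // 1, 2
  .map(_ * 2)    // 2, 4

println(result.toList) // List(2, 4)
```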
Composing elements together
						We can combine multiple components together
						
val nestedSource = Source(1 to 5)
   .map(_ * 2)
val nestedFlow = Flow[Int]
   .filter(_ <= 4)
   .map(_ + 2)
val sink = Sink.foreach(println)
//link up the Flow to a Sink
val nestedSink = nestedFlow.to(sink)
// Create a RunnableGraph - and run it! Prints 4 6
nestedSource.to(nestedSink).run()
	          			
	          			- Really useful when building complex streams
Composing elements together cont...
						Alternatively we could do this, linking them in one step
						
nestedSource
	.via(nestedFlow)
	.to(Sink.foreach(println(_)))
	          			
	          			- via used to link up flows (processing stages)
Composing elements together cont...
	          			Graph Processing Stages
- Fan Out
   - Broadcast[T] – (1 input, N outputs)
   - Balance[T] – (1 input, N outputs)
   - ...
 
- Fan In
   - Merge[In] – (N inputs, 1 output)
   - ...
 
- Timer Driven
   - groupedWithin(Int, Duration)
      - Groups elements when either the number or duration is reached (whichever comes first). Very useful for batching messages.
 
- See the Akka Stream docs for more!
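The groupedWithin batching rule can be sketched deterministically in plain Scala (an illustrative helper, not the Akka implementation; timestamps are passed in explicitly instead of using a real timer):

```scala
// Flush a batch when either maxSize elements have arrived or maxMillis
// has elapsed since the first buffered element, whichever comes first.
def batches(events: List[(Int, Long)],
            maxSize: Int, maxMillis: Long): List[List[Int]] = {
  val flushed = scala.collection.mutable.ListBuffer[List[Int]]()
  var buf = List.empty[Int]
  var start = 0L
  for ((elem, t) <- events) {
    // time limit reached: flush whatever is buffered first
    if (buf.nonEmpty && t - start >= maxMillis) { flushed += buf.reverse; buf = Nil }
    if (buf.isEmpty) start = t
    buf = elem :: buf
    // size limit reached: flush the full batch
    if (buf.size == maxSize) { flushed += buf.reverse; buf = Nil }
  }
  if (buf.nonEmpty) flushed += buf.reverse
  flushed.toList
}

// First batch flushes on size (3 elements), the rest on end of stream
println(batches(List((1, 0L), (2, 10L), (3, 20L), (4, 200L), (5, 210L)),
  maxSize = 3, maxMillis = 100L)) // List(List(1, 2, 3), List(4, 5))
```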
Graph Processing Stages cont...
	          			The Graph DSL
- Whenever you want to perform multiple operations to control the flow of a Graph, manually constructing them as above can become very cumbersome and tedious, not to mention hard to maintain.
- For this reason the Akka team have written a DSL to help write complex Graphs.
- DSL is Scala only
- Makes visualising graphs in the code much easier!!
The Graph DSL
						
val g = FlowGraph.closed() { 
 implicit builder: FlowGraph.Builder[Unit] =>
   //This provides the DSL
   import FlowGraph.Implicits._ 
   val in = Source(1 to 3)
   val out = Sink.foreach(println)
   //2 outputs, 2 inputs
   val bcast = builder.add(Broadcast[Int](2)) 
   val merge = builder.add(Merge[Int](2)) 
   val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
   in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
               bcast ~> f4 ~> merge
}
g.run() //Prints 31 31 32 32 33 33
	          			
- closed() ensures we end up with a complete graph (can also use partial() instead)
- Makes visualising graphs in the code much easier!!
Example - Reactive Kafka
						- The guys at SoftwareMill have implemented a wrapper for Apache Kafka
			         			
- Tried and tested by yours truly
Example - Reactive Kafka cont...
						- Source is a Kafka Consumer
- Sink is a Kafka Publisher
val kafka = new ReactiveKafka()
val publisher: Publisher[StringKafkaMessage] = 
   kafka.consume(
      ConsumerProperties(...)
   )
val subscriber: Subscriber[String] = 
   kafka.publish(
      ProducerProperties(...)
   )
Source(publisher).map(_.message().toUpperCase)
   .to(Sink(subscriber)).run()
	          			
	          			A real world example cont...
	          			
FlowGraph.closed() { 
   implicit builder: FlowGraph.Builder[Unit] =>
   import FlowGraph.Implicits._
   val in = Source(kafkaConsumer)
   val out = Sink.foreach(println)
   val bcast = builder
      .add(Broadcast[StringKafkaMessage](2))
   val merge = builder
      .add(Merge[StringKafkaMessage](2))
   val parser1, parser2 = Flow[StringKafkaMessage]
      .map(...)
   val group = Flow[StringKafkaMessage].grouped(4)
   in ~> bcast ~> parser1 ~> merge ~> group ~> out
         bcast ~> parser2 ~> merge
}.run()
	          			
	          			 
		
					Akka Streams
					From Zero to Kafka
					
						Created by Mark Harrison / @markglh