On Github giokincade / scalding-talk
Scala
Hadoop
Cascading
Scalding
scala> "blah and blah" res4: java.lang.String = blah and blah scala> "blah and blah".urlencode() error: value urlencode is not a member of java.lang.String
/** Enriches String with URL-encoding, pimp-my-library style. */
class WrappedString(val x: String) {
  /** URL-encodes the wrapped string as UTF-8 (spaces become '+').
    * The single-argument URLEncoder.encode is deprecated and uses the
    * platform default charset, so we name the charset explicitly. */
  def urlencode() = URLEncoder.encode(x, "UTF-8")
}
/** Implicit view so any String gains urlencode(). */
implicit def stringToWrappedString(x: String): WrappedString = new WrappedString(x)
scala> "blah and blah".urlencode() res2: java.lang.String = blah+and+blah! scala> stringToWrappedString("blah and blah").urlencode() res2: java.lang.String = blah+and+blah!
def emailSignature(name:String)(implicit organization:String) = "Sincerely Yours,\n" + name + "\n" + organization
scala> implicit val organization = "Etsy, Inc." organization: java.lang.String = Etsy, Inc.
scala> emailSignature("Giovanni Fernandez-Kincade")
res3: java.lang.String =
Sincerely Yours,
Giovanni Fernandez-Kincade
Etsy, Inc.
// Count words by first letter: emit a singleton Map per word, then merge
// the maps pairwise, summing counts for keys that collide.
val words = List("one", "two", "three")
words
  .map(word => Map(word.substring(0, 1) -> 1))
  .reduce { (merged, next) =>
    // Fold `next` into `merged`, adding counts on key collisions.
    next.foldLeft(merged) {
      case (acc, (key, count)) =>
        acc + (key -> (acc.getOrElse(key, 0) + count))
    }
  }
val words = List("one", "two", "three")
words
.map {
word =>
Map(word.substring(0,1) -> 1)
}
List(Map(o -> 1), Map(t -> 1), Map(t -> 1))
List(Map(o -> 1), Map(t -> 1), Map(t -> 1))
.reduce {
(a, b) =>
a ++ b.map {
mapTuple =>
val (key, count) = mapTuple
(key, a.getOrElse(key, 0) + count)
}
}
Map(o -> 1, t -> 2)
~> echo "one\ntwo\nthree" > ~/foo.txt ~> hadoop fs -put ~/foo.txt
void map(K1 key, V1 value, OutputCollector<K2,V2> output)
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3,V3> output)
cascading.tuple.Tuple
// Field names paired positionally with the raw values below.
val fields = new Fields("user_id", "user_name", "location")
// Cascading tuples store Objects, hence the explicit upcasts.
val tuple = new Tuple(
  1.asInstanceOf[Object],
  "gigi".asInstanceOf[Object],
  "miami".asInstanceOf[Object]
)
// A TupleEntry marries the values to their field names for named access.
val one = new TupleEntry(fields, tuple)
> one.get("user_id")
res4: java.lang.Comparable[_] = 1
> one.get("user_name")
res5: java.lang.Comparable[_] = gigi
/** Wraps a single value in a one-field Cascading TupleEntry under `fieldName`. */
def cascadingTuple[T](fieldName: String, entry: T): TupleEntry = {
  val fields = new Fields(fieldName)
  // Cascading tuples hold Objects, so the value is upcast explicitly.
  val value = new Tuple(entry.asInstanceOf[Object])
  new TupleEntry(fields, value)
}
// Word count by first letter, expressed over Cascading TupleEntrys.
val words = List(
  cascadingTuple("word", "one"),
  cascadingTuple("word", "two"),
  cascadingTuple("word", "three")
)
words
  .map {
    tuple =>
      // Each word becomes a singleton map: first letter -> 1.
      cascadingTuple(
        "map",
        Map(tuple.getString("word").substring(0,1) -> 1)
      )
  }
  .reduce {
    (tupleA, tupleB) =>
      // The stored value is typed Object, so we must downcast here.
      // (Trailing comma after the second element removed: it is a syntax
      // error in Scala versions before 2.12.2 / SIP-27.)
      val (a,b) = (
        tupleA.getObject("map").asInstanceOf[Map[String,Int]],
        tupleB.getObject("map").asInstanceOf[Map[String,Int]]
      )
      // Merge b into a, summing counts on key collisions.
      val result = a ++ b.map {
        mapTuple =>
          val (key, count) = mapTuple
          (key, a.getOrElse(key, 0) + count)
      }
      cascadingTuple("map", result)
  }
> run fields: ['map'] tuple: ['Map(o -> 1, t -> 2)']
List("one", "two", "three")
.groupBy(_.substring(0,1))
res1: Map(o -> List(one), t -> List(two, three))
// Same count via groupBy: bucket words by first letter, then replace each
// bucket with its size.
List("one", "two", "three")
  .groupBy(_.substring(0, 1))
  .map { case (key, bucket) => (key, bucket.size) }
res1: Map(o -> 1, t -> 2)
import com.twitter.scalding._
/** Scalding job: count input lines by their first character and write
  * (character, count) rows as TSV. */
class ExampleJob(args: Args) extends Job(args) {
  TextLine("data/words.txt")
    // 'line -> 'first_character: read the line field, append its first char.
    .map('line -> 'first_character) { (line: String) => line.substring(0, 1) }
    // Group on the new field and count each group's size into 'size.
    .groupBy('first_character) { (group: GroupBuilder) => group.size('size) }
    .write(Tsv("data/output/characters.tsv"))
}
class ExampleJob(args: Args)
extends Job(args) {
TextLine("data/words.txt")
fields: ['line'] tuple: ['one'] fields: ['line'] tuple: ['two'] fields: ['line'] tuple: ['three']
tuple.getString("line")
.map('line -> 'first_character) {
(line:String) =>
line.substring(0,1)
}
tuple.setString("first_character", line.substring(0,1))
fields: ['line', 'first_character'] tuple: ['one', 'o'] fields: ['line', 'first_character'] tuple: ['two', 't'] fields: ['line', 'first_character'] tuple: ['three', 't']
tuple.getObject("first_character")
.groupBy('first_character) {
(g: GroupBuilder) =>
g.size('size)
}
tuple.setInt("size", ...)
fields: ['first_character', 'size'] tuple: ['o', 1] fields: ['first_character', 'size'] tuple: ['t', 2]
.groupBy('first_character) {
(g: GroupBuilder) =>
g.size('size)
}
fields: ['first_character', 'size'] tuple: ['o', '1'] fields: ['first_character', 'size'] tuple: ['t', '2']
.write(Tsv("data/output/characters.tsv"))
o 1 t 2
TextLine("data/words.txt")
.map('line -> 'first_character) {
class Job(val args: Args)
extends FieldConversions
with java.io.Serializable {
implicit def pipeToRichPipe(pipe: Pipe): RichPipe =
new RichPipe(pipe)
implicit def sourceToRichPipe(src: Source): RichPipe =
new RichPipe(src.read)
def map[A, T](fs: (Fields, Fields))
(fn: A => T)
(implicit conv: TupleConverter[A],
setter: TupleSetter[T]): Pipe
.map('line -> 'first_character) {
def map[A, T](fs: (Fields, Fields))
/** Implicitly converts a Scala Symbol to Cascading Fields; the special
  * symbol '* maps to Fields.ALL, anything else to a single named field. */
implicit def symbolToFields(x: Symbol) =
  if (x == '*) Fields.ALL else new Fields(x.name)
.map('line -> 'first_character) {
(line:String) =>
line.substring(0,1)
}
def map[A, T](fs: (Fields, Fields))
(fn: A => T)
implicit def tuple1Converter[A](implicit gA: TupleGetter[A]): TupleConverter[Tuple1[A]]
implicit object StringGetter extends TupleGetter[String] {
override def get(tup: CTuple, i: Int) = tup.getString(i)
}
.map('line -> 'first_character) {
(line:String) =>
line.substring(0,1)
}
def map[A, T](fs: (Fields, Fields))
(fn: A => T)
(implicit conv: TupleConverter[A],
setter: TupleSetter[T]): Pipe
implicit def singleSetter[A]: TupleSetter[A] = new TupleSetter[A] {
override def apply(arg: A) = {
val tup = CTuple.size(1)
tup.set(0, arg)
tup
}
> runMain com.twitter.scalding.Tool
com.giokincade.scalding.ExampleJob --local
hadoop jar foo.jar com.giokincade.scalding.ExampleJob --hdfs
// Typed-API version of the job: first characters -> counts, as (String, Long).
val lines = TypedPipe.from(TextLine("data/words.txt"))
lines
  .map(_.substring(0, 1))
  .groupBy(identity)
  .size
  .write(TypedTsv[(String, Long)]("data/output/typed-characters.tsv"))