Ville Seppänen @Vilsepi | Jari Voutilainen @Zharktas | @GoforeOy
All presentation material is available at https://github.com/gofore/aws-emr
# Launch a Hadoop Streaming job: the mapper and reducer are ordinary
# executables that read records from stdin and write results to stdout.
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input my/Input/Directories \
-output my/Output/Directory \
-mapper myMapperProgram.py \
-reducer myReducerProgram.py
# The same dataflow can be simulated locally with a plain shell pipeline:
cat input_data.txt | mapper.py | reducer.py > output_data.txt
The endlessly fascinating example of counting words in Hadoop
#!/usr/bin/python
"""Hadoop Streaming mapper for word counting.

Reads text lines from stdin and emits one record per word in the form
"LongValueSum:<word>" + TAB + "1".  The "LongValueSum:" prefix tells
Hadoop's built-in "aggregate" reducer to sum the counts per word.
"""
import re
import sys

# A word is a letter followed by any mix of letters and digits.
pattern = re.compile(r"[a-zA-Z][a-zA-Z0-9]*")


def main(stdin=None, stdout=None):
    """Stream count records for every word found on *stdin* to *stdout*.

    Both parameters default to the process streams so the script behaves
    exactly as before when run by Hadoop Streaming.
    """
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    for line in stdin:
        for word in pattern.findall(line):
            # Lower-case so "Word" and "word" aggregate to the same key.
            stdout.write("LongValueSum:" + word.lower() + "\t" + "1" + "\n")


if __name__ == "__main__":
    main()
LongValueSum:i 1 LongValueSum:count 1 LongValueSum:words 1 LongValueSum:with 1 LongValueSum:hadoop 1
<link>
<linkno>310102</linkno>
<startsite>1108</startsite>
<endsite>1107</endsite>
<name language="en">Hallila -> Kaukajärvi</name>
<name language="fi">Hallila -> Kaukajärvi</name>
<name language="sv">Hallila -> Kaukajärvi</name>
<distance>
<value>3875.000</value>
<unit>m</unit>
</distance>
</link>
Static link information (271kb xml)
642 one-way links, 243 sites
<ivjtdata duration="60" periodstart="2014-06-24T02:55:00Z">
<recognitions>
<link id="110302" data_source="1">
<recognition offset_seconds="8" travel_time="152"></recognition>
<recognition offset_seconds="36" travel_time="155"></recognition>
</link>
<link id="410102" data_source="1">
<recognition offset_seconds="6" travel_time="126"></recognition>
<recognition offset_seconds="45" travel_time="152"></recognition>
</link>
<link id="810502" data_source="1">
<recognition offset_seconds="25" travel_time="66"></recognition>
<recognition offset_seconds="34" travel_time="79"></recognition>
<recognition offset_seconds="35" travel_time="67"></recognition>
<recognition offset_seconds="53" travel_time="58"></recognition>
</link>
</recognitions>
</ivjtdata>
Each file contains finished passthroughs for each road segment during one minute.
{
"sites": [
{
"id": "1205",
"name": "Viinikka",
"lat": 61.488282,
"lon": 23.779057,
"rno": "3495",
"tro": "3495/1-2930"
}
],
"links": [
{
"id": "99001041",
"name": "Hallila -> Viinikka",
"dist": 5003.0,
"site_start": "1108",
"site_end": "1205"
}]
}
Static link information (120kb json)
{
"date": "2014-06-01T02:52:00Z",
"recognitions": [
{
"id": "4510201",
"tt": 117,
"cars": 4,
"itts": [
100,
139,
121,
110
]
}
]
}
#!/usr/bin/env python
"""Create an EMR cluster with boto and submit a Hadoop Streaming step.

Requires that AWS API credentials have been exported as env variables.
"""
import boto.emr
from boto.emr.instance_group import InstanceGroup

connection = boto.emr.connect_to_region('eu-west-1')

# One master node, three on-demand core (worker) nodes, and up to 20
# optional task nodes acquired at a $0.012/hour spot-price bid.
instance_groups = [
    InstanceGroup(
        role="MASTER", name="Main node",
        type="m1.medium", num_instances=1,
        market="ON_DEMAND"),
    InstanceGroup(
        role="CORE", name="Worker nodes",
        type="m1.medium", num_instances=3,
        market="ON_DEMAND"),
    InstanceGroup(
        role="TASK", name="Optional spot-price nodes",
        type="m1.medium", num_instances=20,
        market="SPOT", bidprice=0.012),
]

# keep_alive=True leaves the cluster running after the steps finish so
# further steps can be added to it later.
cluster_id = connection.run_jobflow(
    "Our awesome cluster",
    instance_groups=instance_groups,
    action_on_failure='CANCEL_AND_WAIT',
    keep_alive=True,
    enable_debugging=True,
    log_uri="s3://our-s3-bucket/logs/",
    ami_version="3.3.1",
    bootstrap_actions=[],
    ec2_keyname="name-of-our-ssh-key",
    visible_to_all_users=True,
    job_flow_role="EMR_EC2_DefaultRole",
    service_role="EMR_DefaultRole")

steps = [
    boto.emr.step.StreamingStep(
        "Our awesome streaming app",
        input="s3://our-s3-bucket/our-input-data",
        output="s3://our-s3-bucket/our-output-path/",
        mapper="our-mapper.py",
        # "aggregate" is Hadoop's built-in reducer that sums the
        # LongValueSum records emitted by the mapper.
        reducer="aggregate",
        # "<s3-path>#<local-name>" distributes each file to every node
        # under the given local name.  (NOTE: the original slide had a
        # stray ")" here that made the list a syntax error.)
        cache_files=[
            "s3://our-s3-bucket/programs/our-mapper.py#our-mapper.py",
            "s3://our-s3-bucket/data/our-dictionary.json#our-dictionary.json",
        ],
        action_on_failure='CANCEL_AND_WAIT',
        jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'),
]

connection.add_jobflow_steps(cluster_id, steps)
#!/usr/bin/env python
# Condensed form of the cluster-creation script: the keyword arguments
# shown in full above are assumed to be collected into two dicts,
# ``cluster_parameters`` and ``steps_parameters``, which are defined
# elsewhere (slide placeholder — this snippet does not run on its own).
import boto.emr
from boto.emr.instance_group import InstanceGroup
# Requires AWS API credentials exported as env variables, as before.
connection = boto.emr.connect_to_region('eu-west-1')
cluster_id = connection.run_jobflow(**cluster_parameters)
connection.add_jobflow_steps(cluster_id, **steps_parameters)
# Create a new cluster
aws-tools/run-jobs.py create-cluster "Car speed counting cluster"
#   -> Starting cluster j-F0K0A4Q9F5O0 Car speed counting cluster

# Add a job step to the cluster
aws-tools/run-jobs.py run-step j-F0K0A4Q9F5O0 05-car-speed-for-time-of-day_map.py digitraffic/munged/links-by-month/2014
#   -> Step will output data to s3://hadoop-seminar-emr/digitraffic/outputs/2015-02-18_11-08-24_05-car-speed-for-time-of-day_map.py/

# Download and concatenate the output
aws s3 cp s3://hadoop-seminar-emr/digitraffic/outputs/2015-02-18_11-08-24_05-car-speed-for-time-of-day_map.py/ /tmp/emr --recursive --profile hadoop-seminar-emr
cat /tmp/emr/part-* > /tmp/emr/output

# Analyze the results
result-analysis/05_speed_during_day/05-car-speed-for-time-of-day_output.py /tmp/emr/output example-data/locationdata.json