๐ค 2022-07-25 ๐ค
DelayCountMapperWithMultipleOutputs.java
package com.gyuone.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.gyuone.common.AirlinePerformanceParser;
import com.gyuone.common.DelayCounters;
/**
 * Mapper that emits one count per delayed flight and tallies every other case
 * in job counters.
 *
 * <p>For each input line a departure check and an arrival check are performed,
 * in that order. A positive delay produces a record keyed
 * {@code "D, <year>,<month>"} (departure) or {@code "A, <year>,<month>"}
 * (arrival) with the value {@code 1}; zero, negative and unavailable delays only
 * increment the matching {@link DelayCounters} counter.
 */
public class DelayCountMapperWithMultipleOutputs extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** The constant value {@code 1} written for every delayed flight. */
    private static final IntWritable outputValue = new IntWritable(1);

    /** Reused output key instance; overwritten before every write. */
    private Text outputKey = new Text();

    /**
     * Processes one input line: first the departure delay, then the arrival delay.
     *
     * @param key     input key supplied by the input format (not used here)
     * @param value   the raw input line handed to {@code AirlinePerformanceParser}
     * @param context task context used to write records and update counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
        handleDeparture(parser, context);
        handleArrival(parser, context);
    }

    /** Emits a "D, year,month" record for a late departure, otherwise bumps a counter. */
    private void handleDeparture(AirlinePerformanceParser parser, Context context)
            throws IOException, InterruptedException {
        if (!parser.isDepartureDelayAvailable()) {
            context.getCounter(DelayCounters.not_available_departure).increment(1);
            return;
        }

        if (parser.getDepartureDelayTime() > 0) {
            // Note the space after the comma: the type tag is "D, ", not "D,".
            outputKey.set("D, " + parser.getYear() + "," + parser.getMonth());
            context.write(outputKey, outputValue);
        } else if (parser.getDepartureDelayTime() == 0) {
            context.getCounter(DelayCounters.scheduled_departure).increment(1);
        } else if (parser.getDepartureDelayTime() < 0) {
            context.getCounter(DelayCounters.early_departure).increment(1);
        }
    }

    /** Emits an "A, year,month" record for a late arrival, otherwise bumps a counter. */
    private void handleArrival(AirlinePerformanceParser parser, Context context)
            throws IOException, InterruptedException {
        if (!parser.isArriveDelayAvailable()) {
            context.getCounter(DelayCounters.not_available_arrival).increment(1);
            return;
        }

        if (parser.getArriveDelayTime() > 0) {
            // Note the space after the comma: the type tag is "A, ", not "A,".
            outputKey.set("A, " + parser.getYear() + "," + parser.getMonth());
            context.write(outputKey, outputValue);
        } else if (parser.getArriveDelayTime() == 0) {
            context.getCounter(DelayCounters.scheduled_arrival).increment(1);
        } else if (parser.getArriveDelayTime() < 0) {
            context.getCounter(DelayCounters.early_arrival).increment(1);
        }
    }
}
DelayCountReducerWithMultipleOutputs.java
package com.gyuone.reducer;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class DelayCountReducerWithMultipleOutputs extends Reducer<Text, IntWritable, Text, IntWritable> {
private Text outputKey = new Text();
private IntWritable result = new IntWritable();
private MultipleOutputs<Text, IntWritable> mos;
@Override
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
mos = new MultipleOutputs<Text, IntWritable>(context);
}
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
mos.close();
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] columns = key.toString().split(",");
outputKey.set(columns[1] + "," + columns[2]); // month
int sum = 0;
for(IntWritable value: values) {
sum += value.get();
}
result.set(sum);
if(columns[0].equals("D")) {
mos.write("departure", outputKey, result); // ์ถ๋ ฅ์ผ๋ก ๋ค์ด๊ฐ ๋๋ ํ ๋ฆฌ๋ช
์ง์ departure
}else {
mos.write("arrival", outputKey, result);
}
}
}
driver
DelayCountWithMultipleOutputs.java
package com.gyuone.driver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.gyuone.mapper.DelayCountMapperWithMultipleOutputs;
import com.gyuone.reducer.DelayCountReducerWithMultipleOutputs;
/**
 * Driver that configures and submits the delay-count job with two named
 * outputs, {@code "departure"} and {@code "arrival"}.
 *
 * <p>Usage: {@code DelayCountWithMultipleOutputs <in> <out>}
 */
public class DelayCountWithMultipleOutputs extends Configured implements Tool {

    /**
     * Runs the tool through {@link ToolRunner}, prints the resulting status and
     * exits the JVM with that status so callers and scripts can detect failure.
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DelayCountWithMultipleOutputs(), args);
        System.out.println("MapReduce Result" + res);
        System.exit(res);
    }

    /**
     * Builds the job and waits for it to finish.
     *
     * @param args command-line arguments; after generic options are removed
     *             exactly two must remain: input path and output path
     * @return {@code 0} when the job succeeds, {@code 1} when it fails,
     *         {@code 2} when the arguments are invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCountWithMultipleOutputs <in> <out>");
            // Return the status instead of killing the JVM from inside Tool.run().
            return 2;
        }

        Job job = Job.getInstance(getConf(), "DelayCountWithMultipleOutputs");
        job.setJarByClass(DelayCountWithMultipleOutputs.class);

        job.setMapperClass(DelayCountMapperWithMultipleOutputs.class);
        job.setReducerClass(DelayCountReducerWithMultipleOutputs.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Register the two named outputs that the reducer writes to.
        MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class,
                Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class,
                Text.class, IntWritable.class);

        // Propagate the job outcome rather than always reporting success.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
scp .\AirPerformance-0.5.jar ubuntu22:~/work/jar
mapred historyserver start& -> exit
yarn jar ./AirPerformance-0.5.jar com.gyuone.driver.DelayCountWithMultipleOutputs air-input delay-count-mos
hdfs dfs -ls
hdfs dfs -ls delay-count-mos
part-r-00000๊ฐ 0์ด๋ฏ๋ก written์์ 0์ด ์ฐํ๋ค.
- 기존 항공 운항 데이터 분석 결과를 보면 월 순서대로 출력되지 않았음
- 출력 데이터의 키값 자체가 연도와 월이 합쳐진 하나의 문자열로 인식
- 보조정렬(Secondary Sort)은 키의 값들을 그룹핑하고 그룹핑된 레코드에 순서를 부여하는 방식
- 보조정렬 구현순서
a. 기존 키의 값들을 조합한 복합키(Composite Key)를 정의, 이 때 키의 값 중에서 어떤 키를 그룹핑 키로 사용할지 결정
b. 복합키의 레코드를 정렬하기 위한 비교기(Comparator)를 정의
c. 그룹핑 키를 파티셔닝할 파티셔너(Partitioner)를 정의
d. 그룹핑 키를 정렬하기 위한 비교기(Comparator)를 정의
vi ํด์ dd :wq -> atom ์ผ๋ก ๋ฐ๋ก ์ ์ฅํด๋๊ธฐ
scp ./* ubuntu22:~/work/air-data/
"YEAR","MONTH","DAY_OF_MONTH","DAY_OF_WEEK","FL_DATE","UNIQUE_CARRIER","TAIL_NUM","FL_NUM","ORIGIN_AIRPORT_ID","ORIGIN","ORIGIN_STATE_ABR","DEST_AIRPORT_ID","DEST","DEST_STATE_ABR","CRS_DEP_TIME","DEP_TIME","DEP_DELAY","DEP_DELAY_NEW","DEP_DEL15","DEP_DELAY_GROUP","TAXI_OUT","WHEELS_OFF","WHEELS_ON","TAXI_IN","CRS_ARR_TIME","ARR_TIME","ARR_DELAY","ARR_DELAY_NEW","ARR_DEL15","ARR_DELAY_GROUP","CANCELLED","CANCELLATION_CODE","DIVERTED","CRS_ELAPSED_TIME","ACTUAL_ELAPSED_TIME","AIR_TIME","FLIGHTS","DISTANCE","DISTANCE_GROUP","CARRIER_DELAY","WEATHER_DELAY","NAS_DELAY","SECURITY_DELAY","LATE_AIRCRAFT_DELAY",โ
๊ธฐ์กด์ ์๋ input csv๋ ๋ฆฌ๊ธฐ hdfs dfs -rm air-input/*
put์ ์๋ก์ด csv์ ์ฅํ๊ธฐ hdfs dfs -put ./* air-input/
๋ค๋ฅธ input์ผ๋ก ์ง์ด๋ฃ๊ธฐ sortํ๋ csvํ์ผ๋ก ๋ฐ๊พธ๊ธฐ/ ls๋ก ์ ๋ค์ด๊ฐ๋์ง ํ์ธ
DateKey.java
package com.gyuone.common;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
public class DateKey implements WritableComparable<DateKey> {
private String year;
private Integer month;
public DateKey() {
}
public DateKey(String year, Integer month) {
super();
this.year = year;
this.month = month;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public Integer getMonth() {
return month;
}
public void setMonth(Integer month) {
this.month = month;
}
@Override
public int compareTo(DateKey o) { // ๋๊ฐ ๊ฐ์ ๋น๊ตํ ๋ ์ผ์ชฝ์ด ํฌ๋ฉด +1 ๊ฐ์ผ๋ฉด 0 ๋ค์๊บผ๊ฐ ํฌ๋ฉด -1 ์๋ ๋ง์ฐฌ๊ฐ์ง
// TODO Auto-generated method stub
int result = year.compareTo(o.year);
if (result == 0) { // year๊ฐ ๊ฐ์ผ๋ฉด if๋ฌธ ์๋ค์ด๊ฐ
result = month.compareTo(o.month); // ์ถ๊ฐ๋ก compareTo
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
WritableUtils.writeString(out, year);
out.writeInt(month); // intํ์
์ผ๋ก writeํด๋ผ
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
year = WritableUtils.readString(in);
month = in.readInt(); // ํ๋ก document์ ์ ์๋์ด ์๋ค.
}
@Override
public String toString() {
// TODO Auto-generated method stub
return new StringBuilder().append(year).append(",").append(month).toString(); // key๋ฅผ ์๋ก ์์ฑํด์ค๋ค.
}
}
์ ์ฌ....
โ ๋ฆฌ๋์์๋ ๊ทธ๋ฃนํ ํํฐ์
๋์ ๊ทธ๋ฃนํ comparator์ ์ํด ๊ฐ์ ์ฐ๋๋ก ๊ทธ๋ฃนํ๋ ๋ฐ์ดํฐ๊ฐ ์ ๋ฌ๋ ์ํ
โ ๋ณตํฉํค comparator๋ก ์ธํด ๊ทธ๋ฃนํ๋ ๊ฐ์ ์์ ์์๋๋ก ์ค๋ฆ์ฐจ์์ผ๋ก ์ ๋ ฌ๋์ด ์์.. ์ ๋ ฌ๋ ์ํ๋ก ๋ค์ด์จ๋ค.
- 하지만 리듀서 메서드에서 지연 횟수를 합산할 경우 데이터에 오류가 발생
- 예를 들어, 2008년도 항공 출발 지연 데이터를 처리할 경우 다음과 같은 결과가 나타남
EX) 2008 12 2647363
- 2008년도 12월만 출력되고 지연 횟수는 2008년도의 모든 지연 횟수가 합산되어 출력됨
- 이러한 현상이 나타나는 이유는 리듀서는 2008년이라는 그룹키를 기준으로 연산을 수행하기 때문
- 월별로 지연 횟수를 계산하려면 복합키를 구분해서 처리하는 코드를 구현해야 함
- 입력 데이터의 값에 해당하는 Iterable 객체를 순회할 때 월에 해당하는 값을 bMonth 라는 변수에 백업
- 순회를 하면서 백업된 월과 현재 데이터의 월이 일치하지 않을 때는
리듀서의 출력 데이터에 백업된 월의 지연 횟수를 출력
- 이 때, 다음 순서에 있는 월의 지연 횟수를 합산할 수 있게 지연 횟수 합계 변수를 0으로 초기화
DateKeyComparator.java
package com.gyuone.common;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Comparator for {@link DateKey} that orders keys by year and then by month,
 * both ascending.
 */
public class DateKeyComparator extends WritableComparator {

    /** Registers {@link DateKey} as the key class and asks the parent to create key instances. */
    protected DateKeyComparator() {
        super(DateKey.class, true);
    }

    /**
     * Compares two {@link DateKey} instances.
     *
     * @return the year comparison when the years differ; otherwise {@code -1},
     *         {@code 0} or {@code 1} according to the month order
     */
    @SuppressWarnings("rawtypes") // the raw signature is required to override the parent method
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        DateKey k1 = (DateKey) a;
        DateKey k2 = (DateKey) b;

        int cmp = k1.getYear().compareTo(k2.getYear());
        if (cmp != 0) {
            return cmp;
        }

        // getMonth() returns a boxed Integer: compare by value, never with ==,
        // which would compare object references.
        return k1.getMonth().compareTo(k2.getMonth());
    }
}
๋๊ธ