본문 바로가기
IT/HADOOP

[25일차] DelayCountWithMultipleOutputs.java

by GWLEE 2022. 7. 25.

๐Ÿค 2022-07-25 ๐Ÿค

 

 

DelayCountMapperWithMultipleOutputs.java

 

package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;
import com.gyuone.common.DelayCounters;

/**
 * Mapper that emits a count of 1 for every delayed departure and every delayed arrival.
 *
 * <p>Output keys have the form {@code "D,<year>,<month>"} for departures and
 * {@code "A,<year>,<month>"} for arrivals, with no whitespace between fields, so that a
 * consumer splitting on {@code ","} gets clean fields. Records that are on time, early, or
 * have no delay information are not emitted; they only increment the matching
 * {@link DelayCounters} counter.
 */
public class DelayCountMapperWithMultipleOutputs extends Mapper<LongWritable, Text, Text, IntWritable> {
	/** Constant value written for every delayed record. */
	private static final IntWritable outputValue = new IntWritable(1);
	/** Reused output key; overwritten before each write. */
	private final Text outputKey = new Text();

	/**
	 * Parses one input line and emits delay records / updates counters.
	 *
	 * @param key     input key supplied by the framework (not used here)
	 * @param value   one line of airline performance data
	 * @param context task context used for output and counters
	 * @throws IOException          if writing the output fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

		// Departure side.
		if (parser.isDepartureDelayAvailable()) {
			if (parser.getDepartureDelayTime() > 0) {
				// No space after the tag: the key is later split on "," and a blank
				// would end up in front of the year in the final output.
				outputKey.set("D," + parser.getYear() + "," + parser.getMonth());
				context.write(outputKey, outputValue);
			} else if (parser.getDepartureDelayTime() == 0) {
				context.getCounter(DelayCounters.scheduled_departure).increment(1);
			} else if (parser.getDepartureDelayTime() < 0) {
				context.getCounter(DelayCounters.early_departure).increment(1);
			}
		} else {
			context.getCounter(DelayCounters.not_available_departure).increment(1);
		}

		// Arrival side, handled independently of the departure side.
		if (parser.isArriveDelayAvailable()) {
			if (parser.getArriveDelayTime() > 0) {
				outputKey.set("A," + parser.getYear() + "," + parser.getMonth());
				context.write(outputKey, outputValue);
			} else if (parser.getArriveDelayTime() == 0) {
				context.getCounter(DelayCounters.scheduled_arrival).increment(1);
			} else if (parser.getArriveDelayTime() < 0) {
				context.getCounter(DelayCounters.early_arrival).increment(1);
			}
		} else {
			context.getCounter(DelayCounters.not_available_arrival).increment(1);
		}
	}
}

 


DelayCountReducerWithMultipleOutputs.java 

 

package com.gyuone.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Reducer that sums the per-key counts and routes each total to one of two named outputs.
 *
 * <p>The incoming key is expected to be three comma-separated fields: a tag, then two fields
 * that are joined back together as the output key. A tag of {@code "D"} goes to the
 * {@code "departure"} named output; every other tag goes to {@code "arrival"}.
 */
public class DelayCountReducerWithMultipleOutputs extends Reducer<Text, IntWritable, Text, IntWritable> {
	/** Reused output key; overwritten on every reduce call. */
	private final Text outputKey = new Text();
	/** Reused output value holding the summed count. */
	private final IntWritable result = new IntWritable();
	/** Writer for the named outputs; created in setup and closed in cleanup. */
	private MultipleOutputs<Text, IntWritable> mos;

	/** Creates the {@link MultipleOutputs} writer bound to this task's context. */
	@Override
	protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		mos = new MultipleOutputs<Text, IntWritable>(context);
	}

	/** Closes the {@link MultipleOutputs} writer so its output is flushed. */
	@Override
	protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		mos.close();
	}

	/**
	 * Sums all values for one key and writes the total to the matching named output.
	 *
	 * @param key     composite key of the form {@code tag,field1,field2}
	 * @param values  counts to add up
	 * @param context task context (output goes through {@code mos}, not directly through this)
	 * @throws IOException          if writing fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		String[] columns = key.toString().split(",");
		// Trim each field: a key such as "D, 2008,1" would otherwise put a leading blank
		// in front of the first output field.
		String tag = columns[0].trim();
		outputKey.set(columns[1].trim() + "," + columns[2].trim());

		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		result.set(sum);

		if (tag.equals("D")) {
			// "departure" is the named output registered by the driver.
			mos.write("departure", outputKey, result);
		} else {
			mos.write("arrival", outputKey, result);
		}
	}
}

driver 

DelayCountWithMultipleOutputs.java

 

package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapperWithMultipleOutputs;
import com.gyuone.reducer.DelayCountReducerWithMultipleOutputs;

/**
 * Driver for the delay-count job that writes departure and arrival totals to separate
 * named outputs.
 *
 * <p>Usage: {@code DelayCountWithMultipleOutputs <in> <out>}
 */
public class DelayCountWithMultipleOutputs extends Configured implements Tool {
	/**
	 * Runs the job through {@link ToolRunner}, prints the result code and uses it as the
	 * process exit status so that shell callers can detect a failed job.
	 *
	 * @param args command-line arguments (generic Hadoop options followed by {@code <in> <out>})
	 * @throws Exception if the job cannot be set up or run
	 */
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DelayCountWithMultipleOutputs(), args);
		System.out.println("MapReduce Result" + res);
		System.exit(res);
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args command-line arguments
	 * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments were invalid
	 * @throws Exception if the job cannot be set up or run
	 */
	@Override
	public int run(String[] args) throws Exception {
		String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: DelayCountWithMultipleOutputs <in> <out>");
			// Return instead of calling System.exit so the caller of run() stays in control.
			return 2;
		}

		Job job = Job.getInstance(getConf(), "DelayCountWithMultipleOutputs");

		job.setJarByClass(DelayCountWithMultipleOutputs.class);
		job.setMapperClass(DelayCountMapperWithMultipleOutputs.class);
		job.setReducerClass(DelayCountReducerWithMultipleOutputs.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		// Register the two named outputs the reducer writes to.
		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class,
				Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class,
				Text.class, IntWritable.class);

		// Propagate the job outcome instead of always reporting success.
		return job.waitForCompletion(true) ? 0 : 1;
	}

}

 

 


scp .\AirPerformance-0.5.jar ubuntu22:~/work/jar

mapred historyserver start& -> exit 

 

 

yarn jar ./AirPerformance-0.5.jar com.gyuone.driver.DelayCountWithMultipleOutputs air-input delay-count-mos

 

 

hdfs dfs -ls 

hdfs dfs -ls delay-count-mos

part-r-00000๊ฐ€ 0์ด๋ฏ€๋กœ written์—์„œ 0์ด ์ฐํžŒ๋‹ค.

 

 


โ— ๊ธฐ์กด ํ•ญ๊ณต ์šดํ•ญ ๋ฐ์ดํ„ฐ ๋ถ„์„ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด ์›” ์ˆœ์„œ๋Œ€๋กœ ์ถœ๋ ฅ๋˜์ง€ ์•Š์•˜์Œ
โ— ์ถœ๋ ฅ ๋ฐ์ดํ„ฐ์˜ ํ‚ค๊ฐ’ ์ž์ฒด๊ฐ€ ์—ฐ๋„์™€ ์›”์ด ํ•ฉ์ณ์ง„ ํ•˜๋‚˜์˜ ๋ฌธ์ž์—ด๋กœ ์ธ์‹
โ— ๋ณด์กฐ์ •๋ ฌ(Secondary Sort)์€ ํ‚ค์˜ ๊ฐ’๋“ค์„ ๊ทธ๋ฃนํ•‘ํ•˜๊ณ  ๊ทธ๋ฃนํ•‘๋œ ๋ ˆ์ฝ”๋“œ์— ์ˆœ์„œ๋ฅผ ๋ถ€์—ฌํ•˜๋Š” ๋ฐฉ์‹
โ— ๋ณด์กฐ์ •๋ ฌ ๊ตฌํ˜„์ˆœ์„œ
	a. ๊ธฐ์กด ํ‚ค์˜ ๊ฐ’๋“ค์„ ์กฐํ•ฉํ•œ ๋ณตํ•ฉํ‚ค(Composite Key)๋ฅผ ์ •์˜, ์ด ๋•Œ ํ‚ค์˜ ๊ฐ’ ์ค‘์—์„œ ์–ด๋–ค ํ‚ค๋ฅผ ๊ทธ๋ฃนํ•‘ ํ‚ค๋กœ ์‚ฌ์šฉํ•  ์ง€ ๊ฒฐ์ •
	b. ๋ณตํ•ฉํ‚ค์˜ ๋ ˆ์ฝ”๋“œ๋ฅผ ์ •๋ ฌํ•˜๊ธฐ ์œ„ํ•œ ๋น„๊ต๊ธฐ(Comparator)๋ฅผ ์ •์˜
	c. ๊ทธ๋ฃนํ•‘ ํ‚ค๋ฅผ ํŒŒํ‹ฐ์…”๋‹ํ•  ํŒŒํ‹ฐ์…”๋„ˆ(Partitioner)๋ฅผ ์ •์˜
๊ทธ๋ฃนํ•‘ ํ‚ค๋ฅผ ์ •๋ ฌํ•˜๊ธฐ ์œ„ํ•œ ๋น„๊ต๊ธฐ(Comparator)๋ฅผ ์ •์˜

 

vi 해서 dd :wq -> atom 으로 따로 저장해두기

 

scp ./* ubuntu22:~/work/air-data/

 

 

"YEAR","MONTH","DAY_OF_MONTH","DAY_OF_WEEK","FL_DATE","UNIQUE_CARRIER","TAIL_NUM","FL_NUM","ORIGIN_AIRPORT_ID","ORIGIN","ORIGIN_STATE_ABR","DEST_AIRPORT_ID","DEST","DEST_STATE_ABR","CRS_DEP_TIME","DEP_TIME","DEP_DELAY","DEP_DELAY_NEW","DEP_DEL15","DEP_DELAY_GROUP","TAXI_OUT","WHEELS_OFF","WHEELS_ON","TAXI_IN","CRS_ARR_TIME","ARR_TIME","ARR_DELAY","ARR_DELAY_NEW","ARR_DEL15","ARR_DELAY_GROUP","CANCELLED","CANCELLATION_CODE","DIVERTED","CRS_ELAPSED_TIME","ACTUAL_ELAPSED_TIME","AIR_TIME","FLIGHTS","DISTANCE","DISTANCE_GROUP","CARRIER_DELAY","WEATHER_DELAY","NAS_DELAY","SECURITY_DELAY","LATE_AIRCRAFT_DELAY",โ€‹

 

 

기존에 있는 input csv 날리기: hdfs dfs -rm air-input/*

put์€ ์ƒˆ๋กœ์šด csv์ €์žฅํ•˜๊ธฐ hdfs dfs -put ./* air-input/

 

 

๋‹ค๋ฅธ input์œผ๋กœ ์ง‘์–ด๋„ฃ๊ธฐ sortํ•˜๋Š” csvํŒŒ์ผ๋กœ ๋ฐ”๊พธ๊ธฐ/ ls๋กœ ์ž˜ ๋“ค์–ด๊ฐ”๋Š”์ง€ ํ™•์ธ 

 


DateKey.java

 

package com.gyuone.common;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Composite key made of a year (string) and a month (integer).
 *
 * <p>Ordering is by year first (string comparison) and then by month (numeric comparison).
 * {@link #equals(Object)} and {@link #hashCode()} are consistent with {@link #compareTo(DateKey)}
 * so the key behaves correctly wherever hashing is used on it.
 */
public class DateKey implements WritableComparable<DateKey> {
	private String year;
	private Integer month;

	/** Creates an empty key; fields are filled in later via setters or {@link #readFields}. */
	public DateKey() {
	}

	/**
	 * Creates a key with both fields set.
	 *
	 * @param year  year component
	 * @param month month component
	 */
	public DateKey(String year, Integer month) {
		this.year = year;
		this.month = month;
	}

	public String getYear() {
		return year;
	}

	public void setYear(String year) {
		this.year = year;
	}

	public Integer getMonth() {
		return month;
	}

	public void setMonth(Integer month) {
		this.month = month;
	}

	/**
	 * Compares by year, then by month.
	 *
	 * @param o the key to compare against
	 * @return a negative value, zero, or a positive value if this key sorts before,
	 *         equal to, or after {@code o}
	 */
	@Override
	public int compareTo(DateKey o) {
		int result = year.compareTo(o.year);
		if (result == 0) {
			// Years are equal, so the month decides the order.
			result = month.compareTo(o.month);
		}
		return result;
	}

	/** Two keys are equal when both year and month are equal. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof DateKey)) {
			return false;
		}
		DateKey other = (DateKey) obj;
		boolean sameYear = (year == null) ? other.year == null : year.equals(other.year);
		boolean sameMonth = (month == null) ? other.month == null : month.equals(other.month);
		return sameYear && sameMonth;
	}

	/** Hash derived from year and month, consistent with {@link #equals(Object)}. */
	@Override
	public int hashCode() {
		int hash = (year == null) ? 0 : year.hashCode();
		hash = 31 * hash + ((month == null) ? 0 : month.hashCode());
		return hash;
	}

	/** Serializes the year as a string followed by the month as an int. */
	@Override
	public void write(DataOutput out) throws IOException {
		WritableUtils.writeString(out, year);
		out.writeInt(month);
	}

	/** Reads the fields back in the same order {@link #write} wrote them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		year = WritableUtils.readString(in);
		month = in.readInt();
	}

	/** Returns {@code "<year>,<month>"}. */
	@Override
	public String toString() {
		return new StringBuilder().append(year).append(",").append(month).toString();
	}
}

점심....


โ— ๋ฆฌ๋“€์„œ์—๋Š” ๊ทธ๋ฃนํ•‘ ํŒŒํ‹ฐ์…”๋„ˆ์™€ ๊ทธ๋ฃนํ•‘ comparator์— ์˜ํ•ด ๊ฐ™์€ ์—ฐ๋„๋กœ ๊ทธ๋ฃนํ•‘๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ์ „๋‹ฌ๋œ ์ƒํƒœ
โ— ๋ณตํ•ฉํ‚ค comparator๋กœ ์ธํ•ด ๊ทธ๋ฃนํ•‘๋œ ๊ฐ’์€ ์›”์˜ ์ˆœ์„œ๋Œ€๋กœ ์˜ค๋ฆ„์ฐจ์ˆœ์œผ๋กœ ์ •๋ ฌ๋˜์–ด ์žˆ์Œ.. ์ •๋ ฌ๋œ ์ƒํƒœ๋กœ ๋“ค์–ด์˜จ๋‹ค.
โ— ํ•˜์ง€๋งŒ ๋ฆฌ๋“€์„œ ๋ฉ”์„œ๋“œ์—์„œ ์ง€์—ฐ ํšŸ์ˆ˜๋ฅผ ํ•ฉ์‚ฐํ•  ๊ฒฝ์šฐ ๋ฐ์ดํ„ฐ์— ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒ
โ— ์˜ˆ๋ฅผ ๋“ค์–ด, 2008๋…„๋„ ํ•ญ๊ณต ์ถœ๋ฐœ ์ง€์—ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ๊ฒฝ์šฐ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ฒฐ๊ณผ๊ฐ€ ๋‚˜ํƒ€๋‚จ
		EX) 2008 12 2647363
โ— 2008๋…„๋„ 12์›”๋งŒ ์ถœ๋ ฅ๋˜๊ณ  ์ง€์—ฐ ํšŸ์ˆ˜๋„ 2008๋…„๋„์˜ ๋ชจ๋“   ์ง€์—ฐ ํšŸ์ˆ˜๊ฐ€ ํ•ฉ์‚ฐ๋˜์–ด ์ถœ๋ ฅ๋จ
โ— ์ด๋Ÿฌํ•œ ํ˜„์ƒ์ด ๋‚˜ํƒ€๋‚˜๋Š” ์ด์œ ๋Š” ๋ฆฌ๋“€์„œ๋Š” 2008๋…„ ์ด๋ผ๋Š” ๊ทธ๋ฃนํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ
โ— ์›”๋ณ„๋กœ ์ง€์—ฐ ํšŸ์ˆ˜๋ฅผ ๊ณ„์‚ฐํ•˜๋ ค๋ฉด ๋ณตํ•ฉํ‚ค๋ฅผ ๊ตฌ๋ถ„ํ•ด์„œ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ๊ตฌํ˜„ํ•ด์•ผ ํ•จ
โ— ์ž…๋ ฅ ๋ฐ์ดํ„ฐ์˜ ๊ฐ’์— ํ•ด๋‹นํ•˜๋Š” Iterable ๊ฐ์ฒด๋ฅผ ์ˆœํšŒํ•  ๋•Œ ์›”์— ํ•ด๋‹นํ•˜๋Š” ๊ฐ’์„ bMonth ๋ผ๋Š” ๋ณ€์ˆ˜์— ๋ฐฑ์—…
โ— ์ˆœํšŒ๋ฅผ ํ•˜๋ฉด์„œ ๋ฐฑ์—…๋œ ์›”๊ณผ ํ˜„์žฌ ๋ฐ์ดํ„ฐ์˜ ์›”์ด ์ผ์น˜ํ•˜์ง€ ์•Š์„ ๋•Œ๋Š” 
  ๋ฆฌ๋“€์„œ์˜ ์ถœ๋ ฅ ๋ฐ์ดํ„ฐ์— ๋ฐฑ์—…๋œ ์›”์˜ ์ง€์—ฐ ํšŸ์ˆ˜๋ฅผ ์ถœ๋ ฅ
โ— ์ด ๋•Œ, ๋‹ค์Œ ์ˆœ์„œ์— ์žˆ๋Š” ์›”์˜ ์ง€์—ฐ ํšŸ์ˆ˜๋ฅผ ํ•ฉ์‚ฐํ•  ์ˆ˜ ์žˆ๊ฒŒ ์ง€์—ฐ ํšŸ์ˆ˜ ํ•ฉ๊ณ„ ๋ณ€์ˆ˜๋ฅผ 0์œผ๋กœ ์ดˆ๊ธฐํ™”

 


DateKeyComparator.java

package com.gyuone.common;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator for {@link DateKey} that orders by year (string comparison) and then by
 * month (numeric comparison).
 */
public class DateKeyComparator extends WritableComparator {
	/** Registers {@link DateKey} as the key class handled by this comparator. */
	protected DateKeyComparator() {
		super(DateKey.class, true);
	}

	/**
	 * Compares two {@link DateKey} instances.
	 *
	 * @param a first key; must be a {@link DateKey}
	 * @param b second key; must be a {@link DateKey}
	 * @return a negative value, zero, or a positive value if {@code a} sorts before,
	 *         equal to, or after {@code b}
	 */
	@Override
	@SuppressWarnings("rawtypes") // raw parameter types are dictated by the overridden method
	public int compare(WritableComparable a, WritableComparable b) {
		DateKey k1 = (DateKey) a;
		DateKey k2 = (DateKey) b;

		int cmp = k1.getYear().compareTo(k2.getYear());
		if (cmp != 0) {
			return cmp;
		}
		// getMonth() returns a boxed Integer: compare by value, never with ==,
		// which would compare object references.
		return k1.getMonth().compareTo(k2.getMonth());
	}
}


 

댓글