Yesterday we counted departure delay times.
Today, let's output the arrival delay counts.
ArrivalDelayCountMapper.java
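The mapper's body isn't included in these notes. It is essentially yesterday's departure-delay mapper with the arrival-delay column swapped in, so as a rough sketch (not the exact original) it would look like this:

package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;

// Sketch: emit ("year,month", 1) for every record whose arrival delay is positive.
public class ArrivalDelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
        if (parser.getArriveDelayTime() > 0) {
            outputKey.set(parser.getYear() + "," + parser.getMonth());
            context.write(outputKey, outputValue);
        }
    }
}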
ArrivalDelayCount.java
package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.gyuone.mapper.ArrivalDelayCountMapper;
import com.gyuone.reducer.DelayCountReducer;

public class ArrivalDelayCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: ArrivalDelayCount <input> <output>");
            System.exit(1); // print the usage message and exit
        }

        Job job = Job.getInstance(conf, "ArrivalDelayCount"); // create a new Job (a factory method, not a singleton)
        job.setJarByClass(ArrivalDelayCount.class);
        job.setMapperClass(ArrivalDelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
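DelayCountReducer isn't listed in this post either (it carries over from the earlier work). It is the usual per-key sum reducer, roughly:

package com.gyuone.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of the reducer every driver in this post reuses: it just sums the 1s per "year,month" key.
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result); // e.g. "<year>,<month>" -> number of delayed flights that month
    }
}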
Change the project version to AirPerformance 0.2 (in the pom),
run Maven clean and install on the project,
cd into the target directory, and
scp the jar from E:\source\java\eclipse-workspace\AirPerformance\target over to the Hadoop server. Then run:
yarn jar ./AirPerformance-0.2.jar com.gyuone.driver.ArrivalDelayCount air-input arr-delay-count
hdfs dfs -ls arr-delay-count (check that _SUCCESS shows up)
hdfs dfs -cat arr-delay-count/part-r-00000
Next, DelayCount.java: a single driver that handles departure and arrival at once. It extends Configured and implements Tool, so the work type can be passed in at run time with -D workType=... instead of being baked into the code.
DelayCount.java
package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapper;
import com.gyuone.reducer.DelayCountReducer;

public class DelayCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DelayCount(), args);
        System.out.println("MapReduce-Job Result : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // GenericOptionsParser moves the generic options (-D workType=...) into the
        // Configuration and hands back only the remaining <input> <output> arguments.
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCount <input> <output>");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf(), "DelayCount"); // create the Job from the parsed configuration
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
        return 0;
    }
}
DelayCountMapper.java
package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;

public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private String workType;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override // setup() runs once when the mapper instance is created
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // read the workType parameter that was passed to the job with -D workType=...
        workType = context.getConfiguration().get("workType");
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
        outputKey.set(parser.getYear() + "," + parser.getMonth());

        if (workType.equals("departure")) {
            if (parser.getDepartureDelayTime() > 0) {
                context.write(outputKey, outputValue);
            }
        } else if (workType.equals("arrive")) {
            if (parser.getArriveDelayTime() > 0) {
                context.write(outputKey, outputValue);
            }
        }
    }
}
Run AirPerformance-0.3.jar with workType=departure, writing to departure-delay-count:
ls -lrt
yarn jar ./AirPerformance-0.3.jar com.gyuone.driver.DelayCount -D workType=departure air-input departure-delay-count
hdfs dfs -ls departure-delay-count
hdfs dfs -cat departure-delay-count/part-r-00000
Run AirPerformance-0.3.jar with workType=arrive, writing to arrival-delay-count2:
ls -l
yarn jar ./AirPerformance-0.3.jar com.gyuone.driver.DelayCount -D workType=arrive air-input arrival-delay-count2
hdfs dfs -ls
hdfs dfs -cat arrival-delay-count2/part-r-00000
DelayCounters.java
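Only the filename appears in the notes. Judging from the six counters the mapper below increments, the file is presumably nothing more than an enum in com.gyuone.common:

package com.gyuone.common;

// Presumed contents (sketch): one constant per counter name used in DelayCountMapperWithCounter.
public enum DelayCounters {
    scheduled_departure,
    early_departure,
    not_available_departure,
    scheduled_arrival,
    early_arrival,
    not_available_arrival;
}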
DelayCountMapperWithCounter.java
package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;
import com.gyuone.common.DelayCounters;

public class DelayCountMapperWithCounter extends Mapper<LongWritable, Text, Text, IntWritable> {

    private String workType;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override // setup() runs once when the mapper instance is created
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // read the workType parameter that was passed to the job with -D workType=...
        workType = context.getConfiguration().get("workType");
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        if (workType.equals("departure")) {
            if (parser.isDistanceAvailable()) { // the record has a usable value
                if (parser.getDepartureDelayTime() > 0) {
                    outputKey.set(parser.getYear() + "," + parser.getMonth());
                    context.write(outputKey, outputValue);
                } else if (parser.getDepartureDelayTime() == 0) {
                    context.getCounter(DelayCounters.scheduled_departure).increment(1); // on-time departure
                } else if (parser.getDepartureDelayTime() < 0) {
                    context.getCounter(DelayCounters.early_departure).increment(1); // early departure
                }
            } else {
                context.getCounter(DelayCounters.not_available_departure).increment(1);
            }
        } else if (workType.equals("arrival")) {
            if (parser.isDistanceAvailable()) {
                if (parser.getArriveDelayTime() > 0) {
                    outputKey.set(parser.getYear() + "," + parser.getMonth());
                    context.write(outputKey, outputValue);
                } else if (parser.getArriveDelayTime() == 0) {
                    context.getCounter(DelayCounters.scheduled_arrival).increment(1); // on-time arrival
                } else if (parser.getArriveDelayTime() < 0) {
                    context.getCounter(DelayCounters.early_arrival).increment(1); // early arrival
                }
            } else {
                context.getCounter(DelayCounters.not_available_arrival).increment(1);
            }
        }
    }
}
DelayCounterWithCounter.java
package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapperWithCounter;
import com.gyuone.reducer.DelayCountReducer;

public class DelayCounterWithCounter extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DelayCounterWithCounter(), args);
        System.out.println("MapReduce-Job Result : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCounterWithCounter <input> <output>");
            System.exit(2);
        }

        Job job = Job.getInstance(getConf(), "DelayCounterWithCounter"); // create the Job from the parsed configuration
        job.setJarByClass(DelayCounterWithCounter.class);
        job.setMapperClass(DelayCountMapperWithCounter.class);
        job.setReducerClass(DelayCountReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
        return 0;
    }
}
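When the job finishes, these counters are printed in the console job summary alongside the built-in ones. If you want to print them yourself, you could also read them back off the Job after waitForCompletion(true). This is an optional sketch, not part of the original driver:

// Optional helper (not in the original code): print our custom counters once the job has ended.
// Assumes imports of org.apache.hadoop.mapreduce.Counters and com.gyuone.common.DelayCounters.
private static void printDelayCounters(Job job) throws IOException {
    Counters counters = job.getCounters();
    for (DelayCounters c : DelayCounters.values()) {
        System.out.println(c.name() + " = " + counters.findCounter(c).getValue());
    }
}

You would call it right after job.waitForCompletion(true) in run().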
Run the jar:
yarn jar ./AirPerformance-0.4.jar com.gyuone.driver.DelayCounterWithCounter -D workType=departure air-input arr-arrival-count-counter4
Confirm the job finished successfully.
hdfs dfs -ls
hdfs dfs -cat arr-arrival-count-counter4/part-r-00000
DelayCountWithCounterTwo.java
Write it by taking the existing DelayCounterWithCounter driver and swapping in the new mapper.
package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapperWithCounterTwo;
import com.gyuone.reducer.DelayCountReducer;

public class DelayCountWithCounterTwo extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DelayCountWithCounterTwo(), args);
        System.out.println("MapReduce-Job Result : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCountWithCounterTwo <input> <output>");
            System.exit(2);
        }

        Job job = Job.getInstance(getConf(), "DelayCountWithCounterTwo"); // create the Job from the parsed configuration
        job.setJarByClass(DelayCountWithCounterTwo.class);
        job.setMapperClass(DelayCountMapperWithCounterTwo.class);
        job.setReducerClass(DelayCountReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
        return 0;
    }
}
DelayCountMapperWithCounterTwo.java
This is the code the instructor shared on Slack 🌟 so let's work through it.
package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;
import com.gyuone.common.DelayCounters;

public class DelayCountMapperWithCounterTwo extends Mapper<LongWritable, Text, Text, IntWritable> {

    private String params;
    private String arrDep;  // "D" = departure, "A" = arrival
    private String origin;  // airport code to filter on; "999" appears to act as "all airports"
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // workType is passed as "<arrDep>-<airport>", e.g. "D-999" or "A-ATL"
        params = context.getConfiguration().get("workType");
        String[] str = params.split("-");
        arrDep = str[0];
        origin = str[1];
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        if (arrDep.equals("D")) {
            if (parser.isArriveDelayAvailable()) {
                if (parser.getDepartureDelayTime() > 0) {
                    if (origin.equals("999")) {
                        outputKey.set(parser.getYear() + "," + parser.getMonth());
                        context.write(outputKey, outputValue);
                    } else {
                        if (origin.equals(parser.getOrigin())) {
                            outputKey.set(parser.getYear() + "," + parser.getMonth());
                            context.write(outputKey, outputValue);
                        }
                    }
                } else if (parser.getDepartureDelayTime() == 0) {
                    if (origin.equals("999")) {
                        context.getCounter(DelayCounters.scheduled_departure).increment(1);
                    } else {
                        if (origin.equals(parser.getOrigin())) {
                            context.getCounter(DelayCounters.scheduled_departure).increment(1);
                        }
                    }
                } else if (parser.getDepartureDelayTime() < 0) {
                    if (origin.equals("999")) {
                        context.getCounter(DelayCounters.early_departure).increment(1);
                    } else {
                        if (origin.equals(parser.getOrigin())) {
                            context.getCounter(DelayCounters.early_departure).increment(1);
                        }
                    }
                }
            } else {
                if (origin.equals("999")) {
                    context.getCounter(DelayCounters.not_available_departure).increment(1);
                } else {
                    if (origin.equals(parser.getOrigin())) {
                        context.getCounter(DelayCounters.not_available_departure).increment(1);
                    }
                }
            }
        } else if (arrDep.equals("A")) {
            if (parser.isArriveDelayAvailable()) {
                if (parser.getArriveDelayTime() > 0) {
                    if (origin.equals("999")) {
                        outputKey.set(parser.getYear() + "," + parser.getMonth());
                        context.write(outputKey, outputValue);
                    } else {
                        // for arrivals, the filter is matched against the destination airport
                        if (origin.equals(parser.getDestination())) {
                            outputKey.set(parser.getYear() + "," + parser.getMonth());
                            context.write(outputKey, outputValue);
                        }
                    }
                } else if (parser.getArriveDelayTime() == 0) {
                    if (origin.equals("999")) {
                        context.getCounter(DelayCounters.scheduled_arrival).increment(1);
                    } else {
                        if (origin.equals(parser.getDestination())) {
                            context.getCounter(DelayCounters.scheduled_arrival).increment(1);
                        }
                    }
                } else if (parser.getArriveDelayTime() < 0) {
                    if (origin.equals("999")) {
                        context.getCounter(DelayCounters.early_arrival).increment(1);
                    } else {
                        if (origin.equals(parser.getDestination())) {
                            context.getCounter(DelayCounters.early_arrival).increment(1);
                        }
                    }
                }
            } else {
                if (origin.equals("999")) {
                    context.getCounter(DelayCounters.not_available_arrival).increment(1);
                } else {
                    if (origin.equals(parser.getDestination())) {
                        context.getCounter(DelayCounters.not_available_arrival).increment(1);
                    }
                }
            }
        }
    }
}
Terminal
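The terminal capture didn't make it into these notes. Since setup() splits workType on "-" into <D|A>-<airport code> (with "999" standing in for every airport), the run command presumably looks something like this (the jar version and output directory here are only placeholders, not the actual ones used):
yarn jar ./AirPerformance-<version>.jar com.gyuone.driver.DelayCountWithCounterTwo -D workType=D-999 air-input dep-delay-count-all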