본문 바로가기
IT/HADOOP

[24일차] driver 패키지에 ArrivalDelayCount/ DelayCount/ DelayCounterWithCounter/ DelayCountWithCounterTwo 작성하기

by GWLEE 2022. 7. 21.

어제는 출발 지연 시간을 집계했다.

오늘은 도착지연시간을 출력해보자.

 

 

ArrivalDelayCountMapper.java

 


 

ArrivalDelayCount.java (ArrivalDelayCountMapper를 사용하는 드라이버)

package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.gyuone.mapper.ArrivalDelayCountMapper;
import com.gyuone.reducer.DelayCountReducer;


/**
 * Driver for the arrival-delay count job.
 *
 * <p>Wires {@link ArrivalDelayCountMapper} and {@link DelayCountReducer} together, reads text
 * input from {@code <input>} and writes (Text, IntWritable) text output to {@code <output>}.
 * The process exit code reflects the outcome: 0 on success, 1 on bad usage or job failure.
 */
public class ArrivalDelayCount {
	/**
	 * Entry point.
	 *
	 * @param args exactly two arguments: the input path and the output path
	 * @throws Exception if job submission or monitoring fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		if (args.length != 2) {
			System.err.println("Usage: ArrivalDelayCount <input> <output>");
			System.exit(1); // print usage and terminate
		}
		// Job.getInstance is a factory: every call creates a new Job (it is not a singleton).
		Job job = Job.getInstance(conf, "ArrivalDelayCount");

		// Tells the framework which jar holds the user classes so tasks can load them.
		job.setJarByClass(ArrivalDelayCount.class);
		job.setMapperClass(ArrivalDelayCountMapper.class);
		job.setReducerClass(DelayCountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Previously the result was discarded and the JVM always exited 0, hiding failures
		// from shell scripts; propagate it as the exit status instead.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}

 


AirPerformance 프로젝트 버전을 0.2로 바꾸기 (빌드 결과: AirPerformance-0.2.jar)

프로젝트에서 Maven clean, install 실행하기


 

cd로 target 디렉터리에 들어가서

scp E:\source\java\eclipse-workspace\AirPerformance\target\AirPerformance-0.2.jar <사용자>@<서버>:<경로>   (빌드한 jar를 하둡 서버로 복사)

 

yarn jar ./AirPerformance-0.2.jar com.gyuone.driver.ArrivalDelayCount air-input arr-delay-count

 

hdfs dfs -ls arr-delay-count 로 출력 디렉터리에 _SUCCESS 파일이 생성되었는지 확인

hdfs dfs -cat arr-delay-count/part-r-00000 

 


 


DelayCount.java를 만들어 출발/도착 지연을 하나의 잡으로 처리하기 (workType 파라미터로 출발/도착 선택)

 

DelayCount.java

package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapper; 
import com.gyuone.reducer.DelayCountReducer;

/**
 * Tool driver that counts delayed flights per "year,month" key using {@link DelayCountMapper}
 * and {@link DelayCountReducer}.
 *
 * <p>The mapper reads the {@code workType} configuration value, passed on the command line as
 * {@code -D workType=departure} (or the arrival value it accepts), to choose which delay to count.
 * The process exit code is the value returned by {@link #run(String[])}.
 */
public class DelayCount extends Configured implements Tool {
	/**
	 * Entry point; lets {@link ToolRunner} apply generic options such as {@code -D key=value}.
	 *
	 * @param args generic options followed by {@code <input> <output>}
	 * @throws Exception if the tool fails unexpectedly
	 */
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DelayCount(), args);
		System.out.println("MapReduce-Job Result : " + res);
		// Exit with the job status so shell scripts can detect failures.
		System.exit(res);
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args remaining arguments after generic options: {@code <input> <output>}
	 * @return 0 on success, 1 on bad usage or job failure
	 * @throws Exception if job submission or monitoring fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

		if (otherArgs.length != 2) {
			System.err.println("Usage: DelayCount <input> <output>");
			// Report failure through the Tool contract rather than killing the JVM from inside run().
			return 1;
		}

		// Job.getInstance is a factory: every call creates a new Job (it is not a singleton).
		Job job = Job.getInstance(getConf(), "DelayCount");

		// Was missing: without a job jar, task JVMs may be unable to load DelayCountMapper.
		job.setJarByClass(DelayCount.class);
		job.setMapperClass(DelayCountMapper.class);
		job.setReducerClass(DelayCountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		// Previously always returned 0, even when the job failed.
		return job.waitForCompletion(true) ? 0 : 1;
	}
}

DelayCountMapper.java

package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;

/**
 * Emits ("year,month", 1) for every flight whose departure or arrival delay is positive.
 *
 * <p>The delay kind comes from the {@code workType} job parameter: {@code "departure"} counts
 * departure delays; {@code "arrive"} (or {@code "arrival"}) counts arrival delays. Records are
 * ignored for any other value.
 */
public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	// Delay kind selected with "-D workType=..." on the command line.
	private String workType;
	// Constant count of 1 emitted per delayed flight.
	private final static IntWritable outputvalue = new IntWritable(1);
	// Reused across map() calls to avoid allocating a new Text per record.
	private Text outputKey = new Text();

	/**
	 * Reads the {@code workType} job parameter once per mapper task.
	 *
	 * @throws IllegalStateException if {@code workType} was not supplied, instead of failing
	 *     later with a NullPointerException on every record
	 */
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		workType = context.getConfiguration().get("workType");
		if (workType == null) {
			throw new IllegalStateException(
					"workType is not set; run with -D workType=departure or -D workType=arrive");
		}
	}

	/**
	 * Parses one input line and emits ("year,month", 1) if the selected delay is positive.
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

		boolean delayed;
		if ("departure".equals(workType)) {
			delayed = parser.getDepartureDelayTime() > 0;
		} else if ("arrive".equals(workType) || "arrival".equals(workType)) {
			// Accept both spellings: the counter-based mapper uses "arrival".
			delayed = parser.getArriveDelayTime() > 0;
		} else {
			return; // unknown workType: emit nothing
		}

		if (delayed) {
			outputKey.set(parser.getYear() + "," + parser.getMonth());
			context.write(outputKey, outputvalue);
		}
	}
}

 

 


AirPerformance-0.3.jar를 workType=departure로 실행하기 (출력 디렉터리: departure-delay-count)

 

ls -lrt 

 

yarn jar ./AirPerformance-0.3.jar com.gyuone.driver.DelayCount -D workType=departure air-input departure-delay-count 

 

 

 

hdfs dfs -ls departure-delay-count 

 

hdfs dfs -cat departure-delay-count/part-r-00000


 

AirPerformance-0.3.jar를 workType=arrive로 실행하기 (출력 디렉터리: arrival-delay-count2)

 

ls -l

yarn jar ./AirPerformance-0.3.jar com.gyuone.driver.DelayCount -D workType=arrive air-input arrival-delay-count2

 

 

hdfs dfs -ls

hdfs dfs -cat arrival-delay-count2/part-r-00000

 


DelayCounters.java

 

 


DelayCountMapperWithCounter.java

 

package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;
import com.gyuone.common.DelayCounters;

/**
 * Emits ("year,month", 1) for each delayed flight and tracks the other outcomes in counters.
 *
 * <p>The {@code workType} job parameter selects the mode: {@code "departure"} inspects departure
 * delays; {@code "arrival"} (or {@code "arrive"}) inspects arrival delays. For the selected delay:
 * positive values are emitted; zero increments the {@code scheduled_*} counter; negative values
 * increment the {@code early_*} counter; unavailable values increment the
 * {@code not_available_*} counter.
 */
public class DelayCountMapperWithCounter extends Mapper<LongWritable, Text, Text, IntWritable> {
	// Delay kind selected with "-D workType=..." on the command line.
	private String workType;
	// Constant count of 1 emitted per delayed flight.
	private final static IntWritable outputvalue = new IntWritable(1);
	// Reused across map() calls to avoid allocating a new Text per record.
	private Text outputKey = new Text();

	/**
	 * Reads the {@code workType} job parameter once per mapper task.
	 *
	 * @throws IllegalStateException if {@code workType} was not supplied
	 */
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		workType = context.getConfiguration().get("workType");
		if (workType == null) {
			throw new IllegalStateException(
					"workType is not set; run with -D workType=departure or -D workType=arrival");
		}
	}

	/**
	 * Classifies one flight record for the selected delay kind.
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

		if ("departure".equals(workType)) {
			// Fixed: availability must be checked on the departure delay itself, not on distance;
			// otherwise a flight with a distance but an NA delay is miscounted.
			if (parser.isDepartureDelayAvailable()) {
				if (parser.getDepartureDelayTime() > 0) {
					outputKey.set(parser.getYear() + "," + parser.getMonth());
					context.write(outputKey, outputvalue);
				} else if (parser.getDepartureDelayTime() == 0) {
					context.getCounter(DelayCounters.scheduled_departure).increment(1); // on-time departure
				} else {
					context.getCounter(DelayCounters.early_departure).increment(1);
				}
			} else {
				context.getCounter(DelayCounters.not_available_departure).increment(1);
			}
		} else if ("arrival".equals(workType) || "arrive".equals(workType)) {
			// Accept "arrive" too, matching DelayCountMapper and the commands used with it.
			if (parser.isArriveDelayAvailable()) {
				if (parser.getArriveDelayTime() > 0) {
					outputKey.set(parser.getYear() + "," + parser.getMonth());
					context.write(outputKey, outputvalue);
				} else if (parser.getArriveDelayTime() == 0) {
					context.getCounter(DelayCounters.scheduled_arrival).increment(1); // on-time arrival
				} else {
					context.getCounter(DelayCounters.early_arrival).increment(1);
				}
			} else {
				context.getCounter(DelayCounters.not_available_arrival).increment(1);
			}
		}
	}
}

DelayCounterWithCounter.java

package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapperWithCounter;
import com.gyuone.reducer.DelayCountReducer;

/**
 * Tool driver that runs {@link DelayCountMapperWithCounter} with {@link DelayCountReducer}.
 *
 * <p>The delay kind is chosen by the {@code workType} configuration value passed on the command
 * line as {@code -D workType=...}. The process exit code is the value returned by
 * {@link #run(String[])}.
 */
public class DelayCounterWithCounter extends Configured implements Tool {
	/**
	 * Entry point; lets {@link ToolRunner} apply generic options such as {@code -D key=value}.
	 *
	 * @param args generic options followed by {@code <input> <output>}
	 * @throws Exception if the tool fails unexpectedly
	 */
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DelayCounterWithCounter(), args);
		System.out.println("MapReduce-Job Result : " + res);
		// Exit with the job status so shell scripts can detect failures.
		System.exit(res);
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args remaining arguments after generic options: {@code <input> <output>}
	 * @return 0 on success, 1 on job failure, 2 on bad usage
	 * @throws Exception if job submission or monitoring fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

		if (otherArgs.length != 2) {
			System.err.println("Usage: DelayCounterWithCounter <input> <output>");
			// Same status as before (2), but returned instead of exiting from inside run().
			return 2;
		}

		// Job.getInstance is a factory: every call creates a new Job (it is not a singleton).
		Job job = Job.getInstance(getConf(), "DelayCounterWithCounter");

		job.setJarByClass(DelayCounterWithCounter.class);
		job.setMapperClass(DelayCountMapperWithCounter.class);
		job.setReducerClass(DelayCountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		// Previously always returned 0, even when the job failed.
		return job.waitForCompletion(true) ? 0 : 1;
	}
}

jar 파일 실행시키기

 

yarn jar ./AirPerformance-0.4.jar com.gyuone.driver.DelayCounterWithCounter -D workType=departure air-input arr-arrival-count-counter4

 

실행 로그에 "completed successfully"가 출력되는지 확인 (카운터 값도 로그에 함께 출력됨)

 

hdfs dfs -ls 

hdfs dfs -cat arr-arrival-count-counter4/part-r-00000


DelayCountWithCounterTwo.java

기존 DelayCounterWithCounter를 복사한 뒤 mapper만 DelayCountMapperWithCounterTwo로 바꿔서 작성하기!

 

package com.gyuone.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.gyuone.mapper.DelayCountMapperWithCounterTwo;
import com.gyuone.reducer.DelayCountReducer;

/**
 * Tool driver that runs {@link DelayCountMapperWithCounterTwo} with {@link DelayCountReducer}.
 *
 * <p>That mapper reads {@code workType} in the form {@code D-<airport>} or {@code A-<airport>},
 * passed as {@code -D workType=...}; airport {@code 999} means "all airports". The process exit
 * code is the value returned by {@link #run(String[])}.
 */
public class DelayCountWithCounterTwo extends Configured implements Tool {
	/**
	 * Entry point; lets {@link ToolRunner} apply generic options such as {@code -D key=value}.
	 *
	 * @param args generic options followed by {@code <input> <output>}
	 * @throws Exception if the tool fails unexpectedly
	 */
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DelayCountWithCounterTwo(), args);
		System.out.println("MapReduce-Job Result : " + res);
		// Exit with the job status so shell scripts can detect failures.
		System.exit(res);
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args remaining arguments after generic options: {@code <input> <output>}
	 * @return 0 on success, 1 on job failure, 2 on bad usage
	 * @throws Exception if job submission or monitoring fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

		if (otherArgs.length != 2) {
			System.err.println("Usage: DelayCountWithCounterTwo <input> <output>");
			// Same status as before (2), but returned instead of exiting from inside run().
			return 2;
		}

		// Job.getInstance is a factory: every call creates a new Job (it is not a singleton).
		Job job = Job.getInstance(getConf(), "DelayCountWithCounterTwo");

		job.setJarByClass(DelayCountWithCounterTwo.class);
		job.setMapperClass(DelayCountMapperWithCounterTwo.class);
		job.setReducerClass(DelayCountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		// Previously always returned 0, even when the job failed.
		return job.waitForCompletion(true) ? 0 : 1;
	}
}

DelayCountMapperWithCounterTwo.java

slack에서 강사님께서 주신 코드! 🌟 분석해보기.. (workType은 "D-공항코드"(출발) 또는 "A-공항코드"(도착) 형식이고, 공항코드 999는 전체 공항을 의미한다. 예: -D workType=D-999)

 

package com.gyuone.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gyuone.common.AirlinePerformanceParser;
import com.gyuone.common.DelayCounters;

/**
 * Counts delayed flights per "year,month", optionally restricted to one airport, and tracks the
 * other outcomes in {@link DelayCounters}.
 *
 * <p>The {@code workType} job parameter has the form {@code <mode>-<airport>}:
 * <ul>
 *   <li>mode {@code D}: inspect departure delays and filter on the origin airport;</li>
 *   <li>mode {@code A}: inspect arrival delays and filter on the destination airport;</li>
 *   <li>airport {@code 999}: no airport filter.</li>
 * </ul>
 * For a matching flight: a positive delay emits ("year,month", 1); zero increments the
 * {@code scheduled_*} counter; a negative delay increments {@code early_*}; an unavailable delay
 * increments {@code not_available_*}. Records whose mode is neither D nor A are ignored.
 */
public class DelayCountMapperWithCounterTwo extends Mapper<LongWritable, Text, Text, IntWritable>{
	// Airport code that disables airport filtering.
	private static final String ALL_AIRPORTS = "999";

	// "D" (departure) or "A" (arrival), from the part of workType before the first '-'.
	private String arrDep;
	// Airport filter: origin in D mode, destination in A mode (ALL_AIRPORTS = no filter).
	private String airport;
	// Constant count of 1 emitted per delayed flight.
	private final static IntWritable outputValue = new IntWritable(1);
	// Reused across map() calls to avoid allocating a new Text per record.
	private Text outputKey = new Text();

	/**
	 * Parses the {@code workType} job parameter once per mapper task.
	 *
	 * @throws IllegalStateException if {@code workType} is missing or lacks a '-' separator
	 *     (previously an unhelpful NullPointerException / ArrayIndexOutOfBoundsException)
	 */
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
		throws IOException, InterruptedException {
		String params = context.getConfiguration().get("workType");
		String[] parts = (params == null) ? new String[0] : params.split("-");
		if (parts.length < 2) {
			throw new IllegalStateException(
					"workType must look like D-<airport> or A-<airport> (999 = all airports), got: " + params);
		}
		arrDep = parts[0];
		airport = parts[1];
	}

	/**
	 * Classifies one flight record for the configured mode and airport.
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
		throws IOException, InterruptedException {
		AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

		if (arrDep.equals("D")) {
			// Every outcome in departure mode is filtered on the origin airport.
			if (!matchesAirport(parser.getOrigin())) {
				return;
			}
			// Fixed: the departure branch previously checked isArriveDelayAvailable().
			if (parser.isDepartureDelayAvailable()) {
				recordDelay(parser, parser.getDepartureDelayTime(),
						DelayCounters.scheduled_departure, DelayCounters.early_departure, context);
			} else {
				context.getCounter(DelayCounters.not_available_departure).increment(1);
			}
		} else if (arrDep.equals("A")) {
			// Every outcome in arrival mode is filtered on the destination airport.
			if (!matchesAirport(parser.getDestination())) {
				return;
			}
			if (parser.isArriveDelayAvailable()) {
				recordDelay(parser, parser.getArriveDelayTime(),
						DelayCounters.scheduled_arrival, DelayCounters.early_arrival, context);
			} else {
				context.getCounter(DelayCounters.not_available_arrival).increment(1);
			}
		}
	}

	/** Returns true when no airport filter is set or {@code code} equals the configured airport. */
	private boolean matchesAirport(String code) {
		return ALL_AIRPORTS.equals(airport) || airport.equals(code);
	}

	/**
	 * Emits ("year,month", 1) for a positive delay; otherwise increments the on-time counter
	 * (delay == 0) or the early counter (delay &lt; 0).
	 */
	private void recordDelay(AirlinePerformanceParser parser, int delay, DelayCounters onTimeCounter,
			DelayCounters earlyCounter, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		if (delay > 0) {
			outputKey.set(parser.getYear() + "," + parser.getMonth());
			context.write(outputKey, outputValue);
		} else if (delay == 0) {
			context.getCounter(onTimeCounter).increment(1);
		} else {
			context.getCounter(earlyCounter).increment(1);
		}
	}
}

 


 

터미널

 

 

 

 

 

댓글