1. 맵리듀스란?
하둡의 맵리듀스는 데이터를 병렬로 처리하는 소프트웨어 프레임워크로 하둡의 핵심요소이다. 이전 포스팅에서 언급했듯이 하둡에서는 데이터들이 분산되어 디스크에 저장되어있다. 따라서 이 분산된 데이터에 연산을 할 때 어떻게 할 것인지, 그 방법이 필요한데 맵리듀스는 그에 대한 해답을 제공해준다.
맵리듀스는 분산된 데이터를 처리하는 방법은 쉽게 말해 divide and conquer 방식이다. 분산된 데이터 각각에 연산을 한 다음(맵) 그 결과를 하나로 합치는 것(리듀스)이다. 아래에 맵리듀스 애플리케이션이 어떻게 동작하는지 그림으로 설명되어있다. 여기서 프로그래머의 역할은 맵 합수와 리듀스 함수를 구현하는 것이다. 중간에 분산된 맵의 결과를 소팅하고 합쳐서 리듀스로 보내는 것은 맵리듀스 프레임워크가 알아서 처리한다.
2. 맵리듀스 동작방식
1) 맵리듀스 흐름
맵리듀스의 흐름은 다음과 같다.
(input) <k1, v1> → 맵 (map) → <k2, v2> → 셔플과 정렬 (shuffle and sorting) → 리듀스(reduce) → <k3, v3> (output)
맵리듀스 애플리케이션은 초기 인풋과 최종 아웃풋을 포함하여 모든 중간 데이터가 키-밸류 페어의 형식으로 되어있다. 초기 인풋은 프로그래머가 작성한 맵 함수를 거쳐 두 번째 키-밸류 페어에 매핑된다. 두 번째 키-밸류 페어는 키 값에 따라 소팅된 후 리듀스 태스크에 전달된다. 리듀스 단계에서는 두 번째 키-밸류 페어에 프로그래머가 작성한 리듀스 함수를 수행한 후 결과를 최종 키-밸류 아웃풋으로 매핑한다.
추가적으로 맵 단계 후에 컴바이너가 추가될 수 있다. 셔플과 소팅에는 많은 시간과 자원이 소비되므로 이를 줄이기 위해 맵의 결과에 추가적인 연산을 해주는 것이다. 참고로 맵의 결과는 RAM이 아닌 디스크에 저장된다. (맵리듀스가 매우 느린 이유)
2)키와 밸류
키와 밸류는 이동을 위해 직렬화(serialization)되어야 한다. 하둡에서는 직렬화 데이터 타입(writable)을 자료형에 맞게 모두 제공하므로 별로 걱정할 필요는 없다. 하지만 커스텀 데이터 타입을 작성할 때가 있는데 이때 중간에 키 값의 소팅을 위해 꼭 comparator를 구현해주어야한다.
3) 잡(job)
클라이언트는 YARN의 리소스 매니저에 잡(실행파일과 컨피겨레이션 파일)을 제출한다. 잡 컨피겨레이션은 인풋과 아웃풋의 위치 및 포맷 + 맵/리듀스 함수 + 잡 파라미터로 구성되어있다. 리소스 매니저는 클라이언트로부터 잡을 받으면 해당 잡을 워커들에게 뿌린 후 스케줄링하고 모니터링한다.
※잡과 애플리케이션
애플리케이션은 YARN의 관점에서 클러스터에서 돌고 있는 프로그램을 의미한다. 그리고 해당 프로그램은 맵리듀스 입장에서는 잡이다. 따라서 맵리듀스 잡을 YARN에서 실행하면 잡=애플리케이션이다.
4) 매퍼(Mapper) / 맵 태스크(Map task)
정확히 말하면 매퍼는 클래스고 매퍼 클래서가 생성되어 돌고 있는 것을 맵 태스크라고 한다. 맵 단계의 인풋은 인풋스플릿(InputSplit)이라는 단위로 나뉘고 하나의 인풋스플릿 당 하나의 맵 태스크가 생성된다. 생성된 맵 태스크는 인풋스플릿 안의 키-밸류 페어에 대해 맵 함수를 호출하여 아웃풋 키-밸류 페어를 만든다. 해당 아웃풋은 키 값에 대해 소팅되고 파티셔너(partitioner)에 의해 파티션(partition)이라는 단위로 나뉜다.
5) 컴바이너(Combiner)
컴바이너는 따로 구현할 수도 있고 리듀서가 컴바이너가 될 수도 있다. 따로 구현된 컴바이너는 맵 태스크의 아웃풋에 대해 로컬 어그리게이션을 수행하여 매퍼에서 리듀스로 가는 데이터를 줄여준다.
6) 리듀서 (Reducer) / 리듀스 태스크 (Reduce task)
리듀서와 리듀스 태스크의 관계는 앞서 말한 매퍼와 맵 태스크의 관계와 동일하다. 앞서 말했듯이 맵 태스크의 아웃풋은 소팅 된 후 파티션이라는 단위로 나뉘어지는데 이 파티션 하나에 대해 리듀스 태스크 하나가 생성된다. 리듀스 태스크는 파티션의 각 키-밸류 페어에 대해 리듀스 함수를 호출하여 최종 키-밸류 아웃풋을 만들어낸다. 최종 아웃풋은 사용자가 지정한 파일시스템의 경로에 쓰인다. 참고로 리듀스 작업이 필요 없는 경우도 있는데 이때는 맵 태스크의 아웃풋이 바로 디스크에 쓰인다.
3. 맵리듀스 간단 예시 (WordCount)
아래는 텍스트에 있는 각 단어의 등장 횟수를 세는 맵리듀스 애플리케이션 구현이다.
입력 키-밸류 쌍은 <null, 텍스트 일부분>이고 최종 아웃풋 키-밸류 쌍은 <단어, 등장 횟수>이다.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1) 매퍼(TokenizerMapper)
매퍼 구현으로 맵리듀스의 Mapper를 상속받는다. "public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>"에서 "Mapper<key1 타입, value1 타입, key2 타입, value2 타입>"을 적어주면 된다. 예제의 맵 함수에서는 인풋(텍스트)을 토크나이징 한 후 각 단어를 키 값으로 매핑하고 밸류는 1로 고정한다.
ex) <Null, I love dogs> → map → <I, 1>, <love, 1>, <dogs, 1>
2) 리듀서(IntSumReducer)
리듀서 구현으로 맵리듀스의 Reducer를 상속받는다. "public static class IntSumReducer extends Reducer<Text, IntWritable,Text, IntWritable>" 에서 "Reducer<key2 타입, value2 타입, key3 타입, value3 타입>"을 적어주면 된다. 예제의 리듀스 함수에서는 같은 키 값을 가진 쌍들에 대해 밸류(1)를 모두 합한다. 이때 키는 각 단어로 계속 유지되고 밸류는 각 단어의 등장 횟수로 바뀐다.
ex) <I, 1>, <love, 1>, <dogs, 1>, <dogs, 1>, <love, 1>, <me, 1>→ <I, 1>, <love, 2>, <dogs, 2>, <me, 1>
3) 메인함수
메인함수의 각 줄은 주석으로 설명했다. 참고로 Job 큻래스는 하둡에게 잡에 대한 설명을 해주는 인터페이스이다. 매퍼, 컴바이너, 파티셔너, 리듀서, 인풋/아웃풋 포맷 구현 등을 설정할 수 있다.
public static void main(String[] args) throws Exception {
//컨피겨레이션 생성
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
//실행 파일 설정
job.setJarByClass(WordCount.class);
//매퍼, 컴바이너, 리듀서 클래스 설정
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//아웃풋 타입 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//인풋파일과 아웃풋 파일 경로 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
'Data Engineering > 하둡 2.0' 카테고리의 다른 글
하둡 HDFS (Hadoop Distributed File System) (0) | 2024.02.09 |
---|---|
하둡 YARN이란? (0) | 2024.02.09 |
빅데이터란? 하둡(Hadoop)이란? 하둡 설치 (0) | 2024.02.09 |
댓글