MapReduce详细分析

一、MapReduce概述

1、定义

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群 上。

2、MR进程

一个完整的MapR educe程序在分布式运行时有三类实例进程:

  1. **Mr AppMaster:**负责整个程序的过程调度及状态协调。
  2. MapTask:负责Map阶段的整个数据处理流程。
  3. ReduceTask:负责Reduce阶段的整个数据处理流程。

3、常用数据序列化类型

Java****类型 Hadoop Writable****类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

4、MR编程规范

  1. Mapper阶段
    (1)户自定义的Mapper要继承自己的父类
    (2) Mapper的输入数据是KV对的形式(KV的类型可自定义)
    (3) Mapper中的业务逻辑写在map()方法中
    (4) Mapper的输出数据是KV对的形式(KV的类型可自定义)
    (5) map0方法(MapTask进程)对每一个<K,V>调用一次

  2. Reducer阶 段
    (1)用户自定义的Reducer要继承自己的父类
    (2) Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
    (3) Reducer的业务逻辑写在reduce0方法中
    (4) ReduceTask进程**对一组相同k的k,v>**组调用一次reduce0方法

  3. Driver阶段
    相当于YARN集群的客户端,于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

5、Mapper类、Reducer类中关键方法

Mapper类:

setup()方法: Called once at the beginning of the task . 在每个MapTask中只会在Task开始运行时被调用一次.

map()方法: Called once for each key/value pair in the input split. Most applications should override this, but the default is the identity function.

​ 一个切片中输入的每一个kv对会调用一次map方法。

cleanup()方法: Called once at the end of the task. 在每个MapTask中只会在Task结束前被调用一次

run()方法: 负责控制Mapper的执行过程.

Reducer类:

setup()方法: Called once at the start of the task. 在每个ReduceTask开始运行时会调用一次该方法.

reduce()方法: This method is called once for each key. Most applications will define their reduce class by overriding this method. The default implementation is an identity function.

​ 为同一个key(map端输出的数据可能有相同key的多个kv对,称之为一组kv)执行一次reduce方法。

cleanup()方法: Called once at the end of the task. 在每个ReduceTask结束时会调用一次该方法

run()方法: 负责控制Reducer的执行过程。

二、Hadoop的序列化

序列化就是把内存中的对象,转换成字节列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

Java的序列化是一个 重量级序列化框架(Serializable) ,一个对象被序列化后, 会附带很多额外的信息(各种校验信息,Header, 继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable) 。
Hadoop序列化特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)可扩展:随着通信协议的升级而可升级
(4)互操作:支持多语言的交互

自定义bean对象实现序列化接口(Writable)

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

(3)重写序列化方法

(4)重写反序列化方法

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

三、MR框架原理

在这里插入图片描述

1、InputFormat数据输入

InputFormat: 负责Map端数据的输入
重要的方法:
getSplits(): 生成切片信息。
createRecordReader(): 负责输入数据的读取处理
子抽象类: FileInputFormat
getSplits(): 做了具体的实现. Hadoop默认的切片规则。
具体实现类:
TextInputFormat : Hadoop默认使用的InputFormat
默认使用FileInputFormat中的getSplits方法来生成切片信息。
使用LineRecordReader来读取数据,就是一行一行读取.
CombineFileInputFormat:它可以将多个小文件从逻辑上规划到一个切片中,
NLineInputFormat
KeyValueTextInputFormat

1.1 切片与MapTask的并行度决定机制

**数据切片:**数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
2)每一个Split切片分配一个MapTask并行 实例处理
3)默认情况下,切片大小=BlockSize
4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

1.2 FileInputFormat切片源码解析

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // 1 
	 // minSize ==> mapreduce.input.fileinputformat.split.minsize
    long maxSize = getMaxSplitSize(job);  // Long.MAX_VALUE
	 // maxSize ==> mapreduce.input.fileinputformat.split.maxsize

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);

    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    
    // 循环每个文件,为每个文件单独生成切片.
    for (FileStatus file: files) {
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {  //若可以切分
          long blockSize = file.getBlockSize();//本地最大为32M,集群上最大为128M
          
	     //计算切片的大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
		    // return Math.max(minSize, Math.min(maxSize, blockSize));

          long bytesRemaining = length;
	  // 当前文件剩余的大小 除以  切片大小 >1.1 ,继续切片,否则,剩余的大小生成一个切片。
	  // 避免数据倾斜问题  SPLIT_SLOP =1.1
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }

    // 返回切片信息的集合
    return splits;
  }

(1) 程序先找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件First.txt(300M)
a) 获取文件大小fs. sizeOf(First.txt)
b)计算切片大小(默认块大小)
computeSplitSize( Math.max(minSize, Math.min(maxSize, blockSize)))=blocksize=128 M
c)默认情况下,切片大小=blocksize
d)开始切,形成第1个切片: First.txt- -0:128M 第2个切First.txt- -128: 256M第3个切片First.txt- -256 M:30M
(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍, 不大于1.1倍就划分-块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成
g) InputSpit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。

​ eg: 一个切片信息: file:/D:/input/inputflow/phone_data.txt:0+1178
​ 意思就是读取/D:/input/inputflow/phone_data.txt的 0~1178 范围的数据.

(4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。

切片时不考虑数据集整体,而是逐个针对每一个文 件单独切片

2、Shuffle机制

2.1 Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称为Shuffle。

Map端:每一个Map任务都维护一个环形缓冲区。Map方法出来之后进入到getpartition()方法,然后数据进入到环形缓冲区,大小为100M,底层为字节数组,到达80%的时候进行反向溢写spill文件。然后对溢写文件进行分区,排序,排序的手段是快排,对key按照字典顺序进行排序。然后将这些文件数据按照分区进行归并排序。最后就可以将数据落盘。

Reduce端:Reduce通过HTTP的方式,从map端拉取数据,默认一次拉取5个。拉取的数据存储在内存中,当内存不足时,将会把数据写入磁盘中。若有Combiner,合并期间会运行,以减少写入磁盘的数据量。不管是内存中,还是磁盘中的数据都会进行归并写入到Reduce方法中去。
在这里插入图片描述
生产环境中的优化:

①反向溢写可以调整为90%-95%,目的是为了减少溢写文件spill的个数

②环形缓冲区的大小可以调整为200M,目的是为了减少溢写文件spill的个数

③在Map的归并排序之前进行Combiner,以减少IO、提高Reduce的效率

④在Map的归并排序之后进行Combiner,以减少IO、提高Reduce的效率

⑤Map端数据落盘之前可以采用压缩,以减少数据的IO

⑥当MapTask特别多时,为了调高效率,可以将Reduce拉取数据调整为10-20个

⑦在Reduce端,数据归并排序之前可以进行Combiner

⑧Map端归并排序默认一次归并10个,可以调整为20个,以增加效率

⑨可以增加Reduce端的内存大小,减少数据写入磁盘

其他优化设置:

(1)关于压缩:

压缩可以使用在map的输入:优先考虑文件的大小,文件小不会切片的–>选择压缩速度快的,文件大会切片–>考虑压缩是否支持切片

​ map的输出:优先考虑压缩速度

​ reduce输出:考虑需求,reduce的最数据的用途

(2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

**(3)规避使用Reduce,**因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)NodeManager默认内存8G,需要根据服务器实际配置灵活调整,例如128G内存,配置为100G内存左右,yarn.nodemanager.resource.memory-mb。

(5)单任务默认内存8G,需要根据该任务的数据量灵活调整,yarn.scheduler.maximum-allocation-mb。

(6)MapTask内存上限,默认为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m,可以增加MapTask内存,最大可以增加到4-5g。mapreduce.map.memory.mb

(7)ReduceTask内存上限。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m,可以增加ReduceTask内存大小为4-5g。mapreduce.reduce.memory.mb

**环形缓冲区的底层实现:**就是一个字节数组,数组前面记录关于KV的索引信息,数组后面记录KV数据。首尾相接,构成环形缓冲区,中间是赤道。用于数据的Spill溢出处理。

2.2 Partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
	public int getPartition(K key, V value, int numReduceTasks) {
	return (key .hashCode () & Integer.MAX VALUE)%numReduceTasks;
    }
}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

因此,可以自定义分区。

(1)如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放, 会Exception;
(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
ReduceTask,最终也就只会产生一个结果文件part-r-00000;
(4) 分区号必须从零开始,逐一累加。

//3、自定义Partitioner步骤
//(1)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
	@Override
	public int getPartition (Text key, FlowBean value, int numPartitions) {
		//控制分区代码逻辑
		return partition;
    }
}
//(2)在Job驱动中,设置自定义Partitioner
	job.setPartitionerClass(CustomPartitioner.class);
//(3)自定义Partition后, 要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
	job.setNumReduceTasks(5);

2.3、排序

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

三次排序过程:

​ 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

​ 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类:

(1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。在全排序的基础上增加分区个数,部分排序(区内排序)可以保证保证输出的每个文件内部有序
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。 但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
*(3)辅助排序: (*GroupingComparator分组)
**在Reduce端对key进行分组。**应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果compare To中的判断条件为两个即为二次排序。

辅助排序(分组比较)★

Reduce端是怎么把相同的key分到一个reduce呢?依赖与分组比较。

WritableComparator :比较—>一般用于分组比较

1.//存储Hadoop自身序列化类型及对应的分组比较器。			
private static final ConcurrentHashMap<Class, WritableComparator> comparators 
							= new ConcurrentHa shMap<Class, WritableComparator>(); 
//2.实际在Hadoop自身的序列化类型中,都已经定义好了对应的分组比较器.
例如:在Text类中:
/** A WritableComparator optimized for Text keys. */
public static class Comparator extends WritableComparator {
	public Comparator () {
			super (Text.class) ;
    }	
	@override
	public int compare (byte[] b1, int s1, int 11,
						byte[] b2,int s2, int 12) {
			int n1 = WritableUti1s.decodeVIntSize (b1[s1]) ;
			int n2 = Writab1eUtils.decodeVIntSize (b2[s2]) ;
			return compareBytes (b1,s1+n1, 11-n1, b2, s2+n2, 12-n2) ;
    }
}
//3.在运行MR程序时,hadoop会将自身的序列化类型及对应的分组比较器注册到WritableComparator中的comparators这个Map中
//例如:在Text类中:
static {
// register this comparator
WritableComparator.define (Text.class, new Comparator()) ;
}
//4. 在mr中如何获取当前key类型对应的分组比较器
     
    //在MapTask中的 1018行:  获取key的比较器
    comparator = job.getOutputKeyComparator();

    public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);//判断当前的类是否是WritableComparable的子类,并获得比较器
  }    

   public static WritableComparator get(
      Class<? extends WritableComparable> c, Configuration conf) {
    WritableComparator comparator = comparators.get(c);
    if (comparator == null) {
      // force the static initializers to run  重新加载
      forceInit(c);
      // look to see if it is defined now  再次获取
      comparator = comparators.get(c);
      // if not, use the generic one  若获取不到,则直接new一个,newWritableComparator的时候,会调用WritableComparable方法
      if (comparator == null) {
        comparator = new WritableComparator(c, conf, true);
      }
    }
    // Newly passed Configuration objects should be used.
    ReflectionUtils.setConf(comparator, conf);
    return comparator;
  }
// 5. 自定义比较器对象:   继承WritableComparator,提供构造方法,重写compre方法.
    
//    重写这个compare。 默认还是调用Bean对象的compareTo方法用作分组比较器
    public int compare(WritableComparable a, WritableComparable b) {
       return a.compareTo(b);  
    }

 // 如果一个key没有提供对应的比较器对象,默认会使用key对应的类中的 compareTo方法来进行分组比较.   

2.4、Combiner合并

(1) Combiner是MR程序中Mapper和Reducer之外的一种组件。

(2) Combiner组件的父类就是Reducer。

(3) Combiner和Reducer的区别在于运行的位置

​ Combiner是在每一个 MapTask所在的节点运行;
​ Reducer是接收全局所有Mapper的输出结果;

(4) Combiner的意义就是对每个MapTask的输出进行局部汇总,以减小网络传输量。

(5) Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

3、MapTask工作机制

//1. 从Job提交流程的(2)--><9> 进去 
   Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);  //构造真正执行的Job , LocalJobRunnber$Job
//2. LocalJobRunnber$Job 的run()方法
   1)  TaskSplitMetaInfo[] taskSplitMetaInfos = 
          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
       // 读取job.splitmetainfo
   2)   int numReduceTasks = job.getNumReduceTasks();  // 获取ReduceTask个数

   3)  List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
            taskSplitMetaInfos, jobId, mapOutputFiles); 
       // 根据切片的个数, 创建执行MapTask的 MapTaskRunnable
   4)   ExecutorService mapService = createMapExecutor();  // 创建线程池
       
   5)	runTasks(mapRunnables, mapService, "map");   //执行 MapTaskRunnable
	
   6)   因为Runnable提交给线程池执行,接下来会执行MapTaskRunnable的run方法。

   7)   执行 LocalJobRunner$Job$MapTaskRunnable 的run()方法.
        
	  (1)  MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
            info.getSplitIndex(), 1);   //创建MapTask对象

	  (2)   map.run(localConf, Job.this);  //执行MapTask中的run方法
	      
	      <1> .runNewMapper(job, splitMetaInfo, umbilical, reporter); 
	          
		      ①  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =  JobContextImpl
		      ②  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =  WordConutMapper
		      ③  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = TextInputFormat
		      ④  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
				splitIndex.getStartOffset());   // 重构切片对象
                       切片对象的信息 :                  //file:/D:/input/inputWord/JaneEyre.txt:0+36306679
              ⑤  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = MapTask$NetTrackingRecordReader

		      ⑥   output = new NewOutputCollector(taskContext, job, umbilical, reporter);  //构造缓冲区对象
			
			    [1] collector = createSortingCollector(job, reporter);  //获取缓冲区对象
			    MapTask$MapOutputBuffer

			       {1} . collector.init(context);  //初始化缓冲区对象
			           
				       1>>.final float spillper =
					job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
                                      // 溢写百分比  0.8
				       2>>.final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
						 MRJobConfig.DEFAULT_IO_SORT_MB);    
				      // 缓冲区大小  100M
			           3>>.sorter = ReflectionUtils.newInstance(job.getClass(
						   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
						   IndexedSorter.class), job);
				     // 排序对象
				     // 排序使用的是快排,并且基于索引排序。
                       4>> . // k/v serialization  // kv序列化
				       5>> . // output counters    // 计数器
				       6>> .  // compression       //  压缩
				       7>> .  // combiner          //  combiner

		     ⑦  mapper.run(mapperContext);    // 执行WordCountMapper中的run方法。 实际执行的是WordCountMapper继承的Mapper中的run方法。
		      
		        [1] . 在Mapper中的run方法中 
		             map(context.getCurrentKey(), context.getCurrentValue(), context);
			     执行到WordCountMapper中的map方法。
		        [2] . 在WordCountMapper中的map方法中将kv写出
		            context.write(outK,outV);

4、ReduceTask工作机制

5、OutputFormat数据输出

​ OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。

​ 几种常见的OutputFormat实现类。
​ 1.文本输出TextOutputFormat
​ 默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString0方法把它们转换为字符串。

​ 2.SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
     3.**自定义OutputFormat**
 根据用户需求,自定义实现输出。

自定义OutputFormat

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

自定义OutputFormat步骤
(1)自定义一个类继承FileOutputFormat
(2)改写RecordWriter, 具体改写输出数据的方法write。

6、Join

6.1Reduce Join

Map端的主要工作:

为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:

在Reduce端以连接字段作为key的分组已经完成, 我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
解决方案: Map端实现数据合并

6.2 Map Join

将小表的文件提前加载到内存中,接下来每读取一条大表的数据,就与内存中的小表的数据进行join, join完成后直接写出.

Map Join适用于一张表十分小、一张表很大的场景。

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

具体办法:采用DistributedCache

​ (1)在Mapper的setup阶段,将文件读取到缓存集合中。

​ (2)在驱动函数中加载缓存。

​ // 缓存普通文件到Task运行节点。

​ job.addCacheFile(new URI(“file://e:/cache/pd.txt”));

7、计数器

Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。
计数器API
(1)采用枚举的方式统计计数
enum MyCounter{MALFOROR MED ,NORMAL}
//对枚举定,义的自定义计数器加1
context getCounter(MyCounter.MAL F ORO RMED)increment(1);
(2)采用计数器组、计数器名称的方式统计
context. getCounter(" counterGroup" , " counter" ). increment(1),
组名和计数器名称随便起,但最好有意义。
(3)计数结果在程序运行后的控制台上查看。

四、数据压缩

1、概述

数据压缩对于节省资源最小化磁盘I/O和网络传输非常有帮助。可以在任意MapReduce阶段启用压缩。

压缩基本原则:
(1)运算密集型的job,少用压缩
(2) IO密集型的iob,多用压缩

2、MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改
压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

3、常用压缩方式选择

Bzip2压缩:

优点:**支持Split;**具有很高的压缩率,比Gzip 压缩率都高; Hadoop本身自带,使用方便。
缺点:压缩解压速度慢

Lzo压缩:

优点:压缩解压速度也比较快,合理的压缩率;支持Split, 是Hadoop中最流行的压缩格式;可以在inux系统下安装lzop命令,使用方便。
缺点: Hadoop本身不支持,需要安装;在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split**需要建索引,**还需要指定InputFormat为Lzo格式)
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大, Lzo优点越越明显。

Snappy压缩:

优点:高速压缩速度和合理的压缩率。
缺点:**不支持Split;**压缩率比Gzip要低; Hadoop本身不支持, 需要安装。
应用场景:当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输 出和另外一个MapReduce作业的输入。

4、压缩位置选择

在这里插入图片描述

热门文章

暂无图片
编程学习 ·

【面试】如果你这样回答“什么是线程安全”,面试官都会对你刮目相看

不是线程的安全 面试官问:“什么是线程安全”,如果你不能很好的回答,那就请往下看吧。 论语中有句话叫“学而优则仕”,相信很多人都觉得是“学习好了可以做官”。然而,这样理解却是错的。切记望文生义。 同理,“线程安全”也不是指线程的安全,而是指内存的安全。为什么如…
暂无图片
编程学习 ·

中断、异常、系统调用的概念

系统调用是由应用程序发起的,意思是应用程序主动向操作系统发出服务请求。 异常是由不良的应用程序发起的,意思是非法指令或者其他坏的处理状态,比如内存出错。 中断是由外设发起的,它来自于不同硬件设备的计时器或者网络的中断。
暂无图片
编程学习 ·

leetcode:208. 实现 Trie (前缀树)

链接:https://leetcode-cn.com/problems/implement-trie-prefix-tree/ 实现一个前缀树(节点),一个前缀树节点需要保存它可能的26个孩子的信息,以及这个节点是不是一个单词的结尾。 C++代码: class Trie {Trie * children[26];bool isWord = false; public:/** Initialize…
暂无图片
编程学习 ·

Linux系统中的firewalld火墙管理及优化(firewalld)

Linux系统中的firewalld火墙管理及优化(firewalld)1.firewalld 的模块化管理及存储方式(1)火墙配置目录 /etc/firewalld 火墙模块目录 /lib/firewalld (2)firewalld的一些域网络区名称 默认配置 trusted(信任) 可接受的所有网络连接 home(家庭) 用于家庭,仅接受ss…
暂无图片
编程学习 ·

SQL存储过程

什么是存储过程,如何创建一个存储过程 * Stored Procedure * 存储过程=SQL语句+流控制语句定义存储过程定义 create procedure 存储过程名称(【参数列表】) begin 需要执行的语句 end. 创建CREATE PROCEDURE `get_hero_scores`( OUT max_max_hp FLOAT, OUT min_max_mp FLO…
暂无图片
编程学习 ·

收藏量4w+的Web开发框架,你还没学?点击收藏!

Python的Web开发,也是工作岗位比较多的领域。如果你对Python的Web开发有兴趣,正打算开始学习使用Python做Web开发等,那么学习一门基于Python的Web开发框架是必修课。Python作为当前最热门,也是最主要的Web开发语言之一,在其二十多年的历史中出现了数十种Web框架,比如Djan…
暂无图片
编程学习 ·

Web前端页面制作流程以及注意事项,满满的干货!

每天我们打开电脑,看到各种各样的web前端页面。你知道他们是如何制作的吗?为了让页面更具有规范性,让使用者更加方便,在制作页面过程中必须遵循一定的设计流程。在这里就为大家详细介绍一下制作一个Web前端页面的设计流程及注意事项。一:确定网站主题 每个网站都有自身以及…
暂无图片
编程学习 ·

nginx从下载到部署全过程(Linux)

导航NGINX官网下载NGINX安装环境解压,编译,安装启动及测试NGINX官网以下列举了三个网址,分别是:NGINX官网,下载网址及官方文档。官方网站:http://nginx.org/下载网址:http://nginx.org/en/download.html官方文档:http://nginx.org/en/docs/ 下载NGINX 通过官方下载地址…
暂无图片
编程学习 ·

SwiftUI 2020年WWDC演示示例

整体效果代码实现 文件目录SandwichesApp.swift import SwiftUI@main struct SandwichesApp: App {// 定义一个私有的状态对象 store@StateObject private var store = SandwichStore()var body: some Scene {WindowGroup {// 将store传递给列表页ContentView(store: store)}} …
暂无图片
编程学习 ·

docker常用命令

1, docker常用命令 #查看本地镜像 docker images #查看镜像历史,httpd为镜像名称 docker history httpd 运行容器:docker start 容器ID|容器名称 停止容器:docker stop 容器ID|容器名称 重启容器:docker restart 容器ID|容器名称 删除容器:docker rm 容器ID|容器名称 删…
暂无图片
编程学习 ·

Java之父 詹姆斯·高斯林 传奇的一生

Java之父 传奇的一生 Java之父 詹姆斯高斯林 詹姆斯高斯林 (James Gosling)是一名软件专家,1955年5月19日出生于加拿大,Java编程语言的共同创始人之一,一般公认他为“Java之父”。 1977年获得了加拿大卡尔加里大学计算机科学学士学位,1983年获得了美国卡内基梅隆大学计算…
暂无图片
编程学习 ·

Object类的方法

Object 类是类层次结构的根,在 Java 语言中,所有的类从根本上而言都继承自这个类。而且,Object 类是 Java 语言中唯一没有父类的类,而其他所有的类,包括标准容器类,例如数组,都继承了 Object 类。方法名 返回类型 方法描述clone() Object 创建并返回此对象的一个副本equ…
暂无图片
编程学习 ·

linux监控网卡抓包

/usr/sbin/tcpdump -n -e -i eth0 tcp port 8090tcpdump 的抓包保存到文件的命令参数是-w xxx.cap 抓eth1的包 tcpdump -i eth1 -w /tmp/xxx.cap 抓 192.168.1.123的包 tcpdump -i eth1 host 192.168.1.123 -w /tmp/xxx.cap 抓192.168.1.123的80端口的包 tcpdump -i eth1 host …
暂无图片
编程学习 ·

ssm实现用户管理系统

ssm实现用户管理系统(2) @RequestMapping("/add.do") public String add(User user){userService.add(user);return "redirect:findAll.do"; } @RequestMapping("/toUpdate.do") public ModelAndView toUpdate(int id){User user=userService.…
暂无图片
编程学习 ·

2020.6.27 HTML总结

HTML总结: 一、HTML基本结构 <html><head><title>这是一个文本</title></head> ​ <body>文本</body> </html>< html >元素是 HTML 页面的根元素 < head > 元素包含了文档的元(meta)数据,如 < meta charse…
暂无图片
编程学习 ·

python中的eval函数

eval是Python的一个内置函数,这个函数的作用是,返回传入字符串的表达式的结果。想象一下变量赋值时,将等号右边的表达式写成字符串的格式,将这个字符串作为eval的参数,eval的返回值就是这个表达式的结果。 几个例子 a="[[1,2], [3,4], [5,6], [7,8], [9,0]]" pr…
暂无图片
编程学习 ·

next()与nextLine()区别

next()与nextLine()区别 两种接收方式第一种import java.util.Scanner;public class Demo01 {public static void main(String[] args) {//创建一个扫描器对象,用于按收健盘数据Scanner sc = new Scanner(System.in);System.out.println("使用next方式接收");//为断…
暂无图片
编程学习 ·

自定义js封装

网上搜索了很多,一顿整合,目前觉得比较合适的 才用 闭包 + 原型模式 结构记录如下 :;(function(undefined){ use strict;var _global;var defaultSetting = {};var tools={};var checkbox = function (settings){if(!(this instanceof checkbox)){return new checkbox()};…