使用MapReduce实现join操作

文章目录

  • 一.概述
  • 二.需求
  • 三.map+reduce实现join
  • 四.MapReduce Map端 join实现原理(没有reduce处理)

一.概述

熟悉SQL的读者都知道,使用SQL语法实现join是很简单的,只需要一条SQL语句即可,但是在大数据场景下使用MapReduce编程模型实现join还是比较繁琐的在实际生产中我们可以借助Hive,Spark SQL 等框架来实现join,但是对于join的实现原理我们需要掌握,这对于理解join的底层实现很有帮助,本文介绍如何使用MapReduce API 来实现join

二.需求

实现如下SQL的功能: select c.customer_id,c.customer_name,o.orderId,o.order_status from customer c join order o on c.customer_id=o.customer_id
文件链接: https://pan.baidu.com/s/1GziR0W7pNwk26lHf-ZZ8NA 提取码: 2piw

三.map+reduce实现join

map

  • 判断字段个数如果是4个字段就是order表,9个字段就是customer表
  • ( customer_id,(customer_id,customer_name,orderId,order_status,flag))

reduce
对同一个customer_id的key进行处理,将value值进行拼接
代码实现
-1)先写所需字段的实体类如下:

package hadoop.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class CustomerOrders implements Writable {
    //-( customer_id,(customer_id,customer_name,orderId,order_status,flag))
    private String customer_id;//客户id
    private String customer_name;//客户名
    private String orderId;//订单id
    private String order_status;//订单状态
    private String flag;//标志位(是map识别文件的标志)

    public CustomerOrders() {
    }

    public CustomerOrders(String customer_id, String customer_name, String orderId, String order_status, String flag) {
        this.customer_id = customer_id;
        this.customer_name = customer_name;
        this.orderId = orderId;
        this.order_status = order_status;
        this.flag = flag;
    }
	//序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(customer_id);
        dataOutput.writeUTF(customer_name);
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(order_status);
        dataOutput.writeUTF(flag);
    }
	//反序列化(顺序要一致)
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customer_id=dataInput.readUTF();
        this.customer_name=dataInput.readUTF();
        this.orderId=dataInput.readUTF();
        this.order_status=dataInput.readUTF();
        this.flag=dataInput.readUTF();
    }

    public String getCustomer_id() {
        return customer_id;
    }

    public void setCustomer_id(String customer_id) {
        this.customer_id = customer_id;
    }

    public String getCustomer_name() {
        return customer_name;
    }

    public void setCustomer_name(String customer_name) {
        this.customer_name = customer_name;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getOrder_status() {
        return order_status;
    }

    public void setOrder_status(String order_status) {
        this.order_status = order_status;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return customer_id + ',' +
              customer_name + ',' +
           orderId + ',' +
         order_status ;
    }
}
  • 2)编写map类,如下:
package hadoop.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class CustomerOrderMapper extends Mapper<LongWritable, Text,Text,CustomerOrders> {
    CustomerOrders v = new CustomerOrders();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //将字段进行切割,返回字段数值
        String[] fields = value.toString().split(",");
        //进行判断,4字段是订单表,否则就是顾客表
        if(fields.length==4){
            //订单表中可赋值的字段进行赋值
            v.setCustomer_id(fields[2]);
            v.setCustomer_name("");
            v.setOrderId(fields[0]);
            v.setOrder_status(fields[3]);
            v.setFlag("1");
        }else{
            //顾客表中可赋值的字段进行赋值
            v.setCustomer_id(fields[0]);
            v.setOrderId("");
            v.setOrder_status("");
            v.setCustomer_name(fields[1]);
            v.setFlag("0");
        }
        //从Map端写出
        context.write(new Text(v.getCustomer_id()),v);
    }
}
  • 3)编写reduce类,如下:
package hadoop.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class CustomerOrderReducer extends Reducer<Text,CustomerOrders,CustomerOrders, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        //1.准备订单记录集合
        ArrayList<CustomerOrders> ordeBeans = new ArrayList<>();
        //准备顾客bean对象
        CustomerOrders cusBean = new CustomerOrders();
        //2.遍历map端输出内容将数据放入到集合中,准备合并bean对象
        for (CustomerOrders bean : values) {
            if(bean.getFlag().equals("1")){//订单表
               CustomerOrders orderBean =  new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderBean,bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                ordeBeans.add(orderBean);
            }else {//顾客表
                try {
                    BeanUtils.copyProperties(cusBean,bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        //3.遍历集合,进行空白字段拼接
        for (CustomerOrders bean : ordeBeans) {
            bean.setCustomer_name(cusBean.getCustomer_name());
            //4.调用写出方法
            context.write(bean,NullWritable.get());
        }
    }
}
  • 4)编写Driver类运行,如下:
package hadoop.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author sunyong
 * @date 2020/07/01
 * @description
 */
public class CustomerOrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.创建配置文件,创建Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"sqlJoin");

        //2.设置jar的位置
        job.setJarByClass(CustomerOrderDriver.class);

        //3.设置map和reduce的位置
        job.setMapperClass(CustomerOrderMapper.class);
        job.setReducerClass(CustomerOrderReducer.class);

        //4.设置map输出端的key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        //5.设置reduce输出的key,value类型
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);
        //6.设置输出路径
        FileInputFormat.setInputPaths(job,new Path("F:\\sunyong\\Java\\codes\\javaToHdfs\\join"));
        FileOutputFormat.setOutputPath(job,new Path("joinOut"));

        //7.提交程序运行
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

  • 5)执行之后去查看文件,如下:
    在这里插入图片描述

四.MapReduce Map端 join实现原理(没有reduce处理)

实际把一个表缓存到内存里(小表),可以使用HashMap缓存,再遍历另一个表,通过key到HashMap中进行取值
客户表:一个客户一个记录-->小表
订单表:一个客户可有多个订单

  • 1.编写实体类,如上:
  • 2.编写map类.这里不同
package hadoop.mapreduce.join;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class MapJoinMapper extends Mapper<LongWritable, Text,CustomerOrders, NullWritable> {
    //hashmap存储顾客id顾客姓名
    HashMap<String,String> customerMap = new HashMap<>();
    //准备顾客订单对象
   CustomerOrders customerOrders = new CustomerOrders();
   //对顾客表操作
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取缓存文件的URI,这里只有一个文件
        URI[] cacheFiles = context.getCacheFiles();
        if(cacheFiles!=null && cacheFiles.length>0){
            //获取文件路径,文件名
           String fileName = cacheFiles[0].getPath().toString();
           //缓冲流并设置utf8格式
           BufferedReader bw = new BufferedReader(new InputStreamReader(new FileInputStream(fileName),"UTF-8"));
           String line;
            //读取文件将第一列和第二列作为map的键和值
            while(StringUtils.isNotEmpty(line = bw.readLine())){
               String[] split = line.split(",");
               customerMap.put(split[0],split[1]);
             }
             //关闭资源
             bw.close();
        }

    }
    //对订单表操作
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取第一行切割成字段
        String[] fields = value.toString().split(",");
        //进行赋值
        customerOrders.setCustomer_id(fields[2]);
        customerOrders.setOrderId(fields[0]);
        customerOrders.setOrder_status(fields[3]);
        //从HashMap获取姓名
        customerOrders.setCustomer_name(customerMap.get(fields[2]));
        //写出一个个对象(map方法每个键都会执行)
        context.write(customerOrders,NullWritable.get());
    }
}
  • 3.编写Driver类执行:
package hadoop.mapreduce.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author sunyong
 * @date 2020/07/01
 * @description
 */
public class CustomerOrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //1.创建配置文件,创建Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"mapJoin");

        //2.设置jar的位置
        job.setJarByClass(CustomerOrderDriver.class);

        //3.设置map和reduce的位置(这里不需要reduce)
        job.setMapperClass(MapJoinMapper.class);
        //设置reduce个数为0
        job.setNumReduceTasks(0);
        //4.设置map输出端的key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);
        //5.设置reduce输出的key,value类型(这里不需要)
        //6.设置输出路径
        //注意URI无法识别\\只能用///不然会报错,无法识别路径
        job.addCacheFile(new URI("file:///F:///sunyong///Java///codes///javaToHdfs///join///customers.csv"));//设置小表的缓存
        FileInputFormat.setInputPaths(job,new Path("F:\\sunyong\\Java\\codes\\javaToHdfs\\join"));
        FileOutputFormat.setOutputPath(job,new Path("mapOut"));
        //7.提交程序运行
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}
  • 4)执行后查看文件效果如下(是没有顺序的):
    在这里插入图片描述

热门文章

暂无图片
编程学习 ·

PAT 1161 Merging Linked Lists

原题链接:暂无 关键词:链表 Given two singly linked lists L 1 =a 1 →a 2 →…→a n−1 →a n L1=a1→a2→…→an−1→an and L 2 =b 1 →b 2 →…→b m−1 →b m L2=b1→b2→…→bm−1→bm . If n≥2m n≥2m , you are supposed to reverse and merge the shorter one i…
暂无图片
编程学习 ·

leetcode 124. 二叉树中的最大路径和

题目 给定一个非空二叉树,返回其最大路径和。 本题中,路径被定义为一条从树中任意节点出发,达到任意节点的序列。该路径至少包含一个节点,且不一定经过根节点。 思路 简单题,dfs返回当前节点为一端的最大链。答案有两种情况,1 当前节点到子孙的一条链 2 当前节点为中端,…
暂无图片
编程学习 ·

Leetcode 题解 - 排序

快速选择 用于求解 Kth Element 问题,使用快速排序的 partition() 进行实现。 需要先打乱数组,否则最坏情况下时间复杂度为 O(N2)。 堆排序 用于求解 TopK Elements 问题,通过维护一个大小为 K 的堆,堆中的元素就是 TopK Elements。 堆排序也可以用于求解 Kth Element …
暂无图片
编程学习 ·

爬虫工作的代理ip选择

代理ip的使用是爬虫工作必须使用的爬取辅助工具,大数据的快速发展,很多的网站不断的维护自己的网站信息,开始设置反爬虫机制,在网站进行反爬虫限制的情况下,怎样通过反爬虫机制,提高工作效率。一:使用多线程与代理ip1、多线程方式:多线程同时开展工作采集,迅速提高工作…
暂无图片
编程学习 ·

最小生成树的java实现

文章目录一、概念二、算法2.1 Prim算法2.2 Kruskal算法 笔记来源:中国大学MOOC王道考研 一、概念连通图:图中任意两点都是连通的,那么图被称作连通图生成树:连通图包含全部顶点的一个极小连通子图最小生成树:在含有n个顶点的带权无向连通图中选择n-1条边,构成一棵极小连…
暂无图片
编程学习 ·

ant design of vue,form自定义校验

<a-form-item label="产品特性" class="am-enter_form_item"><a-select mode="tags"class="am-enter-select"placeholder="选择或填写2~4个标签(限制2~4个字)"showArrow:max-tag-count="4":max-tag-te…
暂无图片
编程学习 ·

C++--找出三条能构成三角形且周长最大的边的一个普通方法

题目:给定由一些正数(代表长度)组成的数组 A,返回由其中三个长度组成的、面积不为零的三角形的最大周长。如果不能形成任何面积不为零的三角形,返回 0。 #输出示例 输入:[2,1,2] 输出:5 输入:[1,5,1] 输出:0 输入:[3,2,3,4] 输出:10 输入:[3,6,2,3] 输出:8 #inclu…
暂无图片
编程学习 ·

【译】理解C++中的 nullptr

原文链接🔗 Understanding nullptr in C++译者注:nullptr 是 C++11 为了解决NULL的歧义问题而引入的新特性,表示空指针常量. 原文作者是 Utkarsh Trivedi,发布网站是GeeksforGeeks.考虑下面的C++程序,它暴露了一些NULL的问题 // C++ program to demonstrate problem with N…
暂无图片
编程学习 ·

数据库语句和数据库表常用的操作命令

Mysql的启动与关闭启动 net start mysql关闭 net stop mysql显示当前服务器版本 SELECT NERSION();显示当前的日期 SWLECT NOW();显示当前用户 SELECT USER(); 数据库语句(DDL)查看数据库 show databases;创建数据库 create database demo;查看警告信息 show warnings;查…
暂无图片
编程学习 ·

海思NNIE开发系列文章--转载

https://blog.csdn.net/zh8706/article/details/94554337海思NNIE开发系列文章:海思NNIE开发(一):海思Hi3559AV100/Hi3519AV100 NNIE深度学习模块开发与调试记录海思NNIE开发(二):FasterRCNN在海思NNIE平台上的执行流程(一)海思NNIE开发(三):FasterRCNN在海思NNIE平…
暂无图片
编程学习 ·

项目实训——初版的页面优化(2)

项目实训——初版的页面优化(2)题目太长的解决就业帮助具体内容的收起展开表格的美化 再次进行了一次小组会议,找到了更加多的需要优化和完善的地方。比如题目很容易出框,讨论区话题的显示需要限制长度等等。同时也新增一些功能,比如评论的删除。这篇先写完善。 题目太长的…
暂无图片
编程学习 ·

vue3.0全家桶

Vue 3.0 全家桶抢先体验 1.vue: Beta 2.vue-router: Alpha 3.vuex: Alpha 4.vue-class-component: Alpha 5.vue-cli: Experimental support via vue-cli-plugin-vue-next 6.eslint-plugin-vue: Alpha 7.vue-test-utils: Alpha 8.vue-devtools: WIP 9.jsx: WIP 可以看到 Vue 3.0…
暂无图片
编程学习 ·

基于JavaWeb的宿舍管理系统(源码+数据库)无论文

背景: 管理信息系统在现代社会已深入到各行各业,由于计算机技术的迅速发展和 普及,信息管理系统MIS事实上已成为计算机管理信息系统,大学生宿舍管理系统就是一个典型的管理信息系统,它可以让宿舍管理工作变的更轻松。 任务:设计一个大学生宿舍管理系统。 目的: 实现系统维护…
暂无图片
编程学习 ·

什么是python?python有什么用途?

新手哪门编程语言最合适?绝对是python。python是目前主流的编程语言,也是当下发展最为迅速的编程语言,python可以做很多事情,无论是入门新手还是专业级选手都可以使用python。python是什么?有什么用途?python是一门非常具有条理、强大的面向对象的程序设计语言,类似于Pe…
暂无图片
编程学习 ·

node.js实现爬虫项目展示大作业

一、项目要求二、实现 1、用户可注册登录网站,非注册用户不可登录查看数据 登录页、注册页 <!DOCTYPE html> <html ng-app="login"> <head><meta charset="utf-8" /><title>Login</title><link rel="stylesh…
暂无图片
编程学习 ·

Redis cluster 集群节点维护

Redis cluster 集群节点维护 一 Redis cluster 集群节点维护 集群运行时间长久之后,难免由于硬件故障、网络规划、 业务增长等原因对已有集群进行相应的调整, 比如增加 Redis node 节点、 减少节点、 节点迁移、更换服务器等。 增加节点和删除节点会涉及到已有的槽位重新分配…
暂无图片
编程学习 ·

QT图形界面初学者项目 - 无人机群作战仿真模拟

代码已经上传到github:点这里 欢迎star 文章目录说明流程图程序控制流程主要代码展示 说明 某985大学课设,使用QT Designer完成图形化交互界面设计 ,实现的效果类似下面视频:仿真无人机对战游艇.mp4代码质量不高,但实现了功能,可以作为非专业的课程设计参考。 流程图程序…
暂无图片
编程学习 ·

skywalking分布式追踪系统 + mysql

软件版本apache-skywalking-apm-8.0.1.tar.gz mysql-connector-java-8.0.16.jar mysql-8.0.x JDK-1.8安装启动脚本, /bin 目录. 包含linux启动和Windows启动服务和UI的脚本. 配置文件, /config 目录. 包含以下配置文件.application.yml log4j.xml alarm-settings.yml 引用Jar, …
暂无图片
编程学习 ·

解决Ubuntu系统桌面分辨率太小的问题

在Ubuntu 20系统之前,安装Ubuntu系统之前都会发现系统桌面太小,分辨率太低的情况,那么如果解决这些问题呢?本人尝试过网上很多办法,都不能很好地解决这个问题,其实要想解决这个问题,并且能够做到屏幕自适应,最简单的办法就是使用一下命令:sudo apt-get autoremove ope…