Title: MapReduce 基于文件的 join数据清洗

Categories: mapreduce

Description: map reduce join for hdfs

Keywords: mapreduce hadoop hdfs

这只是一次突然需求导致的问题,解决的比较唐突,如果哪里有不对的地方欢迎指正,不希望误导他人。

MapReduce 实现基于文件的Join

package com.huaching.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.net.URI;

/**
 * @author firshme
 * @version 2018-12-19.
 * @url github.com/uk0
 * @project Task
 * @since JDK1.8.
 *
 * # 多个cvs文件 以key 相同进行分组  并且合并关键字
 */
public class JoinReducesSimple {

    public static class Mapper1 extends Mapper<Object, Text, Text, Text> {
        int counter = 0;

        // key index
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            StringBuilder lineData = new StringBuilder();
            for (String sk : parts) {
                lineData.append(sk).append(",");
            }
            counter++;
            if (counter % 10000 == 0) {
                System.out.println("Mapper1---->" + lineData);
            }
            context.write(new Text(parts[0]), new Text("pay\t" + lineData.substring(0, lineData.length() - 1)));
        }
    }



    public static class Mapper2 extends Mapper<Object, Text, Text, Text> {
        int counter = 0;

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            StringBuilder lineData = new StringBuilder();
            for (String sk : parts) {
                lineData.append(sk).append(",");
            }
            counter++;
            if (counter % 10000 == 0) {
                System.out.println("Mapper2---->" + lineData);
            }
            context.write(new Text(parts[4]), new Text("deposit\t" + lineData.substring(0, lineData.length() - 1)));
        }
    }

    public static class Mapper3 extends Mapper<Object, Text, Text, Text> {
        int counter = 0;

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            StringBuilder lineData = new StringBuilder();
            for (String sk : parts) {
                lineData.append(sk).append(",");
            }
            counter++;
            if (counter % 10000 == 0) {
                System.out.println("Mapper3---->" + lineData);
            }
            context.write(new Text(parts[0]), new Text("car_user\t" + lineData.substring(0, lineData.length() - 1)));
        }
    }

    public static class Reducer1 extends Reducer<Text, Text, Text, Text> {

        int counter = 0;

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (counter == 0) {
                counter++;
                String title = "用户ID,手机号(如没有手机则空),注册时间,缴纳押金时间,第一次停车时间,第一次停车地点,累计消费金额(不含押金),使用过停车场数量,停车订单累计次数,最后一次停车时,最后一次支付时间,最后一次退押金时间(如现在未退押金显示押金金额)";
                context.write(new Text("Number,"), new Text(title));
            }
            counter++;
            String car_user = "";
            String deposit = "";
            String pay = "";

            // out
            String userID = "";
            String mobile = "";
            String firstParking = "";
            String firstPark = "";
            String sumPrice = "";
            String parks = "";
            String count = "";
            String lastParking = "";
            String lastPay = "";
            String depositTime = "";
            String register_time = "";
            String lastRefundDeposit = "";

            String park = "";
            for (Text t : values) {
                String[] parts = t.toString().split("\t");
                if (parts[0].trim().equals("car_user")) {
                    car_user = parts[1];
                    String [] car_userArrays = car_user.split(",");
                    if (car_userArrays.length>=24) {
                        userID = car_userArrays[0];
                        mobile = car_userArrays[2];
                        register_time = car_userArrays[19];
                    }
                } else if (parts[0].trim().equals("deposit")) {
                    deposit = parts[1];
                    String[] depositArray = deposit.split(",");
                    depositTime = depositArray[2];  //押金创建时间
                    if ("0.00".equals(depositArray[3])) {
                        lastRefundDeposit = depositArray[1]; // 已经退了押金查看退的时间
                    } else {
                        lastRefundDeposit = depositArray[3];  //显示金额
                    }

                } else if (parts[0].trim().equals("pay")) {
                    pay = parts[1];
                    String[] payArrays = pay.split(",");
                    firstParking = payArrays[3];
                    firstPark = payArrays[4];
                    sumPrice = payArrays[5];
                    parks = payArrays[1];
                    count = payArrays[2];
                    lastParking = payArrays[6];
                    lastPay = payArrays[7];
                }
                else if (parts[0].trim().equals("park")) {
                    park = parts[1];
                }
            }
            context.write(new Text(String.valueOf(counter) + ","), new Text(userID + "," + mobile + "," + register_time + "," + depositTime + "," + firstParking + "," + firstPark + "," + sumPrice + "," + parks + "," + count + "," + lastParking + "," + lastPay + "," + lastRefundDeposit+","+park));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf jobConf = new JobConf(conf, JoinReducesSimple.class);
        Job job = Job.getInstance(jobConf);
        job.setJarByClass(JoinReducesSimple.class);

        job.setReducerClass(Reducer1.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job, new Path("/public/csv/parkingPayment/parkingPayment-r-00000"), TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(job, new Path("/private/parking_ths_deposit_user.csv"), TextInputFormat.class, Mapper2.class);
        MultipleInputs.addInputPath(job, new Path("/private/parking_car_user.csv"), TextInputFormat.class, Mapper3.class);

        String outFile = "/public/csv/job1/";
        Path path = new Path(outFile);
        FileSystem fs = FileSystem.get(URI.create(outFile), conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        MultipleOutputs.addNamedOutput(job, "job", TextOutputFormat.class, Text.class, Text.class);

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

大致实现方式

/***
    MapReduce 分为 map 阶段 reduce 阶段
    Map 阶段可以理解为将数据根据指定的key 进行整理
    reduce 阶段接到key 以及 value
    多个map 同时进行读取数据 并且以相应的key 进行分组
    reduce 阶段读取到 key value  value 里面取巧的加了一下 相应Map数据的标记,为了获取相应的数据。

    map1data = [id,y,x,z]
    map2data = [y,id,z]
    map3data = [f,y,x,z,id]


     Map1  Map2  Map3
      |     |      |
     key[0] key[1] key[4]
     \      |      /
      \     |     /
          reduce
            |
        key[0],value[map1,map2,map3 value传递的数据]

        reduce的时候多个Map的Key已经被分组,
            这个时候你读取value 可能是他们其中某一个map 的value
            进行相应的操作即可

***/

转载请注明出处,本文采用 CC4.0 协议授权