MapReduce
MapReduce---网约车撤销订单数据清洗
- 数据及需求解析
- 数据
- 需求及解析
- 代码实现
- 自定义数据类型
- Map阶段
- Reduce阶段
- Driver阶段
数据及需求解析
数据
数据一示例
shouyue,430100,B190306162112070000,20190306162112,20190306162223,1,1,用户取消
companyid,address,orderid,ordertime,canceltime,operator,canceltypecode,cancelreason
1200DDCX3307,430104,17625076885092,20190307173227,20190307173833,2,5,null
1200DDCX3307,430406,17624999825776,20190306164901,20190306164947,1,4,null
1100YDYC423D,430602,6665578474529331090,20190307172846,20190307172909,1,1,第三方接口取消
3301YXKJ223E,430700,17146323201919,20190307083000,20190307073855,3,3,
1100YDYC423D,430602,6665578474529331090,20190307172846,20190307172909,1,1,第三方接口取消
3301YXKJ223E,430700,,20190307083000,20190307073855,3,3,
shouyue,430100,P190307171256186000,20190307171255,20190307171348,1,1,点击下单120S内没有筛选到司机时, 乘客手动点击取消订单
字段解析
companyid,address,orderid,ordertime,canceltime,operator,canceltypecode,cancelreason
数据二示例
340103,安徽省合肥市庐阳区
431027,湖南省郴州市桂东县
210302,辽宁省鞍山市铁东区
440605,广东省佛山市南海区
字段解析
行政区划代码,行政区
需求及解析
- 判断一行数据字段是否完整,本数据字段之间通过,分割,数据分割后长度为 8 ,如果分割后字符列表长度小于 8
或有不完整字段(字段值为空),则清洗掉这一行数据。 - 去重清洗:若有相同订单 id(orderid)只保留第一行,其他的清洗掉;
- 撤销理由(cancelreason)有大量的字符串 null,请将这些字符串用“未知”替代;
- 将数据集中的订单时间(ordertime)、订单撤销时间(canceltime)转换为 “yyyy-MM-dd
HH:mm:ss”格式,同时只保留订单时间(ordertime)和订单撤销时间(canceltime)在 2019 年 03 月 07
日的数据,其他日期或者不匹配的数据清洗掉(字段名数据); - 处理数据集中的行政区划代码(address),将其转换成对应的地区名称,在行政区划代码(address)字段后插入,若数据中行政区划代码在数据库没有找到对应的行政区名称,则将行政区名设置为“未知”;
代码实现
自定义数据类型
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Data implements WritableComparable<Data> {private String companyid;private String address;private String orderid;private String ordertime;private String canceltime;private String operator;private String canceltypecode;private String cancelreason;@Overridepublic int compareTo(Data o) {return 0;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(companyid);dataOutput.writeUTF(address);dataOutput.writeUTF(orderid);dataOutput.writeUTF(ordertime);dataOutput.writeUTF(canceltime);dataOutput.writeUTF(operator);dataOutput.writeUTF(canceltypecode);dataOutput.writeUTF(cancelreason);}@Overridepublic void readFields(DataInput dataInput) throws IOException {companyid = dataInput.readUTF();address = dataInput.readUTF();orderid = dataInput.readUTF();ordertime = dataInput.readUTF();canceltime = dataInput.readUTF();operator = dataInput.readUTF();canceltypecode = dataInput.readUTF();cancelreason = dataInput.readUTF();}public void set(String companyid, String address, String orderid, String ordertime, String canceltime, String operator, String canceltypecode, String cancelreason) {thispanyid = companyid;this.address = address;this.orderid = orderid;this.ordertime = ordertime;this.canceltime = canceltime;this.operator = operator;this.canceltypecode = canceltypecode;this.cancelreason = cancelreason;}@Overridepublic String toString() {return companyid + "|" + address + "|" + orderid + "|" + ordertime + "|" + canceltime + "|" +operator + "|" + canceltypecode + "|" + cancelreason + "|";}public String getCompanyid() {return companyid;}public void setCompanyid(String companyid) {thispanyid = companyid;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}public String getOrderid() {return orderid;}public void setOrderid(String orderid) {this.orderid = orderid;}public String getOrdertime() {return ordertime;}public void setOrdertime(String ordertime) 
{this.ordertime = ordertime;}public String getCanceltime() {return canceltime;}public void setCanceltime(String canceltime) {this.canceltime = canceltime;}public String getOperator() {return operator;}public void setOperator(String operator) {this.operator = operator;}public String getCanceltypecode() {return canceltypecode;}public void setCanceltypecode(String canceltypecode) {this.canceltypecode = canceltypecode;}public String getCancelreason() {return cancelreason;}public void setCancelreason(String cancelreason) {this.cancelreason = cancelreason;}
}
Map阶段
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;public class MapTest extends Mapper<LongWritable, Text, Text, Data> {Text k = new Text();Data v = new Data();Map<String, String> address_name = new HashMap<String, String>();String ordertime;String canceltime;String address;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {URI[] uris = context.getCacheFiles();File file = new File(uris[0]);BufferedReader br = new BufferedReader(new FileReader(file));String line;while ((line = br.readLine()) != null) {address_name.put(line.split(",")[0], line.split(",")[1]);}}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {if (value.toString().startsWith("companyid ")) {return;}String[] datas = value.toString().split(",", -1);//是否完整if (datas.length != 8) {return;}for (String s : datas) {if (s == null || s.equals("")) {return;}}//判断时间SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMddHHmmss");SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {ordertime = sdf2.format(sdf1.parse(datas[3]));canceltime = sdf2.format(sdf1.parse(datas[4]));} catch (ParseException e) {e.printStackTrace();}if ((!ordertime.startsWith("2019-03-07")) && (!canceltime.startsWith("2019-03-07"))) {return;}//将null改成未知if ("null".equals(datas[7])) {datas[7] = "未知";}//地区if (address_name.containsKey(datas[1])) {address = datas[1]+"|"+address_name.get(datas[1]);}else {address = datas[1]+"|"+"未知";}k.set(datas[2]);v.set(datas[0], address, datas[2], ordertime, canceltime, datas[5], datas[6], datas[7]);context.write(k, v);}
}
Reduce阶段
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class RedTest extends Reducer<Text,Data,Data, NullWritable> {Data k = new Data();@Overrideprotected void reduce(Text key, Iterable<Data> values, Context context) throws IOException, InterruptedException {for (Data data:values){k = data;context.getCounter("是否合格","总计").increment(1);}context.getCounter("是否合格","合格").increment(1);context.write(k,NullWritable.get());}
}
Driver阶段
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.net.URI;          // FIX: original had "import java.URI;" (blog-mangled, does not compile)

/**
 * Driver: wires up the cleaning job (MapTest/RedTest), registers the
 * district-code lookup file in the distributed cache, and runs locally
 * against D:\MP\网约车\input, writing to D:\MP\网约车\output.
 */
public class DriTest {
    public static void main(String[] args) throws Exception {
        // Hadoop refuses to start if the output directory exists; remove it first.
        File file = new File("D:\\MP\\网约车\\output");
        if (file.exists()) {
            delFile(file);
        }
        // Run the job exactly once (original duplicated this call in both branches).
        driver();
    }

    /** Recursively deletes a file or directory tree. */
    public static void delFile(File file) {
        File[] files = file.listFiles();
        if (files != null) {
            for (File child : files) {
                delFile(child);
            }
        }
        if (!file.delete()) {
            // Surface (don't swallow) a failed delete; the job would fail later anyway.
            System.err.println("无法删除: " + file.getAbsolutePath());
        }
    }

    /** Configures and submits the MapReduce job, then exits with its status. */
    public static void driver() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriTest.class);
        job.setMapperClass(MapTest.class);
        job.setReducerClass(RedTest.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Data.class);
        job.setOutputKeyClass(Data.class);
        job.setOutputValueClass(NullWritable.class);
        // District code -> name mapping, read by MapTest.setup().
        job.addCacheFile(new URI("file:///D:/MP/网约车/data.txt"));
        FileInputFormat.setInputPaths(job, "D:\\MP\\网约车\\input");
        FileOutputFormat.setOutputPath(job, new Path("D:\\MP\\网约车\\output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}


发布评论