mapreduce 二次排序 - 鸿网互联

# mapreduce 二次排序

1 二次排序 1.1 思路 所谓二次排序，是指对第 1 个字段相同的数据，再使用第 2 个字段进行排序。举个例子：电商平台记录了每一个用户的每一笔订单的订单金额，现在要求对属于同一个用户的所有订单金额作排序，并且输出的用户名也要有序。输入数据的格式为「账户 订单金额」，示例如下：

hadoop@apache 200

## 1 二次排序

hive@apache 550
yarn@apache 580
hive@apache 159
hive@apache 258
yarn@apache 100
yarn@apache 560
yarn@apache 260

二次排序后的期望输出如下（先按账户排序，账户相同再按金额升序）：

hive@apache 159
hive@apache 258
hive@apache 550
yarn@apache 100
yarn@apache 260
yarn@apache 560
yarn@apache 580

### 1.2 实现

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortMapReduce extends Configured implements Tool {

    /**
     * Composite key holding (account, cost).
     *
     * Serialized layout (must stay in sync with the raw-byte comparators below):
     *   [4-byte int: UTF-8 byte length of account][account bytes][8-byte double: cost]
     *
     * @author Ivan
     */
    public static class CostBean implements WritableComparable<CostBean> {
        private String account;
        private double cost;

        /** Sets both fields; the bean is reused across map() calls. */
        public void set(String account, double cost) {
            this.account = account;
            this.cost = cost;
        }

        public String getAccount() {
            return account;
        }

        public double getCost() {
            return cost;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            byte[] buffer = account.getBytes(StandardCharsets.UTF_8);
            // Length-prefix the account bytes so readFields() and the raw-byte
            // comparators (which read an int at offset s) know where the string ends.
            out.writeInt(buffer.length);
            out.write(buffer);
            out.writeDouble(cost);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Mirror of write(): length prefix, account bytes, then cost.
            int accountLength = in.readInt();
            byte[] bytes = new byte[accountLength];
            in.readFully(bytes);
            account = new String(bytes, StandardCharsets.UTF_8);
            cost = in.readDouble();
        }

        @Override
        public int compareTo(CostBean o) {
            if (account.equals(o.account)) {
                // Same account: order by cost ascending.
                return Double.compare(cost, o.cost);
            }
            return account.compareTo(o.account);
        }

        @Override
        public String toString() {
            return account + "\t" + cost;
        }
    }

    /**
     * Sort comparator used on both the map and reduce side: orders by account
     * first, then by cost, working directly on the serialized bytes.
     *
     * @author Ivan
     */
    public static class CostBeanComparator extends WritableComparator {

        public CostBeanComparator() {
            // Register the key class so keys can be deserialized if needed.
            super(CostBean.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // First 4 bytes of each serialized key: UTF-8 byte length of the account.
            int accountLength1 = readInt(b1, s1);
            int accountLength2 = readInt(b2, s2);

            int result = compareBytes(b1, s1 + 4, accountLength1, b2, s2 + 4, accountLength2);
            if (result == 0) {
                // Accounts are equal: compare the trailing 8-byte cost values.
                double thisValue = readDouble(b1, s1 + 4 + accountLength1);
                double thatValue = readDouble(b2, s2 + 4 + accountLength2);
                return Double.compare(thisValue, thatValue);
            }
            return result;
        }
    }

    /**
     * Partitioner used when the map output is spilled to disk: partitions by
     * account only, so all records of one account reach the same reducer.
     *
     * @author Ivan
     */
    public static class CostBeanPatitioner extends Partitioner<CostBean, DoubleWritable> {

        @Override
        public int getPartition(CostBean key, DoubleWritable value, int numPartitions) {
            // Mask the sign bit: hashCode() may be negative, and a negative
            // partition index would make the job fail.
            return (key.account.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /**
     * Grouping comparator for the reduce side: keys with the same account are
     * treated as one group (cost is ignored), so one reduce() call receives all
     * costs of a single account, already sorted by the sort comparator.
     *
     * @author Ivan
     */
    public static class GroupComparator extends WritableComparator {

        public GroupComparator() {
            super(CostBean.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Same layout as in CostBeanComparator: 4-byte length prefix first.
            int accountLength1 = readInt(b1, s1);
            int accountLength2 = readInt(b2, s2);

            // Compare only the account bytes; the cost must NOT influence grouping.
            return compareBytes(b1, s1 + 4, accountLength1, b2, s2 + 4, accountLength2);
        }
    }

    /**
     * Mapper: parses "account\tcost" lines into a composite key plus the cost
     * as the value.
     *
     * @author Ivan
     */
    public static class SecondarySortMapper extends Mapper<LongWritable, Text, CostBean, DoubleWritable> {
        // Reused across calls to avoid per-record allocations.
        private final CostBean outputKey = new CostBean();
        private final DoubleWritable outputValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            if (data.length < 2) {
                // Skip malformed lines instead of failing the whole task.
                return;
            }

            double cost = Double.parseDouble(data[1].trim());
            outputKey.set(data[0].trim(), cost);
            outputValue.set(cost);

            context.write(outputKey, outputValue);
        }
    }

    /**
     * Reducer: emits one "account\tcost" line per value. Thanks to the grouping
     * comparator, each reduce() call sees every cost of one account in sorted order.
     *
     * @author Ivan
     */
    public static class SecondarySortReducer extends Reducer<CostBean, DoubleWritable, Text, DoubleWritable> {
        private final Text outputKey = new Text();
        private final DoubleWritable outputValue = new DoubleWritable();

        @Override
        protected void reduce(CostBean key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            outputKey.set(key.getAccount());

            for (DoubleWritable v : values) {
                outputValue.set(v.get());
                context.write(outputKey, outputValue);
            }
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] = input path, args[1] = output path
     * @return 0 on success, 1 on failure
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, SecondarySortMapReduce.class.getSimpleName());
        job.setJarByClass(SecondarySortMapReduce.class);

        // Input path was missing in the original; without it the job has nothing to read.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // map settings
        job.setMapperClass(SecondarySortMapper.class);
        job.setMapOutputKeyClass(CostBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // partition settings
        job.setPartitionerClass(CostBeanPatitioner.class);

        // sorting
        job.setSortComparatorClass(CostBeanComparator.class);

        // grouping
        job.setGroupingComparatorClass(GroupComparator.class);

        // reduce settings
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(Text.class);
        // Original called setOutputKeyClass twice; the value class must be set too.
        job.setOutputValueClass(DoubleWritable.class);

        boolean res = job.waitForCompletion(true);

        return res ? 0 : 1;
    }

    /**
     * Entry point.
     *
     * @param args command-line arguments: input path and output path
     * @throws Exception on job failure
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: <inpath> <outpath>");
        }

        // Propagate the job result as the process exit code.
        int exitCode = ToolRunner.run(new Configuration(), new SecondarySortMapReduce(), args);
        System.exit(exitCode);
    }
}

### 1.3 测试

###### 运行环境
• 操作系统: Centos 6.4

hive@apache 550
yarn@apache 580
hive@apache 159