鸿 网 互 联 www.68idc.cn

HBase新版本与MapReduce集成

来源:互联网 作者:佚名 时间:2016-05-24 08:40
1.MapReduce从hbase读取数据 //读取hbase表数据public class HbaseAndMapReduce { public static void main(String[] args) throws Exception { // 测试数据 // testData() ; // 完成的作业是: 有共同爱好的人 System .exit (run()) ; /* * TableMapper //

1.MapReduce从hbase读取数据

//读取hbase表数据
/**
 * Example 1: read rows from the HBase table {@code blog} with a MapReduce job.
 *
 * <p>The job scans the {@code article:tags} and {@code author:nickname} columns,
 * prints every cell it sees in the mapper, and forwards each (row key, Result)
 * pair unchanged. No reducer class is configured. The output directory is an
 * HDFS path suffixed with the current time in milliseconds so that repeated
 * runs do not collide.
 */
public class HbaseAndMapReduce {
    public static void main(String[] args) throws Exception {
        // Uncomment to (re)create the "blog" table with sample rows before running the job.
        // testData();

        // The overall exercise: find people who share the same interests.
        System.exit(run());

        /*
         * HBase MapReduce helper classes used throughout these examples:
         *   TableMapper       - mapper base class for jobs that read from an HBase table
         *   TableReducer      - reducer base class for jobs that write to an HBase table
         *   TableOutputFormat - output format that writes to HBase
         *   TableInputFormat  - input format that reads from HBase
         */
    }

    /**
     * Configures and runs the scan job.
     *
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce.class);

        // Only fetch the columns the exercise needs: tags and nickname.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

        // Input comes from the "blog" table; the mapper emits (ImmutableBytesWritable, Result).
        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class,
                ImmutableBytesWritable.class, Result.class, job);

        // Same millisecond value as new Date().getTime(), without needing java.util.Date.
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.52.140:9000/outhbase" + System.currentTimeMillis()));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Mapper that dumps every scanned cell to stdout and passes the row through unchanged. */
    public static class FindFriendMapper extends TableMapper<ImmutableBytesWritable, Result> {
        /**
         * @param key     the HBase row key
         * @param value   all scanned cells belonging to that row
         * @param context the MapReduce context the pair is forwarded to
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context)
                        throws IOException, InterruptedException {
            System.out.println("key :: " + Bytes.toString(key.get()));

            System.out.print("value :: ");
            // Result.listCells() is documented to return null for a Result without cells.
            List<Cell> cells = value.listCells();
            if (cells != null) {
                for (Cell cell : cells) {
                    String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                    long timestamp = cell.getTimestamp();
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String val = Bytes.toString(CellUtil.cloneValue(cell));
                    System.out.println("RowKey=" + rowKey + ", Timestamp=" + timestamp + ", Family=" + family
                            + ", Qualifier=" + qualifier + ", Val=" + val);
                }
            }
            // Delegate to the inherited map() to forward the pair.
            super.map(key, value, context);
        }
    }

    /**
     * Drops and recreates the {@code blog} table (families {@code article} and
     * {@code author}) and inserts three sample rows.
     *
     * <p>An {@link IOException} is reported via its stack trace and not rethrown.
     * The connection, admin and table handles are closed on every path.
     */
    public static void testData() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        try (Connection con = ConnectionFactory.createConnection(conf);
                Admin admin = con.getAdmin()) {

            TableName tn = TableName.valueOf("blog");
            // A table must be disabled before it can be deleted.
            if (admin.tableExists(tn)) {
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }

            HTableDescriptor htd = new HTableDescriptor(tn);
            htd.addFamily(new HColumnDescriptor("article"));
            htd.addFamily(new HColumnDescriptor("author"));
            admin.createTable(htd);

            Put put = new Put(Bytes.toBytes("1"));
            put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("content"),
                    Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, "
                            + "realtime read/write access to your Big Data"));
            put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("HBase,NoSql,Hadoop"));
            put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First Hbase"));
            put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
            put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("sansan"));

            Put put02 = new Put(Bytes.toBytes("10"));
            put02.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop"));
            put02.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("xiaoshi"));

            Put put03 = new Put(Bytes.toBytes("100"));
            put03.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql"));
            put03.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("superman"));

            List<Put> puts = Arrays.asList(put, put02, put03);
            // The table handle is a resource of its own and must be closed as well.
            try (Table t = con.getTable(tn)) {
                t.put(puts);
            }
            System.out.println("==========> 测试数据准备完成...");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.从hbase读取数据,经过处理后输出到hdfs中

//从hbase表里面读取数据
//经过处理输到hdfs上
/**
 * Example 2: read rows from the HBase table {@code blog}, group nicknames by
 * tag, and write the result to HDFS.
 *
 * <p>The mapper emits one (lower-cased tag, nickname) pair per tag of a row;
 * the reducer joins all nicknames of a tag into a comma-separated list.
 */
public class HbaseAndMapReduce02 {
    public static void main(String[] args) throws Exception {
        // The overall exercise: find people who share the same interests.
        System.exit(run());
        /*
         * HBase MapReduce helper classes used throughout these examples:
         *   TableMapper       - mapper base class for jobs that read from an HBase table
         *   TableReducer      - reducer base class for jobs that write to an HBase table
         *   TableOutputFormat - output format that writes to HBase
         *   TableInputFormat  - input format that reads from HBase
         */
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce02.class);

        // Only fetch the columns the exercise needs: tags and nickname.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

        // Input comes from the "blog" table; the mapper emits (Text, Text).
        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class,
                Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.52.140:9000/outbasemapreduce" + System.currentTimeMillis()));

        job.setReducerClass(FindFriendReducer.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Emits (lower-cased tag, nickname) for every tag found on a row. */
    public static class FindFriendMapper extends TableMapper<Text, Text> {
        /**
         * @param key     the HBase row key
         * @param value   all scanned cells belonging to that row
         * @param context receives one (tag, nickname) pair per tag
         */
        @Override
        protected void map(
                ImmutableBytesWritable key,
                Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {

            // Result.listCells() is documented to return null for a Result without cells.
            List<Cell> cells = value.listCells();
            if (cells == null) {
                return;
            }

            String tags = null;
            Text nickname = null;
            for (Cell cell : cells) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("tags".equals(qualifier)) {
                    tags = Bytes.toString(CellUtil.cloneValue(cell));
                    // Print the tag list itself; printing the split array would
                    // only show the array's identity string.
                    System.out.println(tags);
                } else if ("nickname".equals(qualifier)) {
                    nickname = new Text(CellUtil.cloneValue(cell));
                    System.out.println(nickname.toString());
                }
            }

            // The scan returns rows that have either column, so a row may lack
            // tags or a nickname. Such a row cannot contribute a pair; skip it
            // instead of failing with a NullPointerException.
            if (tags == null || nickname == null) {
                return;
            }

            for (String tag : tags.split(",")) {
                context.write(new Text(tag.toLowerCase()), nickname);
            }
        }
    }

    /** Joins all nicknames that share a tag into one comma-separated value. */
    public static class FindFriendReducer extends Reducer<Text, Text, Text, Text> {
        /**
         * @param key     the lower-cased tag
         * @param values  nicknames of everyone who used that tag
         * @param context receives (tag, "nick1,nick2,...")
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text nickname : values) {
                System.out.println(nickname.toString());
                if (joined.length() > 0) {
                    joined.append(',');
                }
                joined.append(nickname.toString());
            }
            context.write(key, new Text(joined.toString()));
        }
    }
}

3.从hdfs中读取数据,插入到hbase表中

//从hdfs文件中读取数据插入hbase中
/**
 * Example 3: read key/value text lines from HDFS and insert them into the
 * HBase table {@code friend}.
 *
 * <p>Each input key becomes a row key; the values for that key are stored in
 * column {@code person:nicknames}.
 */
public class HbaseAndMapReduce03 {
    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    /**
     * Configures and runs the job, creating the target table first if needed.
     *
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public static int run() throws Exception {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce03.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(
                "hdfs://192.168.52.140:9000/outbasemapreduce1463486843454"));

        // Write the reducer output into the HBase table "friend".
        TableMapReduceUtil.initTableReducerJob("friend",
                FindFriendReducer.class, job);
        checkTable(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the {@code friend} table with column family {@code person} if it
     * does not exist yet. The connection and admin handles are closed on
     * every path.
     *
     * @param conf HBase configuration used to open the connection
     * @throws Exception if the connection or the table creation fails
     */
    private static void checkTable(Configuration conf) throws Exception {
        try (Connection con = ConnectionFactory.createConnection(conf);
                Admin admin = con.getAdmin()) {
            TableName tn = TableName.valueOf("friend");
            if (!admin.tableExists(tn)) {
                HTableDescriptor htd = new HTableDescriptor(tn);
                htd.addFamily(new HColumnDescriptor("person"));
                admin.createTable(htd);
                System.out.println("表不存在,新创建表成功....");
            }
        }
    }

    /** Turns each (key, values) group into one Put on the {@code friend} table. */
    public static class FindFriendReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {
        /**
         * @param key     text key; used as the HBase row key
         * @param values  values read for that key
         * @param context receives the Put to apply
         */
        @Override
        protected void reduce(
                Text key,
                Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {

            // Text.getBytes() exposes the backing buffer, which is only valid up
            // to getLength() and may carry stale bytes from a previous, longer
            // record. Go through toString() to get exactly the valid bytes.
            byte[] rowKey = Bytes.toBytes(key.toString());

            // Join every value so nothing is dropped if a key occurs on more
            // than one input line; with a single value the result is that value.
            StringBuilder nicknames = new StringBuilder();
            for (Text value : values) {
                if (nicknames.length() > 0) {
                    nicknames.append(',');
                }
                nicknames.append(value.toString());
            }

            Put put = new Put(rowKey);
            put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
                    Bytes.toBytes(nicknames.toString()));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }
}

4.从hbase表中读取数据,插入到另一个表中

//从一个表读取数据插入到另一个表中
public class HbaseAndMapReduce04 {
    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    public static int run() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.52.140");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HbaseAndMapReduce04.class);
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class, 
                ImmutableBytesWritable.class, ImmutableBytesWritable.class,job);
        TableMapReduceUtil.initTableReducerJob("friend02", FindFriendReducer.class, job);

        checkTable(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private static void checkTable(Configuration conf) throws Exception {
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();
        TableName tn = TableName.valueOf("friend02");
        if (!admin.tableExists(tn)){
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("person");
            htd.addFamily(hcd);
            admin.createTable(htd);
            System.out.println("表不存在,新创建表成功....");
        }
    }

    public static class FindFriendMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable>{
        @Override

        //key是hbase中的行键
        //value是hbase中的所行键的所有数据
        protected void map(
                ImmutableBytesWritable key,
                Result value,
                Mapper<ImmutableBytesWritable, Result,ImmutableBytesWritable, ImmutableBytesWritable>.Context context)
                throws IOException, InterruptedException {

            ImmutableBytesWritable v = null;
            String[] kStrs = null;
            List<Cell> cs = value.listCells();
            for (Cell cell : cs) {
                if ("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    kStrs = Bytes.toString(CellUtil.cloneValue(cell)).split(",");
                }
                else if ("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    v = new ImmutableBytesWritable(CellUtil.cloneValue(cell));
                }
            }
            for (String kStr : kStrs) {
                context.write(new ImmutableBytesWritable(Bytes.toBytes(kStr.toLowerCase())), v);
            }

        }
    }

    public static class FindFriendReducer extends
            TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(
                ImmutableBytesWritable key,
                Iterable<ImmutableBytesWritable> values,
                Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            Put put = new Put(key.get());
            StringBuilder vStr = new StringBuilder();
            for (ImmutableBytesWritable value : values) {
                vStr.append((vStr.length() > 0 ? ",":"") + Bytes.toString(value.get()));
            }
            put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nickname"),Bytes.toBytes(vStr.toString()));
            context.write(key, put);
        }
    }
}
网友评论
<