package org.dataguru.station;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;



import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BaseStationDataPreprocess extends Configured implements Tool {

    /**
     * 计数器 用于计数各种异常数据
     */
    /**
     * Counters used to tally the various kinds of bad or skipped records,
     * so data-quality problems are visible in the job's counter report.
     */
    enum Counter {
        TIMESKIP, // record skipped: malformed timestamp
        OUTOFTIMESKIP, // record skipped: timestamp outside the configured time window
        LINESKIP, // record skipped: malformed source line
        USERSKIP // one user's entire time slot was discarded
    }

    /**
     * Mapper: parses one raw record per input line (via TableLine) and emits
     * the pair built by {@code TableLine.outKey()}/{@code outValue()}.
     * Malformed or out-of-window lines are dropped and counted instead of
     * failing the job.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        String date; // target day, e.g. "2013-09-12" (from conf key "date")
        String[] timepoint; // hour boundaries of the time slots, e.g. {"07","09","17","24"}
        boolean dataSource; // true = POS file, false = NET file

        /**
         * Reads job parameters and determines the record layout from the
         * input file name, which must start with "POS" or "NET".
         *
         * @throws IOException if the file name identifies neither source
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            this.date = context.getConfiguration().get("date"); // target date
            this.timepoint = context.getConfiguration().get("timepoint").split("-"); // slot boundaries

            // Derive the data source from the name of the file this split belongs to.
            FileSplit fs = (FileSplit) context.getInputSplit();
            String fileName = fs.getPath().getName();
            if (fileName.startsWith("POS"))
                dataSource = true;
            else if (fileName.startsWith("NET"))
                dataSource = false;
            else
                // Include the offending name so a misplaced file is easy to locate.
                throw new IOException("File name should start with POS or NET, but was: " + fileName);
        }

        /**
         * Parses one line into a TableLine and emits its key/value pair.
         * Parse failures only increment the matching counter.
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            TableLine tableLine = new TableLine();

            try {
                tableLine.set(line, this.dataSource, this.date, timepoint);
            } catch (LineException e) {
                // flag -1 means "valid time, but outside the requested window"
                if (e.getFlag() == -1)
                    context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
                else
                    context.getCounter(Counter.TIMESKIP).increment(1);
                return;
            } catch (Exception e) {
                // Any other parse failure: count it and move on.
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
            context.write(tableLine.outKey(), tableLine.outValue());
        }

    }

    /**
     * 统计同一个IMSI在同一时间段 在不同CGI停留的时长
     */
    /**
     * Reducer: for each IMSI within one time slot, accumulates how long the
     * user stayed at each CGI (cell id), in minutes, and emits one line per
     * location: {@code imsi|cgi|timeFlag|minutes}.
     *
     * Input key: {@code imsi|timeFlag}; input values: {@code cgi|epochSeconds}.
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
        private String date; // target day, from conf key "date"
        // NOTE: SimpleDateFormat is not thread-safe, but each reduce task is
        // single-threaded, so a per-instance formatter is safe here.
        private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void setup(Reducer<Text, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            this.date = context.getConfiguration().get("date"); // target date
        }

        /**
         * All values share the same IMSI and time slot (they form the key).
         * Malformed keys/values only bump a counter; they must never fail the
         * whole job (the original code let ArrayIndexOutOfBoundsException
         * escape on values lacking a '|' separator).
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String[] keyParts = key.toString().split("\\|");
            if (keyParts.length < 2) {
                // Key does not have the imsi|timeFlag shape: drop this group.
                context.getCounter(Counter.USERSKIP).increment(1);
                return;
            }
            String imsi = keyParts[0];
            String timeFlag = keyParts[1];

            // Sort the sightings by time: key = epoch seconds, value = CGI.
            TreeMap<Long, String> uploads = new TreeMap<Long, String>();
            for (Text value : values) {
                String valueString = value.toString();
                String[] parts = valueString.split("\\|");
                if (parts.length < 2) {
                    // Missing separator: previously an uncaught
                    // ArrayIndexOutOfBoundsException — count it instead.
                    context.getCounter(Counter.LINESKIP).increment(1);
                    continue;
                }
                try {
                    uploads.put(Long.valueOf(parts[1]), parts[0]);
                } catch (NumberFormatException e) {
                    context.getCounter(Counter.TIMESKIP).increment(1);
                }
            }

            try {
                // Append a sentinel "OFF" entry at the end of the slot so the
                // last real sighting gets a duration too.
                Date tmp = this.formatter.parse(this.date + " " + timeFlag.split("-")[1] + ":00:00");
                uploads.put((tmp.getTime() / 1000L), "OFF");

                // Aggregate stay time per location.
                HashMap<String, Float> locs = getStayTime(uploads);

                // Emit one record per location.
                for (Entry<String, Float> entry : locs.entrySet()) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(imsi).append("|");
                    builder.append(entry.getKey()).append("|");
                    builder.append(timeFlag).append("|");
                    builder.append(entry.getValue());

                    context.write( NullWritable.get(), new Text(builder.toString()) );
                }
            } catch (Exception e) {
                // Anything unexpected (malformed timeFlag, unparsable date, ...):
                // abandon this user's whole slot and count it.
                context.getCounter(Counter.USERSKIP).increment(1);
                return;
            }

        }

        /**
         * Computes how long each location was occupied.
         *
         * @param uploads
         *            time-ordered sightings: key = epoch seconds, value = CGI
         * @return a HashMap from location code to stay time in minutes
         *         (the original javadoc wrongly said TreeMap); gaps longer
         *         than 60 minutes are skipped — presumably they mean the
         *         phone was off or out of coverage, so the stay cannot be
         *         attributed to the earlier location
         */
        private HashMap<String, Float> getStayTime(TreeMap<Long, String> uploads) {
            HashMap<String, Float> locs = new HashMap<String, Float>();
            if (uploads.isEmpty()) {
                // Guard: the original called it.next() unconditionally.
                return locs;
            }
            Iterator<Entry<Long, String>> it = uploads.entrySet().iterator();
            Entry<Long, String> upload = it.next();
            while (it.hasNext()) {
                Entry<Long, String> nextUpload = it.next();
                float diff = (float) (nextUpload.getKey() - upload.getKey()) / 60.0f;
                if (diff <= 60.0) {
                    Float sofar = locs.get(upload.getValue());
                    locs.put(upload.getValue(), sofar == null ? diff : sofar + diff);
                }
                upload = nextUpload;
            }
            return locs;
        }
    }

    /**
     * Configures and submits the job. Optional command-line arguments
     * override the built-in defaults, so existing no-argument invocations
     * keep working:
     *   args[0] = input path, args[1] = output path,
     *   args[2] = date (yyyy-MM-dd), args[3] = timepoint (e.g. "07-09-17-24")
     *
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Previously args was ignored entirely; defaults preserve old behavior.
        conf.set("date", args.length > 2 ? args[2] : "2013-09-12");
        conf.set("timepoint", args.length > 3 ? args[3] : "07-09-17-24");
        String input = args.length > 0 ? args[0] : "hdfs://192.168.25.137:9000/input/station";
        String output = args.length > 1 ? args[1] : "hdfs://192.168.25.137:9000/user/hdfs/station";

        Job job = new Job(conf, "BaseStationDataPreprocess");
        job.setJarByClass(BaseStationDataPreprocess.class);

        FileInputFormat.addInputPath( job, new Path(input) );            // input path
        FileOutputFormat.setOutputPath( job, new Path(output) );        // output path

        job.setMapperClass( Map.class );                                // Map task implementation
        job.setReducerClass ( Reduce.class );                            // Reduce task implementation
        job.setOutputFormatClass( TextOutputFormat.class );

        // Map emits <Text, Text>; declare it explicitly, because the final
        // output key class below no longer matches the map output key class.
        job.setMapOutputKeyClass( Text.class );
        job.setMapOutputValueClass( Text.class );

        // Reduce emits <NullWritable, Text> — the previous Text declaration
        // did not match the Reducer's actual output key type.
        job.setOutputKeyClass( NullWritable.class );
        job.setOutputValueClass( Text.class );

        job.waitForCompletion(true);

        return job.isSuccessful() ? 0 : 1;

    }

    /**
     * Entry point: delegates to ToolRunner so the standard Hadoop generic
     * options (-D, -conf, -fs, ...) are parsed before run(args) is invoked.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Tool tool = new BaseStationDataPreprocess();
        int exitCode = ToolRunner.run(conf, tool, args);
        System.exit(exitCode);
    }

}
