发布网友 发布时间:2022-04-29 21:28
共1个回答
热心网友 时间:2022-06-23 04:25
自定义 MR 实现如下逻辑
product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571
字段解释:
product_no:用户手机号;
lac_id:用户所在基站;
start_time:用户在此基站的开始时间;
staytime:用户在此基站的逗留时间。
需求描述:
根据 lac_id 和 start_time 知道用户当时的位置,根据 staytime 知道用户各个基站的逗留时长。根据轨迹合
并连续基站的 staytime。最终得到每一个用户按时间排序在每一个基站驻留时长。
期望输出举例:
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
分析上面的结果:
第一列升序,第四列时间降序。因此,首先需要将这两列抽取出来,然后自定义排序。
实现如下:
package FindFriend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Secondary-sort MapReduce job: extracts the (product_no, start_time) pair
 * from each record and emits the original records sorted by product_no
 * ascending and, within one product_no, by start_time descending.
 */
public class StringComp2 {

    static final String INPUT_PATH = "hdfs://master:8020/liguodong/test2";
    static final String OUT_PATH = "hdfs://master:8020/liguodong/test2out";

    // Extracts the 11-digit phone number and the timestamp column.
    // Compiled once instead of per record; the dot before the
    // nanosecond field is escaped (an unescaped '.' matches any char).
    private static final Pattern FIELD_PATTERN = Pattern.compile(
            "([\\d]{11})|([\\d]{4}-[\\d]{2}-[\\d]{2} [\\d]{2}:[\\d]{2}:[\\d]{2}\\.[\\d]{9})");

    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        final FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
        // Remove a stale output directory so the job can be re-run.
        if (fs.exists(new Path(OUT_PATH))) {
            fs.delete(new Path(OUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "date sort");
        job.setJarByClass(StringComp2.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        // Fixed: the reducer emits NullWritable values, not Text.
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Builds the composite sort key (phone, start_time) for each input line. */
    static class MyMapper extends Mapper<LongWritable, Text, NewK2, Text> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            Matcher matcher = FIELD_PATTERN.matcher(v1.toString());
            // Fixed: the original called matcher.group() without checking
            // find(), which throws IllegalStateException on malformed lines.
            if (!matcher.find()) {
                return;
            }
            String phone = matcher.group();
            if (!matcher.find()) {
                return;
            }
            String startTime = matcher.group();
            context.write(new NewK2(phone, startTime), v1);
        }
    }

    /** Writes each original record once, in shuffle-sorted key order. */
    static class MyReducer extends Reducer<NewK2, Text, Text, NullWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }

    /** Composite key sorted first-column ascending, second-column descending. */
    static class NewK2 implements WritableComparable<NewK2> {
        String first;   // product_no (phone number)
        String second;  // start_time

        public NewK2() {
        }

        public NewK2(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readUTF();
            this.second = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        /**
         * Invoked during the shuffle sort: ascending on the first column;
         * when the first columns are equal, descending on the second
         * (the negated comparison).
         */
        @Override
        public int compareTo(NewK2 o) {
            int byFirst = compTo(this.first, o.first);
            if (byFirst != 0) {
                return byFirst;
            }
            return -compTo(this.second, o.second);
        }

        /**
         * Character-wise comparison. Fixed: bounded by the shorter string and
         * falls back to the length difference, so an unequal-length pair no
         * longer throws ArrayIndexOutOfBoundsException.
         */
        public int compTo(String one, String another) {
            int lim = Math.min(one.length(), another.length());
            for (int k = 0; k < lim; k++) {
                char c1 = one.charAt(k);
                char c2 = another.charAt(k);
                if (c1 != c2) {
                    return c1 - c2;
                }
            }
            return one.length() - another.length();
        }

        @Override
        public int hashCode() {
            // 31-multiplier keeps equal keys hashing equally while spreading
            // (a, b) and (b, a) apart, consistent with equals below.
            return 31 * first.hashCode() + second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 other = (NewK2) obj;
            // Fixed: compare string contents with equals(), not reference ==.
            return this.first.equals(other.first)
                    && this.second.equals(other.second);
        }
    }
}
运行结果: