
需要通过Java程序提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码。
以下为MapReduce主程序,有几点需要提一下:
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。
2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<textpair,value>,TextPair的格式为<key1,key2>。
3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用。
package web.Hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import util.Utils;
public class GEMIMain {
public GEMIMain(){
job = null;
}
public Job job;
public static class NamePartitioner extends
Partitioner<textpair, byteswritable=""> {
@Override
public int getPartition(TextPair key, BytesWritable value,
int numPartitions) {
return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
}
}
/**
* 分组设置类,只要两个TextPair的第一个key相同,他们就属于同一组。他们的Value就放到一个Value迭代器中,
* 然后进入Reducer的reduce方法中。
*
* @author hduser
*
*/
public static class GroupComparator extends WritableComparator {
public GroupComparator() {
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
// 比较相同则返回0,比较不同则返回-1
return t1.getFirst().compareTo(t2.getFirst()); // 只要是第一个字段相同的就分成为同一组
}
}
public boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 在conf中设置outputath变量,以在reduce函数中可以获取到该参数的值
conf.set("outputPath", args[args.length - 1].toString());
//设置HDFS中,每次任务生成产品的质量文件所在文件夹。args数组的倒数第二个原数为质量文件所在文件夹
conf.set("qualityFolder", args[args.length - 2].toString());
//如果在Server中运行,则需要获取web项目的根路径;如果以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件
//MapReduceProgress mprogress = new MapReduceProgress();
//String rootPath= mprogress.rootPath;
String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
conf.addResource(new Path(rootPath+"yarn-site.xml"));
conf.addResource(new Path(rootPath+"core-site.xml"));
conf.addResource(new Path(rootPath+"hdfs-site.xml"));
conf.addResource(new Path(rootPath+"mapred-site.xml"));
this.job = new Job(conf);
job.setJobName("Job name:" + args[0]);
job.setJarByClass(GEMIMain.class);
job.setMapperClass(GEMIMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(BytesWritable.class);
// 设置partition
job.setPartitionerClass(NamePartitioner.class);
// 在分区之后按照指定的条件分组
job.setGroupingComparatorClass(GroupComparator.class);
job.setReducerClass(GEMIReducer.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
// job.setOutputKeyClass(NullWritable.class);
// job.setOutputValueClass(Text.class);
job.setNumReduceTasks(8);
// 设置计算输入数据的路径
for (int i = 1; i < args.length - 2; i++) {
FileInputFormat.addInputPath(job, new Path(args[i]));
}
// args数组的最后一个元素为输出路径
FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
boolean flag = job.waitForCompletion(true);
return flag;
}
@SuppressWarnings("static-access")
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException {
String[] inputPaths = new String[] { "normalizeJob",
"hdfs://192.168.168.101:9000/user/hduser/red1/",
"hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
"hdfs://192.168.168.101:9000/user/hduser/test" };
GEMIMain test = new GEMIMain();
boolean result = test.runJob(inputPaths);
}
}
以下为TextPair类
public class TextPair implements WritableComparable {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
/**A.compareTo(B)
* 如果比较相同,则比较结果为0
* 如果A大于B,则比较结果为1
* 如果A小于B,则比较结果为-1
*
*/
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
//此时实现的是升序排列
return second.compareTo(tp.second);
}
}
以下为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WholeFileInputFormat extends FileInputFormat<text, byteswritable=""> {
@Override
public RecordReader<text, byteswritable=""> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return new WholeFileRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// TODO Auto-generated method stub
return false;
}
}
以下为WholeFileRecordReader类
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<text, byteswritable=""> {
private FileSplit fileSplit;
private FSDataInputStream fis;
private Text key = null;
private BytesWritable value = null;
private boolean processed = false;
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
// fis.close();
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.key;
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return this.value;
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
throws IOException, InterruptedException {
fileSplit = (FileSplit) inputSplit;
Configuration job = tacontext.getConfiguration();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(job);
fis = fs.open(file);
}
@Override
public boolean nextKeyValue() {
if (key == null) {
key = new Text();
}
if (value == null) {
value = new BytesWritable();
}
if (!processed) {
byte[] content = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
System.out.println(file.getName());
key.set(file.getName());
try {
IOUtils.readFully(fis, content, 0, content.length);
// value.set(content, 0, content.length);
value.set(new BytesWritable(content));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
IOUtils.closeStream(fis);
}
processed = true;
return true;
}
return false;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return processed ? fileSplit.getLength() : 0;
}
}
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在神经网络设计中,“隐藏层个数” 是决定模型能力的关键参数 —— 太少会导致 “欠拟合”(模型无法捕捉复杂数据规律,如用单隐 ...
2025-10-21在特征工程流程中,“单变量筛选” 是承上启下的关键步骤 —— 它通过分析单个特征与目标变量的关联强度,剔除无意义、冗余的特 ...
2025-10-21在数据分析全流程中,“数据读取” 常被误解为 “简单的文件打开”—— 双击 Excel、执行基础 SQL 查询即可完成。但对 CDA(Cert ...
2025-10-21在实际业务数据分析中,我们遇到的大多数数据并非理想的正态分布 —— 电商平台的用户消费金额(少数用户单次消费上万元,多数集 ...
2025-10-20在数字化交互中,用户的每一次操作 —— 从电商平台的 “浏览商品→加入购物车→查看评价→放弃下单”,到内容 APP 的 “点击短 ...
2025-10-20在数据分析的全流程中,“数据采集” 是最基础也最关键的环节 —— 如同烹饪前需备好新鲜食材,若采集的数据不完整、不准确或不 ...
2025-10-20在数据成为新时代“石油”的今天,几乎每个职场人都在焦虑: “为什么别人能用数据驱动决策、升职加薪,而我面对Excel表格却无从 ...
2025-10-18数据清洗是 “数据价值挖掘的前置关卡”—— 其核心目标是 “去除噪声、修正错误、规范格式”,但前提是不破坏数据的真实业务含 ...
2025-10-17在数据汇总分析中,透视表凭借灵活的字段重组能力成为核心工具,但原始透视表仅能呈现数值结果,缺乏对数据背景、异常原因或业务 ...
2025-10-17在企业管理中,“凭经验定策略” 的传统模式正逐渐失效 —— 金融机构靠 “研究员主观判断” 选股可能错失收益,电商靠 “运营拍 ...
2025-10-17在数据库日常操作中,INSERT INTO SELECT是实现 “批量数据迁移” 的核心 SQL 语句 —— 它能直接将一个表(或查询结果集)的数 ...
2025-10-16在机器学习建模中,“参数” 是决定模型效果的关键变量 —— 无论是线性回归的系数、随机森林的树深度,还是神经网络的权重,这 ...
2025-10-16在数字化浪潮中,“数据” 已从 “辅助决策的工具” 升级为 “驱动业务的核心资产”—— 电商平台靠用户行为数据优化推荐算法, ...
2025-10-16在大模型从实验室走向生产环境的过程中,“稳定性” 是决定其能否实用的关键 —— 一个在单轮测试中表现优异的模型,若在高并发 ...
2025-10-15在机器学习入门领域,“鸢尾花数据集(Iris Dataset)” 是理解 “特征值” 与 “目标值” 的最佳案例 —— 它结构清晰、维度适 ...
2025-10-15在数据驱动的业务场景中,零散的指标(如 “GMV”“复购率”)就像 “散落的零件”,无法支撑系统性决策;而科学的指标体系,则 ...
2025-10-15在神经网络模型设计中,“隐藏层层数” 是决定模型能力与效率的核心参数之一 —— 层数过少,模型可能 “欠拟合”(无法捕捉数据 ...
2025-10-14在数字化浪潮中,数据分析师已成为企业 “从数据中挖掘价值” 的核心角色 —— 他们既要能从海量数据中提取有效信息,又要能将分 ...
2025-10-14在企业数据驱动的实践中,“指标混乱” 是最常见的痛点:运营部门说 “复购率 15%”,产品部门说 “复购率 8%”,实则是两者对 ...
2025-10-14在手游行业,“次日留存率” 是衡量一款游戏生死的 “第一道关卡”—— 它不仅反映了玩家对游戏的初始接受度,更直接决定了后续 ...
2025-10-13