Hadoop's HDFS and MapReduce are designed for processing large files with large volumes of data. Small files are a problem: the NameNode keeps a record of every block in memory, so a large number of small files consumes a great deal of NameNode memory. Moreover, because HDFS stores files in blocks, a file smaller than one block still claims a block of its own (it does not waste a full block on disk, but it does occupy a full block entry in the NameNode's metadata). A mass of small files therefore hurts both HDFS storage management and access.
Hadoop offers three main ways of handling small files: Hadoop Archive, SequenceFile, and CombineFileInputFormat.

Hadoop Archive

Hadoop Archive is Hadoop's archiving command. It packs small files on HDFS into a single har file. This does not shrink the storage the small files occupy, but it does reduce the NameNode's memory footprint, and har files can still be accessed with the usual hdfs commands.

Command: hadoop archive -archiveName <archive name> -p <parent dir> [-r <replication factor>] <src path (one or more)> <dest path>

-archiveName sets the name of the generated archive file

-p the parent directory of the files to be archived

Example:

$ hadoop fs -ls /user/test/yhj/input/
Found 3 items
-rw-r--r--   3 root hdfs        760 2018-07-04 11:48 /user/test/yhj/input/word1.txt
-rw-r--r--   3 root hdfs         82 2018-07-04 11:48 /user/test/yhj/input/word2.txt
-rw-r--r--   3 root hdfs       1738 2018-07-04 11:48 /user/test/yhj/input/word3.txt
$ hadoop archive -archiveName word.har -p /user/test/yhj/input/ word1.txt word2.txt word3.txt /user/test/yhj/harInput/
$ hadoop fs -ls /user/test/yhj/harInput/
Found 1 items
drwxr-xr-x   - hdfs hdfs          0 2018-07-05 20:18 /user/test/yhj/harInput/word.har

The HAR file is generated by running a MapReduce job, so a MapReduce runtime must be available in the cluster.

HAR is a file system layered on top of the Hadoop file system, so all fs shell commands work on HAR files, just with a different URI scheme. Note also that archives are immutable: rename, delete, and create all return an error. For example:

$ hadoop fs -ls /user/test/yhj/harInput/word.har
Found 4 items
-rw-r--r--   3 hdfs hdfs          0 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_SUCCESS
-rw-r--r--   5 hdfs hdfs        255 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_index
-rw-r--r--   5 hdfs hdfs         22 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_masterindex
-rw-r--r--   3 hdfs hdfs       2580 2018-07-05 20:18 /user/test/yhj/harInput/word.har/part-0
$ hadoop fs -ls har:/user/test/yhj/harInput/word.har
Found 3 items
-rw-r--r--   3 hdfs hdfs        760 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word1.txt
-rw-r--r--   3 hdfs hdfs         82 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word2.txt
-rw-r--r--   3 hdfs hdfs       1738 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word3.txt

You can see that a Hadoop archive directory contains metadata (in the form of _index and _masterindex) and the data part files (part-*); the _index file records the name of each archived file and its location within the part files.
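
Since _index and _masterindex are ordinary HDFS files, you can peek at them directly if you are curious about the layout (output omitted here):

$ hadoop fs -cat /user/test/yhj/harInput/word.har/_index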

HAR files can also be read by MapReduce, and the input path can be written with either of two URI schemes. For the example above, the file input path can be given in the following two forms:

hdfs://10.1.13.111:8020/user/test/yhj/harInput/word.har
har://hdfs-10.1.13.111:8020/user/test/yhj/harInput/word.har

For this example's files, however, the two input paths produce different numbers of maps: the har path produces three maps, one for each word*.txt, while the hdfs path produces only one, corresponding to word.har/part-0.

If the files support line-based record splitting for MapReduce processing (the data of one file does not affect another), prefer the hdfs URI: a part-* file in the archive directory may contain the data of several small files, so this reduces the number of maps rather than launching one map for every single small file.
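
As a minimal sketch of what this choice looks like in a driver (only the input path line differs; the surrounding job setup is assumed, and the host/port come from the example above):

// hdfs scheme: reads the archive's part files directly; here, one map for word.har/part-0
FileInputFormat.addInputPath(job, new Path("hdfs://10.1.13.111:8020/user/test/yhj/harInput/word.har"));

// har scheme: one map per archived file (word1.txt, word2.txt, word3.txt)
// FileInputFormat.addInputPath(job, new Path("har://hdfs-10.1.13.111:8020/user/test/yhj/harInput/word.har"));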

CombineFileInputFormat

Feeding a large number of small files directly into MapReduce is a poor fit. FileInputFormat only splits large files (files exceeding the configured split size, which defaults to the HDFS block size); every file smaller than the split size becomes a split of its own, so for files smaller than a block, MapReduce launches one map per small file. That produces many maps, each of which processes only a little data while still paying the per-map startup overhead. The number of maps can of course be steered with mapred.min.split.size and mapred.max.split.size.
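
For reference, a sketch of those knobs using the new-API property names (mapred.min.split.size and mapred.max.split.size are the deprecated spellings; the sizes are purely illustrative):

Configuration conf = job.getConfiguration();
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024);   // 64 MB
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);  // 256 MB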

CombineFileInputFormat is MapReduce's answer to small files: it can pack multiple small files into a single split. Beyond merely controlling the map count, CombineFileInputFormat considers the node and rack locations of the blocks when deciding which blocks go into a split, avoiding excessive data transfer when the splits are processed.

CombineFileInputFormat is an abstract class; Hadoop ships a concrete implementation, CombineTextInputFormat. We can also subclass CombineFileInputFormat, implement its createRecordReader method, and define a custom RecordReader class to build a MapReduce job that handles masses of small files.

An InputFormat has two main methods: getSplits (computes the splits) and createRecordReader (returns the RecordReader, which generates the key/value pairs the map reads as input).

CombineFileInputFormat already implements getSplits, packing multiple small files into a single CombineFileSplit; we only need to implement createRecordReader and return a RecordReader capable of reading that split's contents.

Implementation of MyCombineInputFormat

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyCombineInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        RecordReader<LongWritable, Text> reader = new CombineFileRecordReader<>(
                (CombineFileSplit) inputSplit, taskAttemptContext, MyCombineFileRecordReader.class);
        try {
            reader.initialize(inputSplit, taskAttemptContext);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return reader;
    }
}

What is actually returned here is a CombineFileRecordReader. It is constructed from the CombineFileSplit, the context, and a Class<? extends RecordReader>; MyCombineFileRecordReader is our class for turning each individual file inside the CombineFileSplit into map input.
CombineFileRecordReader's nextKeyValue method constructs a RecordReader for each file packed into the CombineFileSplit and reads that file's records with it.

public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
    ...
    public CombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Class<? extends RecordReader<K, V>> rrClass) throws IOException {
        this.split = split;
        this.context = context;
        this.idx = 0;
        this.curReader = null;
        this.progress = 0L;
        try {
            this.rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
            this.rrConstructor.setAccessible(true);
        } catch (Exception var5) {
            throw new RuntimeException(rrClass.getName() + " does not have valid constructor", var5);
        }
        this.initNextRecordReader();
    }

    protected boolean initNextRecordReader() throws IOException {
        // Close the reader for the previous file and accumulate its progress
        if (this.curReader != null) {
            this.curReader.close();
            this.curReader = null;
            if (this.idx > 0) {
                this.progress += this.split.getLength(this.idx - 1);
            }
        }
        // All files in the split have been consumed
        if (this.idx == this.split.getNumPaths()) {
            return false;
        }
        this.context.progress();
        try {
            Configuration conf = this.context.getConfiguration();
            conf.set("mapreduce.map.input.file", this.split.getPath(this.idx).toString());
            conf.setLong("mapreduce.map.input.start", this.split.getOffset(this.idx));
            conf.setLong("mapreduce.map.input.length", this.split.getLength(this.idx));
            // Construct the user-supplied RecordReader for the idx-th file
            this.curReader = (RecordReader) this.rrConstructor.newInstance(new Object[]{this.split, this.context, Integer.valueOf(this.idx)});
            if (this.idx > 0) {
                this.curReader.initialize(this.split, this.context);
            }
        } catch (Exception var2) {
            throw new RuntimeException(var2);
        }
        ++this.idx;
        return true;
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        do {
            if (this.curReader != null && this.curReader.nextKeyValue()) {
                return true;
            }
        } while (this.initNextRecordReader());
        return false;
    }

    public K getCurrentKey() throws IOException, InterruptedException {
        return this.curReader.getCurrentKey();
    }

    public V getCurrentValue() throws IOException, InterruptedException {
        return this.curReader.getCurrentValue();
    }
    ...
}

In nextKeyValue, the current file's records are read through the custom RecordReader's nextKeyValue; once the current file's content is exhausted, initNextRecordReader initializes a RecordReader for the next file. So all we need to implement is a RecordReader that reads a single file.

Implementation of MyCombineFileRecordReader

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyCombineFileRecordReader extends RecordReader<LongWritable, Text> {
    private CombineFileSplit combineFileSplit;
    private int currentIndex;
    private LineRecordReader reader = new LineRecordReader();
    private int totalNum;

    public MyCombineFileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index;
        this.totalNum = combineFileSplit.getNumPaths();
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        // Build a FileSplit for the currentIndex-th file inside the CombineFileSplit
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),
                combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
        context.getConfiguration().set("mapreduce.map.input.file.name", fileSplit.getPath().getName());
        this.reader.initialize(fileSplit, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalNum) {
            return reader.nextKeyValue();
        } else {
            return false;
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalNum) {
            return (float) currentIndex / totalNum;
        }
        return 0;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

MyCombineFileRecordReader reads text records line by line through a LineRecordReader. In initialize, it uses the CombineFileSplit and index (the position of this file's information within the CombineFileSplit) to obtain the file's details and create the corresponding FileSplit, then initializes the LineRecordReader with it; in nextKeyValue it delegates to the LineRecordReader to produce the key/value pairs for the mapper.

Finally, the entry point and the map class; simply replace the InputFormatClass with the custom MyCombineInputFormat class.

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CombineInputFromatMain extends Configured implements Tool {
    public static class CombineInputFormatMap extends Mapper<Object, Text, Text, Text> {
        private Text outKey = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            outKey.set(context.getConfiguration().get("mapreduce.map.input.file.name"));
            context.write(outKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // JobDefaultInit is the author's helper that creates a default Job and sets the input/output paths
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("CombineInputFormat Text");
        job.setJarByClass(CombineInputFromatMain.class);
        job.setMapperClass(CombineInputFormatMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(MyCombineInputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CombineInputFromatMain(), args));
    }
}

In this example the three word*.txt files are packed into one split, and only a single map is actually launched.
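
If plain line-oriented text is all that is needed, the built-in CombineTextInputFormat mentioned earlier achieves the same packing without any custom classes; a minimal sketch (the 128 MB cap is an arbitrary example value):

job.setInputFormatClass(CombineTextInputFormat.class);
// upper bound on how much data is packed into a single split
CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024); // 128 MB

Note that the built-in reader hands the mapper plain (byte offset, line) pairs and does not set the custom mapreduce.map.input.file.name property used above.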

Sequence file

A sequence file consists of a series of binary key/value pairs. If the key is the small file's name and the value is the file's contents, a large batch of small files can be merged into one big file.
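
Before the MapReduce implementation shown below, here is a minimal single-process sketch of the same idea using the SequenceFile.Writer API directly; the class name and the input/output paths are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class LocalSeqFileMerge {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/user/test/yhj/merged.seq"); // hypothetical output path
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(out),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (FileStatus status : fs.listStatus(new Path("/user/test/yhj/input/"))) {
                byte[] data = new byte[(int) status.getLen()];
                try (FSDataInputStream in = fs.open(status.getPath())) {
                    IOUtils.readFully(in, data, 0, data.length);
                }
                // key = file name, value = the whole file's contents
                writer.append(new Text(status.getPath().getName()), new BytesWritable(data));
            }
        }
    }
}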

A sequence file is made up of a header followed by the records. The first three bytes are SEQ (the sequence file magic number), followed by one byte for the version. The header also holds the key and value class names, a flag for whether the data is compressed, a flag for whether block compression is used, the compression codec, user-defined metadata, and a sync marker. Content can only be read starting from a sync marker; sync markers sit between records, which means a sequence file cannot be read starting from the middle of a record.

Sequence files come in three main formats: uncompressed, record-compressed, and block-compressed. The storage layout of each format is described in the official API documentation:
hadoop.apache.org/docs/curren…

An implementation that merges small files into a single sequence file (code adapted from Hadoop: The Definitive Guide):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    public static class WholeFileInputFormat extends FileInputFormat<LongWritable, Text> {
        /**
         * Do not split the file: one split reads in the entire file
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            RecordReader<LongWritable, Text> reader = new WholeFileRecordReader();
            reader.initialize(inputSplit, taskAttemptContext);
            return reader;
        }
    }

    /**
     * Custom RecordReader that reads the entire contents of a small file
     */
    public static class WholeFileRecordReader extends RecordReader<LongWritable, Text> {
        private FileSplit fileSplit;
        private Configuration conf;
        private LongWritable key = new LongWritable();
        private Text value = new Text();
        private boolean process = false;

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) inputSplit;
            this.conf = taskAttemptContext.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!process) {
                FileSystem fs = fileSplit.getPath().getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(fileSplit.getPath());
                    byte[] contentBytes = new byte[(int) fileSplit.getLength()];
                    IOUtils.readFully(in, contentBytes, 0, contentBytes.length);
                    // equivalent to in.read(contentBytes, 0, contentBytes.length);
                    String content = new String(contentBytes, "utf-8");
                    key.set(fileSplit.getStart());
                    value.set(content);
                } finally {
                    IOUtils.closeStream(in);
                }
                process = true;
                return true;
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return process ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
        }
    }

    public static class SmallFilesToSequenceFileMap extends Mapper<Object, Text, Text, Text> {
        private Text outKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            outKey.set(((FileSplit) context.getInputSplit()).getPath().toString());
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(outKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // JobDefaultInit is the author's helper that creates a default Job and sets the input/output paths
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("SmallFiles To SequenceFile");
        job.setMapperClass(SmallFilesToSequenceFileMap.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SmallFilesToSequenceFileConverter(), args));
    }
}

A sequence file can be displayed in text form on the command line with hadoop fs -text.
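
For example, to dump the result of the conversion job above as text (the output path and file name here are hypothetical; a single-reduce job writes part-r-00000):

$ hadoop fs -text /user/test/yhj/seqOutput/part-r-00000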

A simple implementation that reads the SequenceFile

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileReadMain extends Configured implements Tool {
    public static class SequenceFileReadMap extends Mapper<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            outKey.set("key : " + key.toString());
            outValue.set("value : " + value.toString());
            context.write(outKey, outValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("Sequence File Read");
        job.setMapperClass(SequenceFileReadMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SequenceFileReadMain(), args));
    }
}

When the SequenceFile is read this way, the key of each map call is the small file's name and the value is that file's entire contents, so the code that processes a whole small file's contents has to be written in the map.
