`

Hadoop无法处理中文问题解决方案

阅读更多

由于Hadoop默认编码为UTF-8,并且将UTF-8进行了硬编码,所以我们在处理中文时需要重写OutputFormat类。方法为:

1、新建类GBKFileOutputFormat,代码如下:
import java.io.DataOutputStream;  
import java.io.IOException;  
import java.io.UnsupportedEncodingException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.compress.CompressionCodec;  
import org.apache.hadoop.io.compress.GzipCodec;  
import org.apache.hadoop.mapreduce.OutputFormat;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.util.*;  
  
/** An {@link OutputFormat} that writes plain text files. */  
public class GBKFileOutputFormat<K, V> extends FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式  
  protected static class LineRecordWriter<K, V>//默认  
    extends RecordWriter<K, V> {  
    private static final String utf8 = "GBK";  //硬编码,将“UTF-8”改为“GBK”  
    private static final byte[] newline;//行结束符?  
    static {  
      try {  
        newline = "\n".getBytes(utf8);  
      } catch (UnsupportedEncodingException uee) {  
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
      }  
    }  
  
    protected DataOutputStream out;  
    private final byte[] keyValueSeparator;//key和value的分隔符,默认的好像是Tab  
  
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//构造函数,初始化输出流及分隔符  
      this.out = out;  
      try {  
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
      } catch (UnsupportedEncodingException uee) {  
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
      }  
    }  
  
    public LineRecordWriter(DataOutputStream out) {//默认的分隔符  
      this(out, "\t");  
    }  
  
    /** 
    * Write the object to the byte stream, handling Text as a special输出流是byte格式的 
    * case. 
    * @param o the object to print是要输出的对象 
    * @throws IOException if the write throws, we pass it on 
    */  
    private void writeObject(Object o) throws IOException {//应该是一行一行的写 key keyValueSeparator value \n  
      if (o instanceof Text) {//如果o是Text的实例  
        Text to = (Text) o;  
        out.write(to.getBytes(), 0, to.getLength());//写出  
      } else {  
        out.write(o.toString().getBytes(utf8));  
      }  
    }  
  
    public synchronized void write(K key, V value)//给写线程加锁,写是互斥行为  
      throws IOException {  
//下面是为了判断key和value是否为空值  
      boolean nullKey = key == null || key instanceof NullWritable;//这语句太牛了  
      boolean nullValue = value == null || value instanceof NullWritable;  
      if (nullKey && nullValue) {//  
        return;  
      }  
      if (!nullKey) {  
        writeObject(key);  
      }  
      if (!(nullKey || nullValue)) {  
        out.write(keyValueSeparator);  
      }  
      if (!nullValue) {  
        writeObject(value);  
      }  
      out.write(newline);  
    }  
  
    public synchronized  
    void close(TaskAttemptContext context) throws IOException {  
      out.close();  
    }  
  }  
  
  public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext job//获得writer实例  
                        ) throws IOException, InterruptedException {  
    Configuration conf = job.getConfiguration();  
    boolean isCompressed = getCompressOutput(job);//  
    String keyValueSeparator= conf.get("mapred.textoutputformat.separator",  
                                      "\t");  
    CompressionCodec codec = null;//压缩格式 还是?  
    String extension = "";  
    if (isCompressed) {  
      Class<? extends CompressionCodec> codecClass =  
        getOutputCompressorClass(job, GzipCodec.class);  
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
      extension = codec.getDefaultExtension();  
    }  
    Path file = getDefaultWorkFile(job, extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现  
    FileSystem fs = file.getFileSystem(conf);  
    if (!isCompressed) {  
      FSDataOutputStream fileOut = fs.create(file, false);  
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
    } else {  
      FSDataOutputStream fileOut = fs.create(file, false);  
      return new LineRecordWriter<K, V>(new DataOutputStream  
                                        (codec.createOutputStream(fileOut)),  
                                        keyValueSeparator);  
    }  
  }  

该类是在源代码中TextOutputFormat类基础上进行修改的,在这需要注意的一点是继承的父类FileOutputFormat是位于org.apache.hadoop.mapreduce.lib.output包中的

2、在主类中添加job.setOutputFormatClass(GBKFileOutputFormat.class);

分享到:
评论

相关推荐

    Hadoop权威指南 中文版

    本书从hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍hado叩这一高性能处理海量数据集的理想工具。全书共14章,3个附录,... 如果您拥有海量数据,无论是gb级还是pb级,hadoop都将是您的完美解决方案。

    数据算法 Hadoop Spark大数据处理技巧 中文PDF

    《数据算法:Hadoop/Spark大数据处理技巧》介绍了很多基本设计模式、优化技术和数据挖掘及机器学习解决方案,以解决生物信息学、基因组学、统计和社交网络分析等领域的很多问题。这还概要介绍了MapReduce、Hadoop和...

    数据算法 Hadoop Spark大数据处理技巧.pdf

    Hadoop/Spark大数据处理技巧》介绍了很多基本设计模式、优化技术和数据挖掘及机器学习解决方案,以解决生物信息学、基因组学、统计和社交网络分析等领域的很多问题。这还概要介绍了MapReduce、Hadoop和Spark。, 主要...

    中文版Hadoop权威指南

    本书是hadoop权威参考,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装与运行hadoop集群。  什么是谷歌帝国的基石?... 如果您拥有海量数据,无论是gb级还是pb级,hadoop都将是您的完美解决方案。

    Hadoop 权威指南(中文版)

    本书从hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍hado叩这一高性能处理海量数据集的理想工具。全书共14章,3个附录,... 如果您拥有海量数据,无论是gb级还是pb级,hadoop都将是您的完美解决方案。

    数据算法 Hadoop Spark大数据处理技巧 中文完整版 高清带书签

    《数据算法:Hadoop/Spark大数据处理技巧》介绍了很多基本设计模式、优化技术和数据挖掘及机器学习解决方案,以解决生物信息学、基因组学、统计和社交网络分析等领域的很多问题。这还概要介绍了MapReduce、Hadoop和...

    HadoopSpark大数据处理技巧[中文版][高清]

    《数据算法:Hadoop/Spark大数据处理技巧》介绍了很多基本设计模式、优化技术和数据挖掘及机器学习解决方案,以解决生物信息学、基因组学、统计和社交网络分析等领域的很多问题。这还概要介绍了MapReduce、Hadoop和...

    论文研究-基于层次聚类的跨文本中文人名消歧研究.pdf

    运用中文自然语言处理和信息抽取系统识别命名实体和实体关系,生成实体信息对象(Entity Profile),采用实体信息对象(EP)中的个人信息特征,实体关系和上下文相关信息在Hadoop平台上基于凝聚的层次聚类方法解决了...

    hive编程指南中文版

    在本书中,读者还可以看到众多的实际使用场景,包括企业如何使用Hive解决了涉及PB级数据的问题。 · 使用Hive创建、修改和删除数据库、表、视图、函数和索引。 · 从文件到外部数据库,自定义数据存储格式和存储选项...

    大数据处理技术网页数据清洗及分词

    1. 在jar包执行时,会出现ansj中的类找不到的错误,解决方法是将ansj和nlp两个包上传到hadoop节点上,然后运行程序的执行命令时加上jar包就可以了。 2. 重复运行程序的时候因为之前生成结果文件但是没有删掉,运行...

    Fourinone分布式计算框架

    FourInOne对于分布式大数据量并行计算的解决方案不同于复杂的hadoop,它不像hadoop的中间计算结果依赖于hdfs,它使用不同于map/reduce的全新设计模式解决问题。FourInOne有“包工头”,“农民工”,“手工仓库”的几...

    BI与大数据区别.docx

    BI(Business Intelligence),中文翻译是商务智能,是一套完整的解决方案,用来将组织中现有的数据进行有效的整合,快速准确的提供报表并提出决策依据,帮助组织做出明智的业务经营决策。 大数据(Big Data)是从收集...

    Fourinone分布式并行计算四合一框架

     Fourinone对于分布式大数据量并行计算的解决方案不同于复杂的hadoop,它不像hadoop的中间计算结果依赖于hdfs,它使用不同于map/reduce的全新设计模式解决问题。Fourinone有“包工头”,“农民工”,“手工仓库”的...

    fourinone-3.04.25

    Fourinone对于分布式大数据量并行计算的解决方案不同于复杂的hadoop,它不像hadoop的中间计算结果依赖于hdfs,它使用不同于map/reduce的全新设计模式解决问题。Fourinone有“包工头”,“农民工”,“手工仓库”的几...

    大数据市场分析.pptx

    最新的曙光大数据战略将曙光从硬件设备供应商向解决方案和服务提供商转变。 大数据市场分析全文共29页,当前为第6页。 大数据发展现状 大数据市场分析全文共29页,当前为第7页。 大数据的发展现状1 大数据 发展现状1...

Global site tag (gtag.js) - Google Analytics