Maven

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.5</version>
  </dependency>

Configuration

  package bigdata.parser.common

  import bigdata.common.frame.vo.component.HdfsComponentConfig
  import org.apache.hadoop.conf.Configuration

  object HdfsConfiguration {
    def newInstance(mate: HdfsComponentConfig): Configuration = {
      val cfg = new Configuration()
      cfg.setBoolean("dfs.support.append", true)
      cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
      cfg.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      cfg.set("dfs.block.access.token.enable", "true")
      cfg.set("dfs.http.policy", "HTTP_ONLY")
      cfg.set("dfs.replication", "2")
      cfg.set("dfs.client.block.write.locateFollowingBlock.retries", "10") // number of retries when close() fails
      cfg.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
      cfg.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
      cfg.set("fs.hdfs.impl.disable.cache", "true")
      cfg.set("fs.defaultFS", s"hdfs://${mate.nameService}")
      cfg.set("dfs.nameservices", mate.nameService)
      cfg.set(s"dfs.ha.namenodes.${mate.nameService}", "nn1,nn2")
      cfg.set(s"dfs.client.failover.proxy.provider.${mate.nameService}",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
      // register each NameNode's RPC address: hosts maps the NameNode id (nn1/nn2) to host:port
      mate.hosts.foreach(item => {
        cfg.set(s"dfs.namenode.rpc-address.${mate.nameService}.${item._1}", item._2)
      })
      cfg.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
      cfg
    }
  }
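
A hedged usage sketch of the factory above: HdfsComponentConfig is project-internal, so the construction shown here (case-class-style apply) is an assumption; what newInstance actually requires is a nameService id and a hosts map of NameNode id to RPC address.

  // Assumed construction of the project-internal HdfsComponentConfig (hypothetical syntax);
  // values are taken from the HA example below.
  val meta = HdfsComponentConfig(
    nameService = "HaimaNX",
    hosts       = Map("nn1" -> "mvp-hadoop26:8020", "nn2" -> "MVP-HADOOP27:8020"))

  // A fully populated Configuration, ready to hand to FileSystem.get.
  val cfg = HdfsConfiguration.newInstance(meta)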

HA

  val configuration = new Configuration
  configuration.setBoolean("dfs.support.append", true)
  configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
  configuration.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true")
  configuration.set("dfs.block.access.token.enable", "true")
  configuration.set("dfs.http.policy", "HTTP_ONLY")
  configuration.set("dfs.replication", "1")
  configuration.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  configuration.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
  configuration.set("fs.hdfs.impl.disable.cache", "true")
  configuration.set("fs.defaultFS", "hdfs://HaimaNX")
  configuration.set("dfs.nameservices", "HaimaNX")
  configuration.set("dfs.ha.namenodes.HaimaNX", "nn1,nn2")
  configuration.set("dfs.client.failover.proxy.provider.HaimaNX",
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
  configuration.set("dfs.namenode.rpc-address.HaimaNX.nn1", "mvp-hadoop26:8020")
  configuration.set("dfs.namenode.rpc-address.HaimaNX.nn2", "MVP-HADOOP27:8020")
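
Because fs.defaultFS points at the nameservice URI rather than a concrete host, the client lets ConfiguredFailoverProxyProvider find the active NameNode. A minimal connectivity check (sketch; assumes the cluster above is reachable):

  import org.apache.hadoop.fs.{FileSystem, Path}

  val fs = FileSystem.get(configuration)   // resolves hdfs://HaimaNX through the failover proxy provider
  println(fs.exists(new Path("/tmp")))     // any cheap call confirms an active NameNode was found
  fs.close()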

Without HA
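
Without HA there is no nameservice or failover provider to configure; a minimal sketch in which the client only needs the single NameNode's RPC address (hostname and port are placeholders):

  import org.apache.hadoop.conf.Configuration

  val configuration = new Configuration
  configuration.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  configuration.set("fs.hdfs.impl.disable.cache", "true")
  configuration.set("fs.defaultFS", "hdfs://namenode-host:8020")   // placeholder: single NameNode address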

Read/Write

  try {
    Configuration cfg = new Configuration();
    // copy user-supplied key/value overrides into the Configuration
    if (ValidateUtils.notEmpty(config.getFields())) {
      for (Map.Entry<String, String> entry : config.getFields().entrySet()) {
        if (ValidateUtils.allNotEmpty(entry.getKey(), entry.getValue())) {
          cfg.set(entry.getKey(), entry.getValue());
        }
      }
    }
    // project-specific login helper (e.g. Kerberos), defined elsewhere
    login(cfg);
    URI uri = URI.create(config.getUri());
    fileSystem = FileSystem.get(uri, cfg);
  } catch (Exception e) {
    LOG.error("initOut HDFS FileSystem fail", e);
    return false;
  }
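
The block above only obtains the FileSystem handle; actual reads and writes go through its stream API. A minimal Scala sketch (path and payload are placeholders), assuming fs is the initialized FileSystem:

  import java.nio.charset.StandardCharsets
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.io.IOUtils

  def writeAndRead(fs: FileSystem): Unit = {
    val path = new Path("/tmp/demo.txt")   // placeholder path

    // write, overwriting any existing file; with dfs.support.append enabled,
    // fs.append(path) could be used instead to append to an existing file
    val out = fs.create(path, true)
    try out.write("hello hdfs".getBytes(StandardCharsets.UTF_8)) finally out.close()

    // read the file back to stdout
    val in = fs.open(path)
    try IOUtils.copyBytes(in, System.out, 4096, false) finally in.close()
  }
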
Document updated: 2019-06-21 20:02