因工作需求,学习了Hdfs分布式文件存储系统,所整合Flink + HDFS 作为一个Demo 帮助大家跳坑。 HDFS 采用NS高可用模式。
HDFSManager.Java
* 初始化HDFS链接。
package com.e.firsh.spb.utils.hdfs;
import com.alibaba.fastjson.JSONObject;
import com.e.firsh.base.esb.BO;
import com.e.firsh.base.utils.Registry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Set;
/**
* The type Hdfs manager.
*/
public class HDFSManager {
private static Logger logger = LoggerFactory.getLogger(HDFSManager.class);
private static Configuration configuration;
private static FileSystem fs;
public HDFSManager() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("fs.defaultFS", "hdfs://ns");
jsonObject.put("dfs.nameservices", "ns");
jsonObject.put("dfs.ha.namenodes.ns","nn1,nn2");
jsonObject.put("dfs.namenode.rpc-address.ns.nn1", "10.11.0.12:9000");
jsonObject.put("dfs.namenode.rpc-address.ns.nn2", "10.11.0.13:9000");
jsonObject.put("dfs.client.failover.proxy.provider.ns", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
init(jsonObject);
}
public boolean init(JSONObject args) {
Configuration conf = new Configuration();
Set<String> itr = args.keySet();
if (itr != null) {
for (String pname : itr) {
String pvalue = args.getString(pname);
conf.set(pname, pvalue);
}
}
try {
fs = FileSystem.get(conf);
return true;
} catch (Exception e) {
logger.error(e.getMessage() + ":{}", e);
}
return false;
}
public BO appendToFile(String destPath, String line) throws IOException {
BO respBO = BO.createResponseBO();
Path path = new Path(destPath);
FSDataOutputStream dos = null;
try {
if (!fs.exists(path)) {
dos = fs.create(path);
} else {
dos = fs.append(path);
}
byte[] readBuf = line.getBytes("UTF-8");
dos.write(readBuf, 0, readBuf.length);
dos.close();
respBO.setDataNameValue("length", readBuf.length);
} catch (Exception e) {
logger.error(e.getMessage() + ":{}", e);
respBO.setCompleteCode(BO.BO_V_CC_FAILED);
} finally {
if (dos != null) {
dos.close();
}
}
return respBO;
}
public static void main(String[] args) {
HDFSManager hdfsManager = new HDFSManager();
try {
hdfsManager.appendToFile("/testmq","hello w");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void init2(){
configuration = new Configuration();
/**TODO 添加Hadoop配置内容*/
configuration.set("dfs.namenode.name.dir", "file:///home/admin/data/hadoop/hdfs/name");
configuration.set("dfs.nameservices", "ns");
configuration.set("dfs.ha.namenodes.ns", "nn1,nn2");
configuration.set("dfs.namenode.rpc-address.ns.nn1", "10.11.0.12:9000");
configuration.set("dfs.namenode.rpc-address.ns.nn2", "10.11.0.13:9000");
configuration.set("dfs.namenode.shared.edits.dir", "qjournal://10.11.0.12:8485;10.11.0.13:8485;10.11.0.14:8485/ns");
configuration.set("hadoop.tmp.dir", "/home/admin/data/hadoop/tmp");
configuration.set("fs.defaultFS", "hdfs://ns");
configuration.set("dfs.journalnode.edits.dir", "/home/admin/data/hadoop/journal");
configuration.set("dfs.client.failover.proxy.provider.ns", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
configuration.set("ha.zookeeper.quorum", "10.11.0.12:2181,10.11.0.13:2181,10.11.0.14:2181");
configuration.set("mapreduce.input.fileinputformat.split.minsize", "10");
try {
fs = FileSystem.get(configuration);
} catch (Exception e) {
logger.error(e.getMessage() + ":{}", e);
}
}
}
HDFSSink.Java
- 继承
RichSinkFunction
,重写open
和invoke
方法,还有close
。
package com.e.firsh.spb.sink;
import com.e.firsh.spb.utils.hdfs.HDFSManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
* Created by zhangjianxin on 2017/8/1.
*/
public class HDFSSink extends RichSinkFunction<String> {
private HDFSManager hdfsManager;
private final static String HDFS_PATH ="/testmq";
@Override
public void invoke(String t) throws Exception {
hdfsManager.appendToFile(HDFS_PATH,t);
}
@Override
public void open(Configuration config) {
hdfsManager = new HDFSManager();
}
}
以上代码为简易版经过修改,思路清晰,(Flink自带一种HDFS的链接方式详情见链接:)
- https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html