Mongodb作为Flink 的 Sink

因工作需求所整合Flink + Mongodb 作为一个Demo 帮助大家跳坑。

Mongodb 采用集群主从模式。

MongoSink.Java

继承RichSinkFunction,重写openinvoke方法,还有close

package com.e.firsh.spb.sink;

import com.alibaba.fastjson.JSON;
import com.e.firs.spb.utils.Mongo.MongoService;
import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.net.UnknownHostException;

/**
 * Created by zhangjianxin on 2017/7/31.
 */
public  class MongoSink extends RichSinkFunction<String> {
    public static  String CollectionName = "collection-a";
    private MongoService mongoService;
    @Override
    public void invoke(String t) {
        try {
            this.mongoService.saveJson(JSON.parseObject(t),CollectionName);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void open(Configuration config) {
        mongoService = new MongoService();
        try {
            super.open(config);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};

MongoService.Java

MongoDB Service 处理工具

package com.e.firsh.spb.utils.Mongo;

import com.alibaba.fastjson.JSONObject;

import java.net.UnknownHostException;
import java.util.List;

/**
 * Created by zhangjianxin on 2017/7/31.
 */
public class MongoService {



    private EntryDao EntryDao = new EntryDao();

    /**
     * 保存
     * @param saveJson
     * @throws UnknownHostException
     */
    public void saveJson(JSONObject saveJson, String CollectionName) throws UnknownHostException {
        this.EntryDao.save(CollectionName,saveJson,true);
    }




    /**
     * 更新
     * @param Entry
     * @throws UnknownHostException
     */
    public void update(Entry Entry,String CollectionName) throws UnknownHostException {
        this.EntryDao.update(Entry,CollectionName);
    }

    /**
     * 查询所有
     * @return
     * @throws UnknownHostException
     */
    public List<Entry> findAll(String CollectionName) throws UnknownHostException{
        return this.EntryDao.findAll(CollectionName);
    }


    /**
     * 删除操作
     * @param id
     * @throws UnknownHostException
     */
    public void remove(int id,String CollectionName) throws UnknownHostException{
        this.EntryDao.remove(id,CollectionName);
    }
}

MongoManager.Java

获得mongodb链接池的工具类,获得某个DB的链接。

package com.e.firsh.spb.utils.Mongo;

/**
 * Created by zhangjianxin on 2017/7/31.
 */
import com.mongodb.*;
import com.mongodb.client.MongoDatabase;

import java.util.Arrays;

public class MongoManager {

    private static Mongo mongo = null;
    private static MongoClient mongoClient= null;
    private MongoManager() { }

    static {
        initDBPrompties();
    }

    public static DB getDB(String dbName) {
        return mongo.getDB(dbName);
    }
    public static MongoDatabase getDatabase(String dbName) {
        return mongoClient.getDatabase(dbName);
    }

    /**
     * 初始化连接池
     */
    private static void initDBPrompties() {
        try {

            MongoClientOptions.Builder mcob = MongoClientOptions.builder();
            mcob.connectionsPerHost(1000);
            mcob.socketKeepAlive(true);
            mcob.readPreference(ReadPreference.secondaryPreferred());
            MongoClientOptions mco = mcob.build();

            mongoClient = new MongoClient(Arrays.asList(
                    new ServerAddress("127.0.0.1", 27017),
                    new ServerAddress("127.0.0.1", 27017),
                    new ServerAddress("127.0.0.1", 27017)), mco);


            mongo = new Mongo(MongoDBConstant.HOST, MongoDBConstant.PORT);
            MongoOptions opt = mongo.getMongoOptions();
            opt.connectionsPerHost = MongoDBConstant.POOLSIZE;
            opt.threadsAllowedToBlockForConnectionMultiplier = MongoDBConstant.BLOCKSIZE;
        } catch (MongoException e) {
            e.printStackTrace();
        }
    }
}

EntryDao.java

用来操作实体的工具类

package com.e.firsh.spb.utils.Mongo;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import com.alibaba.fastjson.JSONObject;
import com.mongodb.*;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.util.JSON;
import org.bson.Document;

/**
 * Created by zhangjianxin on 2017/7/31.
 */
public class EntryDao {

    /**
     * 删除操作
     * @param id
     * @throws UnknownHostException
     */
    public void remove(int id,String CollectionName) throws UnknownHostException{
        MongoDatabase mongo = MongoManager.getDatabase(MongoDBConstant.DB);
        MongoCollection coll = mongo.getCollection(CollectionName);


        Document baseDBO =new Document();
        baseDBO.put("id", id);

        //删除某一条记录
        coll.deleteOne(baseDBO);
    }


    /**
     *
     * @param CollectionName 集合名
     * @param saveJson  待存入JSON
     * @param useDefaultId  未传入_id时,若为true则使用MongoDB自动生成的_id。若为false则生成UUID作为主键
     * @return
     */
    public JSONObject save(String CollectionName, JSONObject saveJson, boolean useDefaultId) {
        JSONObject resp = new JSONObject();
        try {
            MongoDatabase mongo = MongoManager.getDatabase(MongoDBConstant.DB);
            MongoCollection coll = mongo.getCollection(CollectionName);
            Object _id = null;
           if (!saveJson.containsKey("_id")){
                if (!useDefaultId) {
                    _id = UUID.randomUUID().toString();
                    saveJson.put("Data", _id);
                }
            } else if(saveJson.containsKey("_id"))  {
                _id = saveJson.get("_id");
            }
            Document doc = Document.parse(saveJson.toString());
            coll.insertOne(doc);
            resp.put("Data", _id);
        } catch (MongoTimeoutException e1) {
            e1.printStackTrace();
            resp.put("ReasonMessage",e1.getClass() + ":" + e1.getMessage());
            return resp;
        } catch (Exception e) {
            e.printStackTrace();
            resp.put("ReasonMessage",e.getClass() + ":" + e.getMessage());
        }
        return resp;
    }

}

JsonUtils.java

JSON的小工具类

package com.tod.spb.utils.Mongo;

import java.io.OutputStream;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;


/**
 * Json转化工具,可以实现java对象和json字符串之间的互相转化<br />
 * Created by zhangjianxin on 2017/7/31.
 */
public class JsonUtils {
    static ObjectMapper objectMapper = new ObjectMapper();

    /**
     * java 对象转换为json 存入流中
     *
     * @param obj
     * @return  s
     */
    public static String toJson(Object obj) {
        String s = "";
        try {
            s = objectMapper.writeValueAsString(obj);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return s;
    }

    /**
     * java 对象转换为json 存入流中
     *
     * @param obj
     * @param out
     */
    public static void toJson(Object obj, OutputStream out) {
        try {
            objectMapper.writeValue(out, obj);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * json 转为java对象
     *
     * @param json
     * @param obj
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void fromJson(String json, Object obj, Class valueType) {
        try {
            obj = objectMapper.readValue(json, valueType);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * json 转为java对象
     * @param json
     * @param obj
     * @param valueTypeRef
     */
    @SuppressWarnings("rawtypes")
    public static void fromJson(String json, Object obj, TypeReference valueTypeRef) {
        try {
            obj = objectMapper.readValue(json, valueTypeRef);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * json 转为java对象
     *
     * @param json
     * @return obj
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static Object fromJson(String json, Class valueType) {
        Object obj = null;
        try {
            obj = objectMapper.readValue(json, valueType);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return obj;
    }

    /**
     * json 转为java对象
     *
     * @param json
     * @return obj
     * @param valueTypeRef
     */
    @SuppressWarnings({ "rawtypes", "hiding" })
    public static <Object> Object fromJson(String json, TypeReference valueTypeRef) {
        Object obj = null;
        try {
            obj = objectMapper.readValue(json, valueTypeRef);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return obj;
    }
}

MongoDBHelper.java

用来测试MongoDB链接成功

package com.tod.spb.utils.Mongo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import java.net.UnknownHostException;


/**
 * Created by zhangjianxin on 2017/7/31.
 */
public class MongoDBHelper {

    public static  String CollectionName = "collection-a";
    /**
     * 保存
     * @param null
     * @throws UnknownHostException
     */
    private MongoService mongoService = new MongoService();


    public void save() throws UnknownHostException{
        JSONObject data = new JSONObject();
        data.put("tableName","1");
        data.put("body","jsonobject");
        data.put("number","12345");
        this.mongoService.saveJson(data,CollectionName);
    }



    public static void main(String[] args) {

        try {
            MongoDBHelper mongoDBHelper = new MongoDBHelper();
            mongoDBHelper.save();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}