Flink Graph Vertex [From Project Experience]


Abstract: Graph construction

The Graph object is the user's entry point for operations and consists of two main parts: edges and vertices. Every edge is made up of vertices, so the vertices appearing across all edges form the full vertex set; this set contains duplicates, and deduplicating it yields the Vertex set.
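To make this concrete, here is a minimal sketch (not part of the original project; the class and method names are made up for illustration) that derives the deduplicated vertex set directly from an edge DataSet modeled as Tuple2<Long, Long>: each edge is flattened into its two endpoints, and distinct() removes the duplicates.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/** Hypothetical helper: derives the deduplicated vertex set from an edge data set. */
public class VertexFromEdges {

    public static DataSet<Long> verticesOf(DataSet<Tuple2<Long, Long>> edges) {
        // Every edge contributes its two endpoints; distinct() drops the duplicates.
        return edges
                .flatMap(new FlatMapFunction<Tuple2<Long, Long>, Long>() {
                    @Override
                    public void flatMap(Tuple2<Long, Long> edge, Collector<Long> out) {
                        out.collect(edge.f0);
                        out.collect(edge.f1);
                    }
                })
                .distinct();
    }
}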

Flink Graph Demo

ConnectedComponentsData

package com.todcloud.flink.StreamGraph.utils;

/**
 * Created by zhangjianxin on 2017/7/14.
 */
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.LinkedList;
import java.util.List;

/**
 * Provides the default data sets used for the Connected Components example program.
 * The default data sets are used, if no parameters are given to the program.
 *
 */
public class ConnectedComponentsData {

    public static final long[] VERTICES  = new long[] {
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    /** Creates the default vertex data set. */
    public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
        List<Long> verticesList = new LinkedList<Long>();
        for (long vertexId : VERTICES) {
            verticesList.add(vertexId);
        }
        return env.fromCollection(verticesList);
    }
    /** Edges */
    public static final Object[][] EDGES = new Object[][] {
            new Object[]{1L, 2L},
            new Object[]{2L, 3L},
            new Object[]{2L, 4L},
            new Object[]{3L, 5L},
            new Object[]{6L, 7L},
            new Object[]{8L, 9L},
            new Object[]{8L, 10L},
            new Object[]{5L, 11L},
            new Object[]{11L, 12L},
            new Object[]{10L, 13L},
            new Object[]{9L, 14L},
            new Object[]{13L, 14L},
            new Object[]{1L, 15L},
            new Object[]{16L, 1L}
    };
    /** Creates the default edge data set. */
    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

        List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
        for (Object[] edge : EDGES) {
            edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
        }
        return env.fromCollection(edgeList);
    }

}
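For reference, a small, hypothetical smoke test (the class name is made up) that materializes the default data sets; DataSet.print() collects the results to the client and triggers execution.

package com.todcloud.flink.StreamGraph.utils;

import org.apache.flink.api.java.ExecutionEnvironment;

/** Hypothetical smoke test for the default data sets. */
public class ConnectedComponentsDataDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // print() collects each data set to the client and triggers execution.
        ConnectedComponentsData.getDefaultVertexDataSet(env).print();
        ConnectedComponentsData.getDefaultEdgeDataSet(env).print();
    }
}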

ConnectedComponents

package com.todcloud.flink.StreamGraph;

/**
 * Created by zhangjianxin on 2017/7/14.
 */
import com.todcloud.flink.StreamGraph.utils.ConnectedComponentsData;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

/**
 * An implementation of the connected components algorithm, using a delta iteration.
 *
 * Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the minimum of its own ID
 * and its neighbors' IDs as its new ID and tells its neighbors about it. After the algorithm has completed,
 * all vertices in the same component will have the same ID.
 *
 * A vertex whose component ID did not change does not need to propagate its information in the next step.
 * Because of that, the algorithm is easily expressed as a delta iteration.
 */
@SuppressWarnings("serial")
public class ConnectedComponents {

    public static void main(String... args) throws Exception {

        // Check the input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final int maxIterations = params.getInt("iterations", 10);

        // Make the parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // Read the vertex and edge data
        DataSet<Long> vertices = getVertexDataSet(env, params);
        DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env, params).flatMap(new UndirectEdge());

        // Assign the initial components (equal to the vertex id)
        DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

        // Open a delta iteration
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

        // Apply the step logic:
        // join with the edges, select the minimum neighbor,
        // and update if the candidate component ID is smaller
        DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
                .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
                .groupBy(0).aggregate(Aggregations.MIN, 1)
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new ComponentIdFilter());

        // Close the delta iteration (delta and new workset are identical)
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

        // Emit the result
        if (params.has("output")) {
            result.writeAsCsv(params.get("output"), "\n", " ");
            env.execute("Connected Components demo");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            result.print();
        }
    }

    /**
     * Function that turns a value into a 2-tuple where both fields are that value.
     */
    @ForwardedFields("*->f0")
    public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

        @Override
        public Tuple2<T, T> map(T vertex) {
            return new Tuple2<T, T>(vertex, vertex);
        }
    }

    /**
     * Undirects the edges by emitting, for each input edge, the edge itself and an inverted version.
     */
    public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

        @Override
        public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
            invertedEdge.f0 = edge.f1;
            invertedEdge.f1 = edge.f0;
            out.collect(edge);
            out.collect(invertedEdge);
        }
    }

    /**
     * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
     * a vertex is associated with, with a (Source-Vertex-ID, Target-Vertex-ID) edge. The function
     * produces a (Target-Vertex-ID, Component-ID) pair.
     */
    @ForwardedFieldsFirst("f1->f1")
    @ForwardedFieldsSecond("f1->f0")
    public static final class NeighborWithComponentIDJoin implements
            JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
            return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
        }
    }

    /**
     * Emit the candidate (Vertex-ID, Component-ID) pair if and only if the
     * candidate component ID is less than the vertex's current component ID.
     */
    @ForwardedFieldsFirst("*")
    public static final class ComponentIdFilter implements
            FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
            if (candidate.f1 < old.f1) {
                out.collect(candidate);
            }
        }
    }

    // Util methods

    private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env, ParameterTool params) {
        if (params.has("vertices")) {
            return env.readCsvFile(params.get("vertices")).types(Long.class).map(
                    new MapFunction<Tuple1<Long>, Long>() {
                        public Long map(Tuple1<Long> value) {
                            return value.f0;
                        }
                    });
        } else {
            System.out.println("Executing Connected Components example with default vertices data set.");
            System.out.println("Use --vertices to specify file input.");
            return ConnectedComponentsData.getDefaultVertexDataSet(env);
        }
    }

    private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env, ParameterTool params) {
        if (params.has("edges")) {
            return env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
        } else {
            System.out.println("Executing Connected Components example with default edges data set.");
            System.out.println("Use --edges to specify file input.");
            return ConnectedComponentsData.getDefaultEdgeDataSet(env);
        }
    }
}
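Because main is declared with String... varargs, the job can also be started programmatically. The sketch below is a hypothetical local driver (class name and argument values are illustrative); the recognized flags are exactly the ones read above: --vertices, --edges, --iterations and --output.

package com.todcloud.flink.StreamGraph;

/** Hypothetical local driver for the example, using the built-in data sets. */
public class ConnectedComponentsLocalRun {

    public static void main(String[] args) throws Exception {
        // Run with the default data sets and a custom iteration bound;
        // add "--output", "<path>" to write CSV instead of printing to stdout.
        ConnectedComponents.main("--iterations", "20");
    }
}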