来自项目中的经历
`Graph` 对象是用户的操作入口,主要包含 `edge` 和 `vertex` 两部分。边是由点组成的,所以边中所有的点就是点的全集,但这个全集包含了重复的点,去重后就是 `Vertex`。
package com.todcloud.flink.StreamGraph.utils;
/**
* Created by zhangjianxin on 2017/7/14.
*/
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.LinkedList;
import java.util.List;
/**
* Provides the default data sets used for the Connected Components example program.
* The default data sets are used, if no parameters are given to the program.
*
*/
public class ConnectedComponentsData {
public static final long[] VERTICES = new long[] {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
/**至高点*/
public static DataSet getDefaultVertexDataSet(ExecutionEnvironment env) {
List verticesList = new LinkedList();
for (long vertexId : VERTICES) {
verticesList.add(vertexId);
}
return env.fromCollection(verticesList);
}
/**边缘*/
public static final Object[][] EDGES = new Object[][] {
new Object[]{1L, 2L},
new Object[]{2L, 3L},
new Object[]{2L, 4L},
new Object[]{3L, 5L},
new Object[]{6L, 7L},
new Object[]{8L, 9L},
new Object[]{8L, 10L},
new Object[]{5L, 11L},
new Object[]{11L, 12L},
new Object[]{10L, 13L},
new Object[]{9L, 14L},
new Object[]{13L, 14L},
new Object[]{1L, 15L},
new Object[]{16L, 1L}
};
/**制造数据采集*/
public static DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List> edgeList = new LinkedList>();
for (Object[] edge : EDGES) {
edgeList.add(new Tuple2((Long) edge[0], (Long) edge[1]));
}
return env.fromCollection(edgeList);
}
}
package com.todcloud.flink.StreamGraph;
/**
* Created by zhangjianxin on 2017/7/14.
*/
import com.todcloud.flink.StreamGraph.utils.ConnectedComponentsData;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
/**
*使用增量迭代实现连接的组件算法
*
* 最初,算法为每个顶点分配一个惟一的ID. 在每一步中,顶点选择自己ID的最小值
*邻居的I Ds,作为它的新ID,并告诉它的邻居它的新ID。算法完成后,同一组件中的所有顶点将具有相同的ID
*
*
一个顶点的组件 ID没有改变,不需要在下一步传播它的信息 的一步。正因为如此,
*/
@SuppressWarnings("serial")
public class ConnectedComponents {
public static void main(String... args) throws Exception {
/** 检查输入参数 */
final ParameterTool params = ParameterTool.fromArgs(args);
/** 建立执行环境 */
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final int maxIterations = params.getInt("iterations", 10);
/** 在web接口中提供可用的参数 */
env.getConfig().setGlobalJobParameters(params);
/** 顶点和边读数据 */
DataSet vertices = getVertexDataSet(env, params);
DataSet> edges = getEdgeDataSet(env, params).flatMap(new UndirectEdge());
/** 分配初始组件(等于顶点id) */
DataSet> verticesWithInitialId = vertices.map(new DuplicateValue());
/**打开一个增量迭代 */
DeltaIteration, Tuple2> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
/** 应用步骤逻辑 */
/** 加入边缘,选择最小的邻居 */
/** 如果候选的组件较小,则更新 */
DataSet> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
.groupBy(0).aggregate(Aggregations.MIN, 1)
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new ComponentIdFilter());
/**关闭增量迭代(增量和新工作集是相同的) */
DataSet> result = iteration.closeWith(changes, changes);
/**排放结果 */
if (params.has("output")) {
result.writeAsCsv(params.get("output"), "\n", " ");
env.execute("连接组件的DEMO");
} else {
System.out.println("打印输出到stdout. Use --output to specify output path.");
result.print();
}
}
/**
* 函数将值转换为两个字段的值
*/
@ForwardedFields("*->f0")
public static final class DuplicateValue implements MapFunction> {
@Override
public Tuple2 map(T vertex) {
return new Tuple2(vertex, vertex);
}
}
/**
*
通过向每个输入边缘发射输入边本身和一个反向的版本,以无定向的边缘
*/
public static final class UndirectEdge implements FlatMapFunction, Tuple2> {
Tuple2 invertedEdge = new Tuple2();
@Override
public void flatMap(Tuple2 edge, Collector> out) {
invertedEdge.f0 = edge.f1;
invertedEdge.f1 = edge.f0;
out.collect(edge);
out.collect(invertedEdge);
}
}
/**
* UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
* a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
* produces a (Target-vertex-ID, Component-ID) pair.
*/
@ForwardedFieldsFirst("f1->f1")
@ForwardedFieldsSecond("f1->f0")
public static final class NeighborWithComponentIDJoin implements JoinFunction, Tuple2, Tuple2> {
@Override
public Tuple2 join(Tuple2 vertexWithComponent, Tuple2 edge) {
return new Tuple2(edge.f1, vertexWithComponent.f1);
}
}
/**
* Emit the candidate (Vertex-ID, Component-ID) pair if and only if the
* candidate component ID is less than the vertex's current component ID.
*/
@ForwardedFieldsFirst("*")
public static final class ComponentIdFilter implements FlatJoinFunction, Tuple2, Tuple2> {
@Override
public void join(Tuple2 candidate, Tuple2 old, Collector> out) {
if (candidate.f1 < old.f1) {
out.collect(candidate);
}
}
}
/** UTIL 研究方法 */
private static DataSet getVertexDataSet(ExecutionEnvironment env, ParameterTool params) {
if (params.has("vertices")) {
return env.readCsvFile(params.get("vertices")).types(Long.class).map(
new MapFunction, Long>() {
public Long map(Tuple1 value) {
return value.f0;
}
});
} else {
System.out.println("Executing Connected Components example with default vertices data set.");
System.out.println("Use --vertices to specify file input.");
return ConnectedComponentsData.getDefaultVertexDataSet(env);
}
}
private static DataSet> getEdgeDataSet(ExecutionEnvironment env, ParameterTool params) {
if (params.has("edges")) {
return env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
} else {
System.out.println("Executing Connected Components example with default edges data set.");
System.out.println("Use --edges to specify file input.");
return ConnectedComponentsData.getDefaultEdgeDataSet(env);
}
}
}