配置HDFS同分布策略(Colocation)
功能简介
同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。
在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括:
- Colocation分配节点原理
Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。
locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。
- 扩容与Colocation分配
集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。
表1 分配策略 编号
策略
说明
1
删除旧的locators,为集群中所有数据节点重新创建locators。
- 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。
- 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。
2
创建一批新的locators,并重新规划数据存放方式。
旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。
一般的,建议用户在进行集群扩容之后采用策略一来重新分配locators,可以避免数据偏重使用新的数据节点。
- Colocation与数据节点容量
由于使用Colocation进行存储数据的时候,会固定存储在指定的locator所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。
HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。
- 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。
- 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。
- 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。
- Colocation提供了文件同分布的功能,执行集群balancer或mover操作时,会移动数据块,使Colocation功能失效。因此,使用Colocation功能时,建议将HDFS配置项dfs.datanode.block-pinning.enabled设置为true,此时执行集群Balancer或Mover操作时,使用Colocation写入的文件将不会被移动,从而保证了文件同分布。
代码样例
完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。
- 在运行Colocation工程时,需要设置运行用户,此用户需绑定supergroup用户组。
- 在运行Colocation工程时,HDFS的配置项fs.defaultFS不能配置为viewfs://ClusterX。
- 初始化
private static void init() throws IOException { // 设置用户,若用户没有设置HADOOP_USER_NAME,则使用USER if (System.getenv("HADOOP_USER_NAME") == null && System.getProperty("HADOOP_USER_NAME") == null) { System.setProperty("HADOOP_USER_NAME", USER); } }
- 获取实例
样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。
dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf);
- 创建group
/** * 创建group * * @throws java.io.IOException */ private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); }
- 写文件,写文件前必须创建对应的group
样例:写入testfile.txt文件。
/** * 创建并写入文件 * * @throws java.io.IOException */ private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path(TESTFILE_TXT), true, COLOCATION_GROUP_GROUP01, "lid01"); // 代写入HDFS的数据 byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); }
- 删除文件
- 删除group