更新时间:2024-06-27 GMT+08:00
分享

在IDEA中提交Storm拓扑

操作场景

Storm支持IntelliJ IDEA远程提交拓扑,目前样例代码中仅WordCountTopology支持远程提交,其他拓扑想实现远程提交,请参考WordCountTopology实现远程提交函数。

前提条件

  • 已执行打包Storm样例工程应用
  • 调整IntelliJ IDEA客户端机器时间,和Storm集群时间差不超过5分钟。
  • 确保本地的hosts文件中配置了远程集群所有主机的主机名和业务IP映射关系。

操作步骤

  1. 修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换用户keytab文件名称,用户principal名称,和Jar文件地址。

    • 使用remoteSubmit方式提交应用程序
      public static void main(String[] args) 
               throws Exception 
           { 
               TopologyBuilder builder = buildTopology(); 
                
               /* 
                * 任务的提交认为三种方式 
                * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 
                * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 
                * 3、本地提交 ,在本地执行应用程序,一般用来测试 
                * 命令行方式和远程方式安全和普通模式都支持 
                * 本地提交仅支持普通模式 
                *  
                * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 
                */ 
                
               submitTopology(builder, SubmitType.REMOTE); 
           }
    • 根据实际情况修改userJarFilePath为实际的拓扑Jar包地址
      private static void remoteSubmit(TopologyBuilder builder) 
               throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, 
               IOException 
           { 
               Config config = createConf(); 
                
               String userJarFilePath = "D:\\example.jar"; 
               System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); 
                
               //安全模式下的一些准备工作 
               if (isSecurityModel()) 
               { 
                   securityPrepare(config); 
               } 
               config.setNumWorkers(1); 
               StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); 
           }
    • 安全模式下需要执行安全准备,根据实际情况修改userKeyTablePath和userPrincipal为导入并配置Storm样例工程章节的步骤2中所获取用户的keytab文件路径和principal
      private static void securityPrepare(Config config) 
               throws IOException 
           { 
               String userKeyTablePath = 
                   System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "user.keytab"; 
               String userPrincipal = "StreamingDeveloper"; 
               String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator +"krb5.conf"; 
                
               //windows路径下分隔符替换 
               userKeyTablePath = userKeyTablePath.replace("\\", "\\\\"); 
               krbFilePath = krbFilePath.replace("\\", "\\\\"); 
                
               String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE)); 
               LoginUtil.setKrb5Config(krbFilePath); 
               LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance); 
               LoginUtil.setJaasFile(userPrincipal, userKeyTablePath); 
           }

  2. 执行WordCountTopology.java类的Main方法提交应用程序。

相关文档