更新时间:2025-05-27 GMT+08:00
创建Storm Topology
功能介绍
一个Topology是Spouts和Bolts组成的有向无环图。
应用程序是通过storm jar的方式提交,则需要在main函数中调用创建Topology的函数,并在storm jar参数中指定main函数所在类。
代码样例
下面代码片段在com.huawei.storm.example.wordcount包的“WordCountTopology”类的“main”方法中,作用在于构建应用程序并提交。
public static void main(String[] args)
throws Exception
{
TopologyBuilder builder = buildTopology();
/*
* 任务的提交认为三种方式
* 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交
* 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交
* 3、本地提交 ,在本地执行应用程序,一般用来测试
* 命令行方式和远程方式安全和普通模式都支持
* 本地提交仅支持普通模式
*
* 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可
*/
submitTopology(builder, SubmitType.CMD);
}
private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception
{
switch (type)
{
case CMD:
{
cmdSubmit(builder, null);
break;
}
case REMOTE:
{
remoteSubmit(builder);
break;
}
case LOCAL:
{
localSubmit(builder);
break;
}
}
}
/**
* 命令行方式远程提交
* 步骤如下:
* 打包成Jar包,然后在客户端命令行上面进行提交
* 远程提交的时候,要先将该应用程序和其他外部依赖(非excemple工程提供,用户自己程序依赖)的jar包打包成一个大的jar包
* 再通过storm客户端中storm -jar的命令进行提交
*
* 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录
*
* 运行命令如下:
* ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology
*/
private static void cmdSubmit(TopologyBuilder builder, Config conf)
throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException
{
if (conf == null)
{
conf = new Config();
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology());
}
private static void localSubmit(TopologyBuilder builder)
throws InterruptedException
{
Config conf = new Config();
conf.setDebug(true);
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
private static void remoteSubmit(TopologyBuilder builder)
throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
IOException
{
Config config = createConf();
String userJarFilePath = "替换为用户jar包地址";
System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);
//安全模式下的一些准备工作
if (isSecurityModel())
{
securityPrepare(config);
}
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}
private static TopologyBuilder buildTopology()
{
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
return builder;
}
如果拓扑开启了ack,推荐acker的数量不大于所设置的worker数量。
父主题: 开发Storm应用