In Storm and Trident, topologies are configured through client-side API calls, then serialized and submitted to the Storm cluster coordinator (Nimbus) using Apache Thrift as the RPC layer. Storm provides a convenient CLI tool for submitting topologies to Nimbus. However, there’s no documented approach to submitting topologies exclusively through the Storm API. Submitting through the Storm API has advantages over spawning an external process for each submission: it is faster, less resource-intensive, and streamlines error handling, since failures surface as exceptions in the calling code.
Storm CLI
In the Storm CLI, a Storm or Trident topology can be submitted with a command similar to:
$ bin/storm jar {path-to-uber-jar} {topology-submission-main-class} {arg1} {arg2} ...
The bin/storm command, which is a Python script, reads the default configuration from defaults.yaml on the classpath and combines it with either the local Storm configuration in ~/.storm/storm.yaml or the configuration file given on the command line through the --config option. It then spawns a new Java process with a few custom system properties, such as the location of the submitted uber JAR. That process runs the main class, which typically contains the following code:
StormTopology topology = buildTopology();
StormSubmitter.submitTopology("my-topology", conf, topology);
StormSubmitter uploads the JAR file referenced by the system property storm.jar to Nimbus, then calls Nimbus again with the remote location of the JAR file, the serialized topology and the topology configuration.
The above workflow is the same for both Storm and Trident topologies as Trident topologies are compiled into regular Storm topologies before submission.
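The launch step described above can be approximated with a small, JDK-only sketch. It only assembles the command line a launcher might hand to a child JVM. The storm.jar property name comes from the text above, while the class name StormJarLaunchSketch, the classpath, the JAR path and the main class are hypothetical placeholders, and the real bin/storm script passes more options than shown here.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StormJarLaunchSketch {

    // Assembles (but does not run) a child JVM command line that exposes the
    // uber JAR location through the storm.jar system property.
    static List<String> buildCommand(String classpath, String uberJar,
                                     String mainClass, String... topologyArgs) {
        List<String> command = new ArrayList<>();
        command.add("java");
        command.add("-Dstorm.jar=" + uberJar);                   // read later by the submitter
        command.add("-cp");
        command.add(classpath + File.pathSeparator + uberJar);   // uber JAR must be loadable too
        command.add(mainClass);
        command.addAll(Arrays.asList(topologyArgs));
        return command;
    }

    public static void main(String[] args) {
        // All values below are placeholders for illustration only.
        List<String> command = buildCommand(
                "/opt/storm/lib/*", "target/my-topology.jar", "com.example.SubmitMain", "arg1");
        System.out.println(String.join(" ", command));
        // A real launcher would pass the list to: new ProcessBuilder(command).inheritIO().start()
    }
}
```

Because the JAR location travels through a JVM-wide system property, every submission made this way needs its own process, which is the cost the programmatic approach tries to avoid.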
Programmatic submission of topologies
For programmatic submission, we reconstruct the above process without setting any system-wide properties, to avoid side effects.
Configuration
First we configure topology-specific settings. Some of these (e.g. the number of workers) may be requested by the topology builder itself rather than set at this stage.
Then we load the default configuration to retrieve the common settings needed to talk to the Nimbus server. Note that only a few values from the default configuration are copied into the submitted configuration, because submitted values override the server-wide settings on the worker nodes. For example, if we added the complete default configuration to the submitted configuration, the worker processes running the topology would be set up with the ZooKeeper address localhost:2181, which is wrong in most cases.
// Topology-specific settings
Config topologyConf = new Config();
topologyConf.setDebug(true);
topologyConf.setNumWorkers(6);

// Defaults are read only to look up the values needed to reach Nimbus
Map defaultConf = Utils.readStormConfig();

// Submitted configuration: Nimbus connection settings plus topology settings
Map conf = new HashMap();
conf.put(Config.NIMBUS_HOST, "nimbus.mydomain.com");
conf.put(Config.NIMBUS_THRIFT_PORT, defaultConf.get(Config.NIMBUS_THRIFT_PORT));
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, defaultConf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN));
conf.putAll(topologyConf);
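The selective copy described above can be illustrated without any Storm dependency. The following is a minimal sketch using plain maps. The class name SubmissionConfSketch and the sample values are hypothetical, and the literal key strings are what the corresponding Config constants are assumed to resolve to.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SubmissionConfSketch {

    // Only these keys are taken from the defaults (assumed key names);
    // everything else is left for the cluster to decide.
    static final List<String> INHERITED_KEYS =
            Arrays.asList("nimbus.thrift.port", "storm.thrift.transport");

    static Map<String, Object> buildSubmissionConf(Map<String, Object> defaults,
                                                   Map<String, Object> topologyConf,
                                                   String nimbusHost) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("nimbus.host", nimbusHost);
        for (String key : INHERITED_KEYS) {
            if (defaults.containsKey(key)) {
                conf.put(key, defaults.get(key));
            }
        }
        conf.putAll(topologyConf); // topology settings are applied last
        return conf;
    }

    public static void main(String[] args) {
        // Stand-in for the loaded default configuration (illustrative values).
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("nimbus.thrift.port", 6627);
        defaults.put("storm.thrift.transport", "example.TransportPlugin");
        defaults.put("storm.zookeeper.servers", Arrays.asList("localhost"));

        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("topology.debug", true);
        topologyConf.put("topology.workers", 6);

        Map<String, Object> conf =
                buildSubmissionConf(defaults, topologyConf, "nimbus.mydomain.com");
        // The local ZooKeeper default never reaches the submitted configuration.
        System.out.println(conf.containsKey("storm.zookeeper.servers")); // prints "false"
    }
}
```

Keeping the inherited keys in an explicit list makes it harder to leak a client-side default to the workers by accident.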
Code Upload
When configuration is complete, the uber JAR file is uploaded to Nimbus:
String inputJar = "storm-test-1.0-SNAPSHOT-jar-with-dependencies.jar";
String remoteJar = StormSubmitter.submitJar(conf, inputJar);
Topology Submission
Finally, the topology is constructed with a call to the designated topology builder in the uber JAR file:
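// JDK imports needed: java.io.File, java.lang.reflect.InvocationTargetException,
// java.lang.reflect.Method, java.net.MalformedURLException, java.net.URL, java.net.URLClassLoader

// File.toURI() yields an absolute file: URL. Prefixing a relative path with "file://"
// would make the URL parser treat the file name as a host name.
StormTopology topology = getTopology(
        new File(inputJar).toURI().toString(),
        "io.modio.blog.storm.submit.ExclamationTopology",
        "buildTopology");

private static StormTopology getTopology(String path, String className, String methodName)
        throws MalformedURLException, ClassNotFoundException, NoSuchMethodException,
               SecurityException, IllegalAccessException, IllegalArgumentException,
               InvocationTargetException {
    // Load the class from the uber JAR, delegating to the current class loader first
    ClassLoader loader = URLClassLoader.newInstance(
            new URL[] { new URL(path) }, RemoteSubmitter.class.getClassLoader());
    Class<?> clazz = loader.loadClass(className);
    // The builder is expected to be a public static method without arguments
    Method method = clazz.getMethod(methodName, new Class[] {});
    return (StormTopology) method.invoke(null, new Object[] {});
}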
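The reflective lookup can be tried in isolation with JDK classes only. The sketch below is not the article's code. The class names ReflectiveBuilderSketch and DemoTopology are hypothetical, the builder returns a String instead of a StormTopology, and converting the JAR path with File.toURI() is a choice made here so that relative paths become absolute file URLs.

```java
import java.io.File;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

final class ReflectiveBuilderSketch {

    // Hypothetical stand-in for a topology class exposing a static builder.
    public static class DemoTopology {
        public static String buildTopology() {
            return "demo-topology";
        }
    }

    // Converts a (possibly relative) JAR path into an absolute file: URL.
    static URL toJarUrl(String jarPath) {
        try {
            return new File(jarPath).toURI().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad JAR path: " + jarPath, e);
        }
    }

    // Looks up a public static no-arg method by name and invokes it.
    static Object invokeStaticBuilder(ClassLoader loader, String className, String methodName) {
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            Method method = clazz.getMethod(methodName);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new IllegalArgumentException(methodName + " must be static");
            }
            return method.invoke(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot call " + className + "." + methodName, e);
        }
    }

    public static void main(String[] args) throws Exception {
        URL jarUrl = toJarUrl("storm-test-1.0-SNAPSHOT-jar-with-dependencies.jar");
        // The JAR need not exist here: DemoTopology is found through the parent loader.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] { jarUrl }, ReflectiveBuilderSketch.class.getClassLoader())) {
            Object result = invokeStaticBuilder(
                    loader, DemoTopology.class.getName(), "buildTopology");
            System.out.println(jarUrl.getProtocol() + " -> " + result); // prints "file -> demo-topology"
        }
    }
}
```

Wrapping the reflective exceptions in a single unchecked exception keeps the caller's signature short. Whether that trade-off is appropriate depends on how much detail the caller needs about a failed lookup.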
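A client connection to Nimbus is obtained and the topology is submitted:
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
    // The configuration is passed to Nimbus as a JSON string
    client.getClient().submitTopology("my-topology", remoteJar, JSONValue.toJSONString(conf), topology);
} catch (AlreadyAliveException e) {
    // A topology with this name is already running on the cluster
    e.printStackTrace();
} finally {
    // Release the Thrift connection; other exceptions thrown by the call
    // must be declared or handled by the enclosing method
    client.close();
}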