JXTA Master/Slave Application in Clojure

I’m currently working on a distributed application in Clojure that has a master/slave architecture. Being the lazy bastard I am, I wanted to use an existing library that would make this as easy as possible. I considered using a DHT library for Java, such as JDHT or Bamboo, but finally decided on a P2P library for Java called JXTA. In addition to supporting a master/slave architecture, I needed to be able to run slaves on a different network from the master. The JXTA library is poorly documented and I found very few examples of working code, so this post provides a skeleton application for anyone who wants to build on JXTA. The sample code is in Clojure, but it could easily be translated to any language that runs on the JVM.

The basic architecture is as follows. A single relay node is used for both the slaves and the master to connect to; it exists only to route messages between them. For this to work, the relay peer must run on a box that both the master and the slaves can establish a TCP connection to. Here is the function that configures the relay node:

(defn configureRdvNode []
  (let [seedingURI (URI/create Jxta/RDV_URI)]
    (doto (.getConfigurator manager)
      (.setHome (new File Jxta/JXTA_HOME))
      (.setUseMulticast false)
      (.addSeedRelay seedingURI)
      (.addSeedRendezvous seedingURI)
      (.addRdvSeedingURI seedingURI)
      (.addRelaySeedingURI seedingURI)
      ;; act as both a rendezvous server and a relay server
      (.setMode (+ NetworkConfigurator/RDV_SERVER NetworkConfigurator/RELAY_SERVER))
      (.setUseOnlyRelaySeeds true)
      (.setUseOnlyRendezvousSeeds true)
      ;; allow TCP connections in both directions
      (.setTcpEnabled true)
      (.setTcpIncoming true)
      (.setTcpOutgoing true)
      (.save))))
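
A side note on the setMode call: the two mode constants are combined with +, which gives the right answer as long as each constant occupies its own bit. A bitwise OR is the more defensive way to combine flags, because it agrees with + when the bits are distinct and still works if a flag is accidentally included twice. Here is a tiny sketch of the difference. The flag values and the combine-modes helper are made up for illustration; they are not the real NetworkConfigurator constants.

```clojure
;; Hypothetical flag values, for illustration only (not JXTA's real constants).
(def rdv-server   (bit-shift-left 1 0))   ; 1
(def relay-server (bit-shift-left 1 1))   ; 2

(defn combine-modes
  "Combine any number of single-bit mode flags into one mode value."
  [& flags]
  (reduce bit-or 0 flags))

;; With distinct bits, + and bit-or agree ...
(println (+ rdv-server relay-server))                            ; 3
(println (combine-modes rdv-server relay-server))                ; 3

;; ... but only bit-or survives a duplicated flag.
(println (+ rdv-server relay-server relay-server))               ; 5
(println (combine-modes rdv-server relay-server relay-server))   ; 3
```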

A single master node periodically publishes a pipe advertisement through the discovery service (every 10 seconds in the code below), which the relay peer forwards to any slaves connected to it. Here is the function that does the publishing:

(defn registrarLoop [^DiscoveryService discoveryService pipeAdv]
  (ThreadUtils/onThread
    #(while @registrate
       (let [waitTime 10000]
         (println "master publishing register pipe advertisement")
         ;; publish locally, then push the advertisement out to remote peers
         (.publish discoveryService pipeAdv)
         (.remotePublish discoveryService pipeAdv)
         (println "master sleeping for: " waitTime " ms.\n")
         (Thread/sleep waitTime)))))

ThreadUtils/onThread takes a function (remember that in Clojure all functions implement the Runnable and Callable interfaces) and runs it in a new thread.
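
The onThread helper itself isn't shown in this post, so here is a minimal sketch of what such a helper could look like, together with the "loop while a flag is true" pattern used by registrarLoop. The names on-thread, running, ticks and worker are my own placeholders, and the real helper may differ (for example, it may not use daemon threads).

```clojure
(defn on-thread
  "Run the zero-argument function f on a new daemon thread and return the Thread."
  [f]
  (doto (Thread. ^Runnable f)
    (.setDaemon true)
    (.start)))

;; A flag the loop checks on every pass, so another thread can stop it.
(def running (atom true))
(def ticks (atom 0))

(def worker
  (on-thread
    #(while @running
       (swap! ticks inc)
       (Thread/sleep 10))))

(Thread/sleep 100)          ; let the loop run for a moment
(reset! running false)      ; ask the loop to stop
(.join worker 1000)         ; wait (up to a second) for it to finish
(println "loop ran" @ticks "times; still alive?" (.isAlive worker))
```

Because the loop only checks the flag between sleeps, stopping is not instantaneous: with a 10-second wait, as in registrarLoop, the thread can take up to 10 seconds to notice that the flag has been cleared.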

Each slave simply waits for a discovery advertisement, and once it receives one, it establishes a bidirectional communication channel with the master (known in JXTA as a pipe). Note that in this scenario the slaves do NOT connect directly to the master; all communication is routed through the relay peer. Here is the function that establishes the pipe once a discovery advertisement has been received:

;; Attempt to create a bidirectional pipe based on the pipe advertisement,
;; unless a bound pipe already exists.
(defn createPipeFromAdv [^PipeAdvertisement adv]
  (when (or (nil? @pipe) (not (.isBound @pipe)))
    (dosync
      (ref-set pipe (JxtaBiDiPipe. @netPeerGroup adv Integer/MAX_VALUE pipeMsgListener true)))))

The pipe is a ref so that other methods can send and receive messages using the pipe, and so that it can be cleaned up properly at a later point in time.
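
To make the "create it only if it's missing or dead" pattern easier to play with, here is a sketch that runs without JXTA. The pipe is replaced by a plain map with a :bound? flag, and open-fake-pipe, usable? and ensure-pipe! are hypothetical names of mine. One thing this sketch does differently: it opens the connection before entering dosync, because a transaction body can be retried and you generally don't want a side effect like opening a connection to happen more than once.

```clojure
;; Stand-in for a JXTA pipe: just a map with a :bound? flag (hypothetical).
(defn open-fake-pipe []
  {:bound? true, :opened-at (System/currentTimeMillis)})

(def pipe (ref nil))

(defn usable?
  "True when p is non-nil and still bound."
  [p]
  (boolean (and p (:bound? p))))

(defn ensure-pipe!
  "Open a new pipe with open-fn unless a usable one is already stored.
  The pipe is opened outside the transaction because dosync bodies may be retried."
  [open-fn]
  (when-not (usable? @pipe)
    (let [new-pipe (open-fn)]
      (dosync (ref-set pipe new-pipe))))
  @pipe)

(def first-pipe  (ensure-pipe! open-fake-pipe))
(def second-pipe (ensure-pipe! open-fake-pipe))
(println "reused existing pipe?" (identical? first-pipe second-pipe))
```

Like the original, this check-then-set is not atomic, so two threads calling it at the same moment could both open a pipe. For a single discovery listener that is probably fine.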

After a slave has established this pipe, it sends a heartbeat message to the master every 30 seconds via the following code:

;; Loop as long as a bound pipe is available, sending a heartbeat message every 30 seconds.
(defn heartbeat []
  (ThreadUtils/onThread
    #(while (and (not (nil? @pipe)) (.isBound @pipe))
       (sendHeartbeat @pipe)
       (Thread/sleep 30000))))

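The master handles each message it receives via a PipeMsgListener, which converts the message's elements into InputMessage structs and passes them to a callback, if one has been set: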

(def pipeMsgListener
  (proxy [PipeMsgListener] []
    (pipeMsgEvent [^PipeMsgEvent event]
      (let [source   (.getSource event)
            msg      (.getMessage event)
            elements (iterator-seq (.getMessageElements msg))]
        ;; only do the conversion when somebody is listening
        (if-not (nil? @callback)
          (@callback
            (map
              (fn [element]
                (struct Jxta/InputMessage
                        (str (.getPipeID (.getPipeAdvertisement source)))
                        (.getElementName element)
                        (str element)
                        (System/currentTimeMillis)))
              elements)))))))
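
The listener depends on two things that aren't shown above: the callback and the Jxta/InputMessage struct. Here is a sketch of how they might be defined, runnable without JXTA. The struct keys, the dispatch function and the sample data are guesses for illustration; the real definitions are in the full source and may differ. The sketch also wraps the map in doall so the timestamps are taken when the message arrives rather than whenever the lazy sequence happens to be consumed.

```clojure
;; Hypothetical definition; the real Jxta/InputMessage keys may differ.
(defstruct InputMessage :pipe-id :element-name :value :received-at)

;; Holds the function to call with each batch of incoming messages, or nil.
(def callback (atom nil))

(defn dispatch
  "Convert [name value] pairs into InputMessage structs and pass them to the callback, if any."
  [pipe-id pairs]
  (when-let [f @callback]
    (f (doall
         (map (fn [[element-name value]]
                (struct InputMessage pipe-id element-name value (System/currentTimeMillis)))
              pairs)))))

;; Register a callback that just prints what it receives.
(reset! callback
        (fn [messages]
          (doseq [m messages]
            (println (:pipe-id m) (:element-name m) "=" (:value m)))))

(dispatch "pipe-1" [["heartbeat" "slave-42 alive"]])
;; prints: pipe-1 heartbeat = slave-42 alive
```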

And that’s all there is to it. Keep in mind that I’m new to JXTA so there may be some calls here into the API that are unnecessary. The full source is available for download here and is licensed under the BSD license so feel free to use this in your open source or commercial products!
