Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable nimbus & UI servers to be assigned an available port, instead of a pre-configured one #622

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions storm-core/src/clj/backtype/storm/bootstrap.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
(import (quote [backtype.storm.daemon.common StormBase Assignment
SupervisorInfo WorkerHeartbeat]))
(import (quote [backtype.storm.grouping CustomStreamGrouping]))
(import (quote [backtype.storm.utils HostPort]))
(import (quote [java.io File FileOutputStream FileInputStream]))
(import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))
(import (quote [org.apache.commons.io FileUtils]))
Expand Down
21 changes: 20 additions & 1 deletion storm-core/src/clj/backtype/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
(:use [backtype.storm util log config])
(:require [backtype.storm [zookeeper :as zk]])
(:require [backtype.storm.daemon [common :as common]])

)

(defprotocol ClusterState
Expand Down Expand Up @@ -120,18 +119,26 @@
(remove-storm! [this storm-id])
(report-error [this storm-id task-id error])
(errors [this storm-id task-id])
(nimbus-info [this]) ;; fetch nimbus host + port as HostPort
(set-nimbus! [this info]) ;; announce nimbus host+port
(ui-port [this]) ;; fetch ui port as Integer
(set-ui-port! [this info]) ;; announce ui port

(disconnect [this])
)


;; Names of the top-level nodes kept in the cluster state.
;; NIMBUS-ROOT and UI-ROOT name the nodes under which the nimbus host+port
;; and the ui port are announced (see set-nimbus! / set-ui-port!).
(def NIMBUS-ROOT "nimbus")
(def UI-ROOT "ui")
(def ASSIGNMENTS-ROOT "assignments")
(def CODE-ROOT "code")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")

;; Absolute paths of the nodes above: each is "/" followed by the root name.
(def NIMBUS-SUBTREE (str "/" NIMBUS-ROOT))
(def UI-SUBTREE (str "/" UI-ROOT))
(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
Expand Down Expand Up @@ -222,6 +229,18 @@
(reify
StormClusterState

;; Reads whatever is stored at NIMBUS-SUBTREE and passes it through
;; maybe-deserialize; per the protocol this is nimbus's host + port as a HostPort.
;; NOTE(review): the trailing `false` is presumably get-data's watch flag -- confirm.
(nimbus-info [this]
  (let [raw (get-data cluster-state NIMBUS-SUBTREE false)]
    (maybe-deserialize raw)))

;; Serializes `info` (nimbus host + port) with Utils/serialize and stores the
;; result at NIMBUS-SUBTREE.
(set-nimbus! [this info]
  (let [payload (Utils/serialize info)]
    (set-data cluster-state NIMBUS-SUBTREE payload)))

;; Reads whatever is stored at UI-SUBTREE and passes it through
;; maybe-deserialize; per the protocol this is the ui port as an Integer.
;; NOTE(review): the trailing `false` is presumably get-data's watch flag -- confirm.
(ui-port [this]
  (let [raw (get-data cluster-state UI-SUBTREE false)]
    (maybe-deserialize raw)))

;; Serializes `info` (the ui port being announced) with Utils/serialize and
;; stores the result at UI-SUBTREE.
(set-ui-port! [this info]
  (let [payload (Utils/serialize info)]
    (set-data cluster-state UI-SUBTREE payload)))

(assignments [this callback]
(when callback
(reset! assignments-callback callback))
Expand Down
3 changes: 1 addition & 2 deletions storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")



(defrecord WorkerHeartbeat [time-secs storm-id executors port])

(defrecord ExecutorStats [^long processed
Expand Down Expand Up @@ -331,3 +329,4 @@
(->> executor->node+port
(mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port])))
(into {})))

25 changes: 21 additions & 4 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(:import [org.apache.thrift7.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [org.apache.thrift7 TException])
(:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [java.net InetAddress])
(:import [java.nio ByteBuffer])
(:import [java.io FileNotFoundException])
(:import [java.nio.channels Channels WritableByteChannel])
Expand Down Expand Up @@ -1125,18 +1126,34 @@
(waiting? [this]
(timer-waiting? (:timer nimbus))))))

(defn launch-server! [conf nimbus]
(defn announce-nimbus-info
  "Announces nimbus's location through the cluster state.

  Looks up the cluster state stored under :storm-cluster-state in `nimbus`,
  wraps `host` and `port` in a HostPort, and passes it to set-nimbus!.
  Returns whatever set-nimbus! returns."
  [nimbus host port]
  (-> nimbus
      :storm-cluster-state
      (.set-nimbus! (HostPort. host port))))

(defn config-with-nimbus-port-assigned
  "Returns `conf` with the nimbus thrift port and host filled in.

  The port configured under NIMBUS-THRIFT-PORT is handed to assign-server-port
  and the result is stored back (as an Integer) under the same key.
  NIMBUS-HOST is set to the canonical host name of the local machine.
  NOTE(review): assign-server-port is defined elsewhere; presumably it returns
  a port that is free to bind -- confirm."
  [conf]
  (let [configured-port (.intValue (Integer. (conf NIMBUS-THRIFT-PORT)))
        assigned-port   (assign-server-port configured-port)]
    ;; one assoc with two key/value pairs; pairs are applied left to right
    (assoc conf
           NIMBUS-THRIFT-PORT (Integer. assigned-port)
           NIMBUS-HOST (.getCanonicalHostName (InetAddress/getLocalHost)))))

(defn launch-server! [conf inimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)
options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
(let [extended-conf (config-with-nimbus-port-assigned conf)
nimbus-port (extended-conf NIMBUS-THRIFT-PORT)
nimbus (nimbus-data extended-conf inimbus)
service-handler (service-handler extended-conf inimbus)
options (-> (TNonblockingServerSocket. nimbus-port)
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (Nimbus$Processor. service-handler))
)
server (THsHaServer. options)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
(log-message "Starting Nimbus server...")
(log-message "Starting Nimbus server with port " nimbus-port)
(announce-nimbus-info nimbus (extended-conf NIMBUS-HOST) nimbus-port)
(.serve server)))


Expand Down
110 changes: 59 additions & 51 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -317,68 +317,76 @@
(.add processes-event-manager sync-processes)
)))

(defn assign-worker-ports
  "Returns `conf` with SUPERVISOR-SLOTS-PORTS replaced by a vector holding the
  result of assign-server-port for each originally configured slot port, in
  the same order.
  NOTE(review): assign-server-port is defined elsewhere; presumably it maps a
  configured port to one that is actually available -- confirm."
  [conf]
  (->> (get conf SUPERVISOR-SLOTS-PORTS)
       (map assign-server-port)
       vec
       (assoc conf SUPERVISOR-SLOTS-PORTS)))

;; in local state, supervisor stores who its current assignments are
;; another thread launches events to restart any dead processes if necessary
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
(log-message "Starting Supervisor with conf " conf)
(.prepare isupervisor conf (supervisor-isupervisor-dir conf))
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [supervisor (supervisor-data conf shared-context isupervisor)
[event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
sync-processes (partial sync-processes supervisor)
synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
heartbeat-fn (fn [] (.supervisor-heartbeat!
(:storm-cluster-state supervisor)
(:supervisor-id supervisor)
(SupervisorInfo. (current-time-secs)
(:my-hostname supervisor)
(:assignment-id supervisor)
(keys @(:curr-assignment supervisor))
;; used ports
(.getMetadata isupervisor)
(let [nimbusHostPort (.nimbus-info (cluster/mk-storm-cluster-state conf))
conf (assoc (assoc conf NIMBUS-HOST (.host nimbusHostPort)) NIMBUS-THRIFT-PORT (.port nimbusHostPort))
conf (assign-worker-ports conf)]
(log-message "Starting Supervisor with conf " conf)
(.prepare isupervisor conf (supervisor-isupervisor-dir conf))
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [supervisor (supervisor-data conf shared-context isupervisor)
[event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
sync-processes (partial sync-processes supervisor)
synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
heartbeat-fn (fn [] (.supervisor-heartbeat!
(:storm-cluster-state supervisor)
(:supervisor-id supervisor)
(SupervisorInfo. (current-time-secs)
(:my-hostname supervisor)
(:assignment-id supervisor)
(keys @(:curr-assignment supervisor))
;; used ports
(.getMetadata isupervisor)
(conf SUPERVISOR-SCHEDULER-META)
((:uptime supervisor)))))]
(heartbeat-fn)
;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
(schedule-recurring (:timer supervisor)
0
(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
heartbeat-fn)
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
(schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
(heartbeat-fn)
;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
(schedule-recurring (:timer supervisor)
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
(fn [] (.add processes-event-manager sync-processes))))
(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
(reify
Shutdownable
(shutdown [this]
(log-message "Shutting down supervisor " (:supervisor-id supervisor))
(reset! (:active supervisor) false)
(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
heartbeat-fn)
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
(schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
(schedule-recurring (:timer supervisor)
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
(fn [] (.add processes-event-manager sync-processes))))
(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
(reify
Shutdownable
(shutdown [this]
(log-message "Shutting down supervisor " (:supervisor-id supervisor))
(reset! (:active supervisor) false)
(cancel-timer (:timer supervisor))
(.shutdown event-manager)
(.shutdown processes-event-manager)
(.disconnect (:storm-cluster-state supervisor)))
SupervisorDaemon
(get-conf [this]
conf)
(get-id [this]
(:supervisor-id supervisor))
(shutdown-all-workers [this]
(let [ids (my-worker-ids conf)]
(doseq [id ids]
(shutdown-worker supervisor id)
)))
DaemonCommon
(waiting? [this]
(or (not @(:active supervisor))
(and
(timer-waiting? (:timer supervisor))
(every? (memfn waiting?) managers)))
))))
SupervisorDaemon
(get-conf [this]
conf)
(get-id [this]
(:supervisor-id supervisor))
(shutdown-all-workers [this]
(let [ids (my-worker-ids conf)]
(doseq [id ids]
(shutdown-worker supervisor id)
)))
DaemonCommon
(waiting? [this]
(or (not @(:active supervisor))
(and
(timer-waiting? (:timer supervisor))
(every? (memfn waiting?) managers)))
)))))

(defn kill-supervisor [supervisor]
(.shutdown supervisor)
Expand Down
5 changes: 5 additions & 0 deletions storm-core/src/clj/backtype/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[common :as common]
[worker :as worker]
[executor :as executor]])
(:require [backtype.storm.ui [core :as ui]])
(:require [backtype.storm [process-simulator :as psim]])
(:import [org.apache.commons.io FileUtils])
(:import [java.io File])
Expand Down Expand Up @@ -109,6 +110,8 @@
{STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]})
daemon-conf (nimbus/config-with-nimbus-port-assigned daemon-conf)
ui-conf (ui/config-with-ui-port-assigned daemon-conf)
nimbus-tmp (local-temp-path)
port-counter (mk-counter supervisor-slot-port-min)
nimbus (nimbus/service-handler
Expand All @@ -127,6 +130,8 @@
supervisor-confs (if (sequential? supervisors)
supervisors
(repeat supervisors {}))]
(nimbus/announce-nimbus-info cluster-map (daemon-conf NIMBUS-HOST) (daemon-conf NIMBUS-THRIFT-PORT))
(ui/announce-ui-port daemon-conf)
(doseq [sc supervisor-confs]
(add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
cluster-map
Expand Down
Loading