-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve Evenscheduler to schedule executors based supervisor used slots #673
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -320,6 +320,17 @@ | |
(assoc m v (conj existing k)))) | ||
{} amap)) | ||
|
||
(defn coll-to-map | ||
"([:a 1][:a 2][:b 1][:c 2]) -> {:a [1 2] :b 1 :c 2}" | ||
[acoll] | ||
(reduce (fn [m item] | ||
(let [existing (get m (first item) [])] | ||
(assoc m (first item) (conj existing (second item))) | ||
) | ||
) | ||
{} acoll) | ||
) | ||
|
||
(defmacro print-vars [& vars] | ||
(let [prints (for [v vars] `(println ~(str v) ~v))] | ||
`(do ~@prints))) | ||
|
@@ -600,15 +611,16 @@ | |
(Collections/shuffle state rand)) | ||
(.get state (.get curr))) | ||
|
||
; this can be rewritten to be tail recursive | ||
;;'(1 2 3 4) '(5 6) '( 7 8 9 10) '() '(11) --> (1 5 7 11 2 6 8 3 9 4 10) | ||
(defn interleave-all [& colls] | ||
(if (empty? colls) | ||
[] | ||
(let [colls (filter (complement empty?) colls) | ||
my-elems (map first colls) | ||
rest-elems (apply interleave-all (map rest colls))] | ||
(concat my-elems rest-elems) | ||
))) | ||
(loop [result [] colls colls] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems this PR doesn't use the |
||
(let [colls (filter (complement empty?) colls)] | ||
(if (empty? colls) | ||
(apply concat result) | ||
(recur (conj result (map first colls)) (map rest colls)) | ||
))))) | ||
|
||
(defn update [m k afn] | ||
(assoc m k (afn (get m k)))) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
(ns backtype.storm.scheduler-test | ||
(:use [clojure test]) | ||
(:use [backtype.storm bootstrap config testing]) | ||
(:use [backtype.storm bootstrap config testing util]) | ||
(:require [backtype.storm.daemon [nimbus :as nimbus]]) | ||
(:require [backtype.storm.scheduler [EvenScheduler :as EvenScheduler]]) | ||
(:require [backtype.storm.scheduler [DefaultScheduler :as DefaultScheduler]]) | ||
(:import [backtype.storm.generated StormTopology]) | ||
(:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails | ||
SchedulerAssignmentImpl Topologies TopologyDetails])) | ||
|
@@ -244,3 +246,76 @@ | |
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3))))) | ||
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5))))) | ||
)) | ||
|
||
(deftest test-use-resources-evenly | ||
(let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9))) | ||
supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10))) | ||
supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 11 13 15 17 19))) | ||
supervisor4 (SupervisorDetails. "supervisor4" "192.168.0.4" (list ) (map int (list 12 14 16 18 20))) | ||
executor1 (ExecutorDetails. (int 1001) (int 1001)) | ||
executor2 (ExecutorDetails. (int 1002) (int 1002)) | ||
executor3 (ExecutorDetails. (int 1003) (int 1003)) | ||
executor4 (ExecutorDetails. (int 1004) (int 1004)) | ||
executor5 (ExecutorDetails. (int 1005) (int 1005)) | ||
executor6 (ExecutorDetails. (int 1006) (int 1006)) | ||
executor7 (ExecutorDetails. (int 1007) (int 1007)) | ||
executor8 (ExecutorDetails. (int 1008) (int 1008)) | ||
executor9 (ExecutorDetails. (int 1009) (int 1009)) | ||
executor10 (ExecutorDetails. (int 1010) (int 1010)) | ||
executor11 (ExecutorDetails. (int 1011) (int 1011)) | ||
executor12 (ExecutorDetails. (int 1012) (int 1012)) | ||
executor13 (ExecutorDetails. (int 1013) (int 1013)) | ||
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} | ||
(StormTopology.) | ||
4 | ||
{executor1 "spout1" | ||
executor2 "bolt1" | ||
executor3 "bolt2" | ||
executor4 "bolt3" | ||
executor5 "bolt4" | ||
executor6 "bolt5"}) | ||
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} | ||
(StormTopology.) | ||
3 | ||
{executor11 "spout11" | ||
executor12 "bolt11" | ||
executor13 "bolt12"}) | ||
topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"} | ||
(StormTopology.) | ||
4 | ||
{executor7 "spout7" | ||
executor8 "bolt8" | ||
executor9 "bolt9" | ||
executor10 "bolt10"}) | ||
topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3}) | ||
executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1)) | ||
executor2 (WorkerSlot. "supervisor1" (int 3)) | ||
executor3 (WorkerSlot. "supervisor1" (int 5)) | ||
executor4 (WorkerSlot. "supervisor2" (int 2)) | ||
executor5 (WorkerSlot. "supervisor3" (int 11)) | ||
executor6 (WorkerSlot. "supervisor4" (int 12)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the test data here actually violates the worker number constraints, you specified the worker number to 4 for topology1, but here it is assigned 6 different workers. |
||
} | ||
assignment1 (SchedulerAssignmentImpl. "topology1" executor->slot1) | ||
|
||
executor->slot2 {executor11 (WorkerSlot. "supervisor1" (int 7)) | ||
executor12 (WorkerSlot. "supervisor2" (int 4)) | ||
executor13 (WorkerSlot. "supervisor3" (int 13)) | ||
} | ||
assignment2 (SchedulerAssignmentImpl. "topology2" executor->slot2) | ||
|
||
cluster (Cluster. (nimbus/standalone-nimbus) | ||
{"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3 "supervisor4" supervisor4} | ||
{"topology1" assignment1 "topology2" assignment2})] | ||
|
||
(is (= false (.needsScheduling cluster topology1))) | ||
(is (= false (.needsScheduling cluster topology2))) | ||
(is (= true (.needsScheduling cluster topology3))) | ||
|
||
(DefaultScheduler/default-schedule topologies cluster) | ||
;(EvenScheduler/schedule-topologies-evenly topologies cluster) | ||
|
||
(is (= 4 (count (set (.getUsedPorts cluster supervisor1))))) | ||
(is (= 3 (count (set (.getUsedPorts cluster supervisor2))))) | ||
(is (= 3 (count (set (.getUsedPorts cluster supervisor3))))) | ||
(is (= 3 (count (set (.getUsedPorts cluster supervisor4))))) | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all-slots
actually just contains theavailable-slots
, better keep the name consistent with its actual usage.