-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable DRPC request to be sent via HTTP and/or Thrift #733
base: master
Are you sure you want to change the base?
Changes from 10 commits
4dca40c
93b19a0
1418e88
ea36939
3b27367
e65e5ad
78efcc3
a7e5cc1
8d0c3d7
3090104
1890304
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,10 @@ | |
(:import [backtype.storm.daemon Shutdownable]) | ||
(:import [java.net InetAddress]) | ||
(:use [backtype.storm bootstrap config log]) | ||
(:use compojure.core) | ||
(:use ring.middleware.reload) | ||
(:use [ring.adapter.jetty :only [run-jetty]]) | ||
(:require [ring.util.response :as resp]) | ||
(:gen-class)) | ||
|
||
(bootstrap) | ||
|
@@ -97,35 +101,63 @@ | |
(.interrupt clear-thread)) | ||
))) | ||
|
||
(defn handle-request [handler] | ||
(fn [request] | ||
(handler request))) | ||
|
||
(defn webapp [handler] | ||
(->(def http-routes | ||
(routes | ||
(GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] | ||
(.execute handler func args)) | ||
(GET "/drpc/:func/" [:as {cookies :cookies} func & m] | ||
(.execute handler func "")) | ||
(GET "/drpc/:func" [:as {cookies :cookies} func & m] | ||
(.execute handler func "")))) | ||
(wrap-reload '[backtype.storm.daemon.drpc]) | ||
handle-request)) | ||
|
||
(defn launch-server! | ||
([] | ||
(let [conf (read-storm-config) | ||
worker-threads (int (conf DRPC-WORKER-THREADS)) | ||
worker-threads (int (conf DRPC-WORKER-THREADS)) | ||
queue-size (int (conf DRPC-QUEUE-SIZE)) | ||
service-handler (service-handler) | ||
drpc-http-port (if (conf DRPC-HTTP-PORT) (int (conf DRPC-HTTP-PORT)) 0) | ||
drpc-port (int (conf DRPC-PORT)) | ||
drpc-service-handler (service-handler) | ||
;; requests and returns need to be on separate thread pools, since calls to | ||
;; "execute" don't unblock until other thrift methods are called. So if | ||
;; 64 threads are calling execute, the server won't accept the result | ||
;; invocations that will unblock those threads | ||
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT))) | ||
handler-server (if (> drpc-port 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot turn off Thrift unless we provide HTTP APIs for the spout/return bolt, and update them to know how to use the HTTP APIs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thrift port will be turned off only if drpc server was launched with configuration stating so. As the default, it will be enabled. |
||
(THsHaServer. (-> (TNonblockingServerSocket. drpc-port) | ||
(THsHaServer$Args.) | ||
(.workerThreads 64) | ||
(.executorService (ThreadPoolExecutor. worker-threads worker-threads | ||
(.executorService (ThreadPoolExecutor. worker-threads worker-threads | ||
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size))) | ||
(.protocolFactory (TBinaryProtocol$Factory.)) | ||
(.processor (DistributedRPC$Processor. service-handler)) | ||
(.processor (DistributedRPC$Processor. drpc-service-handler)) | ||
)) | ||
) | ||
invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT))) | ||
(THsHaServer$Args.) | ||
(.workerThreads 64) | ||
(.protocolFactory (TBinaryProtocol$Factory.)) | ||
(.processor (DistributedRPCInvocations$Processor. service-handler)) | ||
(.processor (DistributedRPCInvocations$Processor. drpc-service-handler)) | ||
))] | ||
|
||
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server)))) | ||
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] | ||
(if handler-server (.stop handler-server)) | ||
(.stop invoke-server)))) | ||
(log-message "Starting Distributed RPC servers...") | ||
(future (.serve invoke-server)) | ||
(.serve handler-server)))) | ||
(if (> drpc-http-port 0) | ||
(run-jetty (webapp drpc-service-handler) | ||
{:port drpc-http-port :join? false}) | ||
) | ||
(if handler-server | ||
(.serve handler-server) | ||
) | ||
))) | ||
|
||
(defn -main [] | ||
(launch-server!)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is in here twice by accident.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops I missed the trailing '/'. Looks like jetty is not handling '/' characters very well.